[jira] [Commented] (KAFKA-2044) Support requests and responses from o.a.k.common in KafkaApis

2015-03-24 Thread Gwen Shapira (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2044?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14378814#comment-14378814
 ] 

Gwen Shapira commented on KAFKA-2044:
-

Created reviewboard https://reviews.apache.org/r/32459/diff/
 against branch trunk

 Support requests and responses from o.a.k.common in KafkaApis
 -

 Key: KAFKA-2044
 URL: https://issues.apache.org/jira/browse/KAFKA-2044
 Project: Kafka
  Issue Type: Improvement
Reporter: Gwen Shapira
Assignee: Gwen Shapira
 Attachments: KAFKA-2044.patch


 As groundwork for KIP-4 and for KAFKA-1927, we'll add some plumbing to 
 support handling of requests and responses from o.a.k.common in KafkaApis.
 This will allow us to add new Api calls just in o.a.k.common and to gradually 
 migrate existing requests and responses.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Jenkins build is back to normal : KafkaPreCommit #42

2015-03-24 Thread Apache Jenkins Server
See https://builds.apache.org/job/KafkaPreCommit/42/changes



KAFKA-2042

2015-03-24 Thread Guozhang Wang
Hello,

We found a serious bug while testing flush() calls in the new producer,
which is summarized in KAFKA-2042.

In general, when the producer starts up it will try to refresh metadata with
an empty topic list, and hence gets metadata for all topics. When a message is
later sent to some topic, that topic is not added to the metadata's topic list
because its metadata is already available. When the data is still sitting in
the accumulator and a new topic is created, the resulting metadata refresh
covers just that single topic, losing the metadata for all other topics. Under
usual scenarios the messages only sit in the accumulator until another send()
is triggered with the same topic, but with flush() as a blocking call, the
likelihood that this issue is exposed as messages blocked forever inside
flush() is greatly increased.

I am writing to ask if people think this problem is severe enough that
requires another bug-fix release.

-- Guozhang


Re: [VOTE] KIP-15 add a close with timeout to new producer

2015-03-24 Thread Guozhang Wang
+1.

On Tue, Mar 24, 2015 at 2:15 PM, Jiangjie Qin j...@linkedin.com.invalid
wrote:


 https://cwiki.apache.org/confluence/display/KAFKA/KIP-15+-+Add+a+close+method+with+a+timeout+in+the+producer

 As a short summary, the new interface will be as follows:
 close(long timeout, TimeUnit timeUnit)

   1.  When timeout > 0, it will try to wait up to timeout for the sender
 thread to complete all the requests, then join the sender thread. If the
 sender thread is not able to finish its work before the timeout, the method
 force-closes the producer by failing all the incomplete requests and then
 joins the sender thread.
   2.  When timeout = 0, it will be a non-blocking call that just initiates the
 force close and DOES NOT join the sender thread.

 If close(timeout) is called from a callback, an error message will be logged
 and the producer sender thread will block forever.




-- 
-- Guozhang

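To make the semantics above concrete, here is a minimal usage sketch against the close(long, TimeUnit) interface proposed in KIP-15 (this method did not exist in the released producer at the time); the broker address, serializer settings, and topic name are placeholders:

{code}
import java.util.Properties;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class CloseWithTimeoutSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");  // placeholder broker
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.ByteArraySerializer");

        KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props);
        producer.send(new ProducerRecord<byte[], byte[]>("test-topic", "hello".getBytes()));

        // Per the KIP-15 summary: a positive timeout waits up to that long for the
        // sender thread to finish outstanding requests and then force-closes;
        // a zero timeout just initiates a force close without blocking.
        producer.close(5, TimeUnit.SECONDS);
        // producer.close(0, TimeUnit.MILLISECONDS);  // non-blocking force close
    }
}
{code}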

Re: [Discussion] Using Client Requests and Responses in Server

2015-03-24 Thread Gwen Shapira
OK, I posted a working patch on KAFKA-2044 and
https://reviews.apache.org/r/32459/diff/.

There are a few decisions there that can be up for discussion (the factory
method on AbstractRequestResponse, the new handleErrors in the request API),
but as far as support for o.a.k.common requests in core goes, it does what it
needs to do.

Please review!

Gwen



On Tue, Mar 24, 2015 at 10:59 AM, Gwen Shapira gshap...@cloudera.com
wrote:

 Hi,

 I uploaded a (very) preliminary patch with my idea.

 One thing that's missing:
 RequestResponse had a handleError method that all requests implemented,
 typically generating an appropriate error Response for the request and sending
 it along. It's used by KafkaApis to handle all protocol errors for valid
 requests that are not handled elsewhere.
 AbstractRequestResponse doesn't have such a method.

 I can, of course, add it.
 But before I jump into this, I'm wondering if there was another plan on
 handling Api errors.

 Gwen

 On Mon, Mar 23, 2015 at 6:16 PM, Jun Rao j...@confluent.io wrote:

 I think what you are saying is that in RequestChannel, we can start
 generating header/body for new request types and leave requestObj null. For
 existing requests, header/body will be null initially. Gradually, we can
 migrate each type of requests by populating header/body, instead of
 requestObj. This makes sense to me since it serves two purposes (1) not
 polluting the code base with duplicated request/response objects for new
 types of requests and (2) allowing the refactoring of existing requests to
 be done in smaller pieces.

 Could you try that approach and perhaps just migrate one existing request
 type (e.g. HeartBeatRequest) as an example? We probably need to rewind the
 buffer after reading the requestId when deserializing the header (since the
 header includes the request id).

 Thanks,

 Jun

 On Mon, Mar 23, 2015 at 4:52 PM, Gwen Shapira gshap...@cloudera.com
 wrote:

  I'm thinking of a different approach, that will not fix everything, but
  will allow adding new requests without code duplication (and therefore
  unblock KIP-4):
 
  RequestChannel.request currently takes a buffer and parses it into an old
  request object. Since the objects are byte-compatible, we should be able to
  parse existing requests into both old and new objects. New requests will
  only be parsed into new objects.
 
  Basically:
  val requestId = buffer.getShort()
  if (requestId in keyToNameAndDeserializerMap) {
    requestObj = RequestKeys.deserializerForKey(requestId)(buffer)
    header: RequestHeader = RequestHeader.parse(buffer)
    body: Struct = ProtoUtils.currentRequestSchema(apiKey).read(buffer).asInstanceOf[Struct]
  } else {
    requestObj = null
    header: RequestHeader = RequestHeader.parse(buffer)
    body: Struct = ProtoUtils.currentRequestSchema(apiKey).read(buffer).asInstanceOf[Struct]
  }
 
  This way existing KafkaApis will keep working as normal. The new Apis can
  implement just the new header/body requests.
  We'll do the same on the send-side: BoundedByteBufferSend can have a
  constructor that takes header/body instead of just a response object.

  Does that make sense?
 
  Once we have this in, we can move to:
  * Adding the missing request/response to the client code
  * Replacing requests that can be replaced
 
  It will also make life easier by having us review and test smaller chunks
  of work (the existing patch is *huge*, touches nearly every core component,
  and I'm not done yet...)
 
  Gwen
 
 
 
 
  On Sun, Mar 22, 2015 at 10:24 PM, Jay Kreps jay.kr...@gmail.com
 wrote:
 
   Ack, yeah, forgot about that.
  
   It's not just a difference of wrappers. The server side actually sends the
   bytes lazily using FileChannel.transferTo. We need to make it possible to
   carry over that optimization. In some sense what we want to be able to do
   is set a value to a Send instead of a ByteBuffer.
  
   Let me try to add that support to the protocol definition stuff, will
   probably take me a few days to free up time.
  
   -Jay
  
   On Sun, Mar 22, 2015 at 7:44 PM, Gwen Shapira gshap...@cloudera.com
   wrote:
  
In case anyone is still following this thread, I need a bit of help :)

The old FetchResponse.PartitionData included a MessageSet object.
The new FetchResponse.PartitionData includes a ByteBuffer.

However, when we read from logs, we return a MessageSet, and as far as I
can see, these can't be converted to ByteBuffers (at least not without
copying their data).
   
Did anyone consider how to reconcile the MessageSets with the new
FetchResponse objects?
   
Gwen
   
   
On Sat, Mar 21, 2015 at 6:54 PM, Gwen Shapira gshap...@cloudera.com wrote:

 Note: I'm also treating ZkUtils as if it was a public API (i.e. converting
 objects that are returned into o.a.k.common equivalents but not changing
 ZkUtils itself).
 I know it's not public, but I suspect I'm not the only 

Re: Review Request 32434: Patch for KAFKA-2042

2015-03-24 Thread Guozhang Wang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/32434/#review77675
---

Ship it!


Ship It!

- Guozhang Wang


On March 24, 2015, 8:57 p.m., Jiangjie Qin wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/32434/
 ---
 
 (Updated March 24, 2015, 8:57 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-2042
 https://issues.apache.org/jira/browse/KAFKA-2042
 
 
 Repository: kafka
 
 
 Description
 ---
 
 Move the change to KafkaProducer after talking to Guozhang offline.
 
 
 A less expensive fix
 
 
 Diffs
 -
 
   clients/src/main/java/org/apache/kafka/clients/Metadata.java 
 c8bde8b732defa20819730d87303a9a80d01116f 
   clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
 feda9c922d7dab17e424f8e6f0aa0a3f968e3729 
 
 Diff: https://reviews.apache.org/r/32434/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Jiangjie Qin
 




[jira] [Updated] (KAFKA-2042) New producer metadata update always get all topics.

2015-03-24 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2042?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-2042:
-
Fix Version/s: 0.8.3

 New producer metadata update always get all topics.
 ---

 Key: KAFKA-2042
 URL: https://issues.apache.org/jira/browse/KAFKA-2042
 Project: Kafka
  Issue Type: Bug
Reporter: Jiangjie Qin
Assignee: Jiangjie Qin
Priority: Blocker
 Fix For: 0.8.3

 Attachments: KAFKA-2042.patch, KAFKA-2042_2015-03-24_13:37:49.patch, 
 KAFKA-2042_2015-03-24_13:57:23.patch


 The new Java producer's metadata.topics is initially empty, so the producer 
 sends a TMR with an empty topic set. The broker takes the empty requested topic 
 set to mean all topics, so metadata.cluster contains all topic metadata. Later on, 
 when a new topic is produced, it gets added into metadata.topics. The 
 next metadata update will then only contain the metadata for this new topic, so 
 metadata.cluster will only have this topic. Since a lot of 
 messages are still in the accumulator but have no metadata in 
 metadata.cluster, if a caller thread does a flush(), the caller thread will 
 block forever because messages sitting in the accumulator without metadata 
 will never be ready to send.
 We should add a check for metadata.topics: if it is empty, no TMR should be 
 sent.
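As an illustration of the guard proposed in the last sentence, here is a rough sketch using a simplified, hypothetical metadata holder (not the real org.apache.kafka.clients.Metadata class and not the attached patch):

{code}
import java.util.HashSet;
import java.util.Set;

// Simplified, hypothetical stand-in for the producer-side metadata bookkeeping
// described above; names and structure are illustrative only.
public class MetadataTopicsSketch {
    private final Set<String> topics = new HashSet<String>();

    public synchronized void add(String topic) {
        topics.add(topic);
    }

    // Proposed check: never send a TMR with an empty topic list, because the
    // broker treats an empty list as "all topics" and a later single-topic
    // refresh would then drop the metadata for every other topic.
    public synchronized boolean shouldSendMetadataRequest() {
        return !topics.isEmpty();
    }
}
{code}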



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-2042) New producer metadata update always get all topics.

2015-03-24 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2042?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14378828#comment-14378828
 ] 

Guozhang Wang commented on KAFKA-2042:
--

Thanks for the patch, committed to trunk.

 New producer metadata update always get all topics.
 ---

 Key: KAFKA-2042
 URL: https://issues.apache.org/jira/browse/KAFKA-2042
 Project: Kafka
  Issue Type: Bug
Reporter: Jiangjie Qin
Assignee: Jiangjie Qin
Priority: Blocker
 Fix For: 0.8.3

 Attachments: KAFKA-2042.patch, KAFKA-2042_2015-03-24_13:37:49.patch, 
 KAFKA-2042_2015-03-24_13:57:23.patch


 The new Java producer's metadata.topics is initially empty, so the producer 
 sends a TMR with an empty topic set. The broker takes the empty requested topic 
 set to mean all topics, so metadata.cluster contains all topic metadata. Later on, 
 when a new topic is produced, it gets added into metadata.topics. The 
 next metadata update will then only contain the metadata for this new topic, so 
 metadata.cluster will only have this topic. Since a lot of 
 messages are still in the accumulator but have no metadata in 
 metadata.cluster, if a caller thread does a flush(), the caller thread will 
 block forever because messages sitting in the accumulator without metadata 
 will never be ready to send.
 We should add a check for metadata.topics: if it is empty, no TMR should be 
 sent.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-2035) Add a topic config cache.

2015-03-24 Thread Parth Brahmbhatt (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2035?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14378925#comment-14378925
 ] 

Parth Brahmbhatt commented on KAFKA-2035:
-

Posted a review https://reviews.apache.org/r/32460/

 Add a topic config cache.
 -

 Key: KAFKA-2035
 URL: https://issues.apache.org/jira/browse/KAFKA-2035
 Project: Kafka
  Issue Type: Task
Reporter: Parth Brahmbhatt
Assignee: Parth Brahmbhatt

 Currently the topic config is all about Log configuration so we have a 
 TopicConfigManager which takes in a Log instance and keeps updating that 
 instance's config instance as and when the topic config is updated. The topic 
 config update notifications are sent using zk watchers by Controller.
 I propose to introduce a TopicConfigCache which will be updated by 
 TopicConfigManager on any config changes. The log instance and any other 
 component (like the authorizer mentioned in KAFKA-1688) will have a reference 
 to TopicConfigCache using which they will access the topic configs.
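A minimal sketch of the shape being proposed, assuming a cache that TopicConfigManager writes and that the Log, an authorizer, or other components read; the real change would live in the Scala core module, so the class and method names here are hypothetical:

{code}
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical illustration of a topic config cache: the config manager updates
// per-topic overrides on config-change notifications, readers fetch the
// effective config for a topic.
public class TopicConfigCacheSketch {
    private final Properties defaults;
    private final ConcurrentMap<String, Properties> overrides =
            new ConcurrentHashMap<String, Properties>();

    public TopicConfigCacheSketch(Properties defaults) {
        this.defaults = defaults;
    }

    // Called by the config manager when a topic config change notification arrives.
    public void updateTopicConfig(String topic, Properties topicOverrides) {
        overrides.put(topic, topicOverrides);
    }

    // Called by any component that needs the effective config for a topic.
    public Properties getTopicConfig(String topic) {
        Properties effective = new Properties();
        effective.putAll(defaults);
        Properties o = overrides.get(topic);
        if (o != null) {
            effective.putAll(o);
        }
        return effective;
    }
}
{code}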



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-2047) Accelarate consumer rebalance in mirror maker.

2015-03-24 Thread Jiangjie Qin (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2047?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiangjie Qin updated KAFKA-2047:

Attachment: KAFKA-2047.patch

 Accelarate consumer rebalance in mirror maker.
 --

 Key: KAFKA-2047
 URL: https://issues.apache.org/jira/browse/KAFKA-2047
 Project: Kafka
  Issue Type: Improvement
Reporter: Jiangjie Qin
Assignee: Jiangjie Qin
 Attachments: KAFKA-2047.patch


 In the new mirror maker in KAFKA-1997, the bootstrap rebalance time became 
 longer because there are more zookeeper consumer connectors doing rebalance 
 serially. Rebalance would be faster if the bootstrap of 
 ZookeeperConsumerConnectors are parallelized.
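The idea is simply to start the connectors concurrently instead of one after another. A rough sketch of that pattern under stated assumptions (the actual change is in the Scala MirrorMaker code; the Connector interface below is a stand-in, not the real ZookeeperConsumerConnector API):

{code}
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ParallelBootstrapSketch {
    // Stand-in for whatever per-connector startup work triggers a rebalance.
    interface Connector {
        void start();
    }

    // Instead of bootstrapping connectors serially (one rebalance after another),
    // submit all startups to a pool and wait for them to complete together.
    static void startAll(List<Connector> connectors) throws InterruptedException {
        ExecutorService pool =
                Executors.newFixedThreadPool(Math.max(1, connectors.size()));
        for (final Connector c : connectors) {
            pool.submit(new Runnable() {
                public void run() {
                    c.start();
                }
            });
        }
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.MINUTES);
    }
}
{code}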



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-2047) Accelarate consumer rebalance in mirror maker.

2015-03-24 Thread Jiangjie Qin (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2047?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14378935#comment-14378935
 ] 

Jiangjie Qin commented on KAFKA-2047:
-

Created reviewboard https://reviews.apache.org/r/32465/diff/
 against branch origin/trunk

 Accelarate consumer rebalance in mirror maker.
 --

 Key: KAFKA-2047
 URL: https://issues.apache.org/jira/browse/KAFKA-2047
 Project: Kafka
  Issue Type: Improvement
Reporter: Jiangjie Qin
Assignee: Jiangjie Qin
 Attachments: KAFKA-2047.patch


 In the new mirror maker in KAFKA-1997, the bootstrap rebalance time became 
 longer because there are more zookeeper consumer connectors doing rebalance 
 serially. Rebalance would be faster if the bootstrap of 
 ZookeeperConsumerConnectors are parallelized.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-2047) Accelarate consumer rebalance in mirror maker.

2015-03-24 Thread Jiangjie Qin (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2047?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiangjie Qin updated KAFKA-2047:

Status: Patch Available  (was: Open)

 Accelarate consumer rebalance in mirror maker.
 --

 Key: KAFKA-2047
 URL: https://issues.apache.org/jira/browse/KAFKA-2047
 Project: Kafka
  Issue Type: Improvement
Reporter: Jiangjie Qin
Assignee: Jiangjie Qin
 Attachments: KAFKA-2047.patch


 In the new mirror maker in KAFKA-1997, the bootstrap rebalance time became 
 longer because there are more zookeeper consumer connectors doing rebalance 
 serially. Rebalance would be faster if the bootstrap of 
 ZookeeperConsumerConnectors are parallelized.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Review Request 32465: Patch for KAFKA-2047

2015-03-24 Thread Jiangjie Qin

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/32465/
---

Review request for kafka.


Bugs: KAFKA-2047
https://issues.apache.org/jira/browse/KAFKA-2047


Repository: kafka


Description
---

Fix for KAFKA-2047: Accelerate consumer bootstrap rebalance in mirror maker.


Diffs
-

  core/src/main/scala/kafka/tools/MirrorMaker.scala 
4f3c4c872e144195bb4b742b802fa3b931edb534 

Diff: https://reviews.apache.org/r/32465/diff/


Testing
---


Thanks,

Jiangjie Qin



[jira] [Updated] (KAFKA-2044) Support requests and responses from o.a.k.common in KafkaApis

2015-03-24 Thread Gwen Shapira (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2044?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gwen Shapira updated KAFKA-2044:

Status: Patch Available  (was: Open)

 Support requests and responses from o.a.k.common in KafkaApis
 -

 Key: KAFKA-2044
 URL: https://issues.apache.org/jira/browse/KAFKA-2044
 Project: Kafka
  Issue Type: Improvement
Reporter: Gwen Shapira
Assignee: Gwen Shapira
 Attachments: KAFKA-2044.patch


 As groundwork for KIP-4 and for KAFKA-1927, we'll add some plumbing to 
 support handling of requests and responses from o.a.k.common in KafkaApis.
 This will allow us to add new Api calls just in o.a.k.common and to gradually 
 migrate existing requests and responses.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Review Request 32459: Patch for KAFKA-2044

2015-03-24 Thread Gwen Shapira

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/32459/
---

Review request for kafka.


Bugs: KAFKA-2044
https://issues.apache.org/jira/browse/KAFKA-2044


Repository: kafka


Description
---

support requests and responses using Common api in core modules (missing files)


added error handling and factory method for requests


Diffs
-

  
clients/src/main/java/org/apache/kafka/common/requests/AbstractRequestResponse.java
 37aff6c0fd2ec2da8551aa74b166ca49b224ddd3 
  clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java 
6943878116a97c02b758d273d93976019688830e 
  clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java 
1ebc188742fd65e5e744003b4579324874fd81a9 
  core/src/main/scala/kafka/api/HeartbeatRequestAndHeader.scala 
f168d9fc99ce51b8b41b7f7db2a06f371b1a44e5 
  core/src/main/scala/kafka/api/HeartbeatResponseAndHeader.scala 
9a71faae3138af1b4fb48125db619ddc3ad13c5a 
  core/src/main/scala/kafka/api/JoinGroupRequestAndHeader.scala 
3651e8603dd0ed0d2ea059786c68cf0722aa094b 
  core/src/main/scala/kafka/api/JoinGroupResponseAndHeader.scala 
d0f07e0cbbdacf9ff8287e901ecabde3921bbab3 
  core/src/main/scala/kafka/api/RequestKeys.scala 
c24c0345feedc7b9e2e9f40af11bfa1b8d328c43 
  core/src/main/scala/kafka/network/BoundedByteBufferSend.scala 
55ecac285e00abf38d7131368bb46b4c4010dc87 
  core/src/main/scala/kafka/network/RequestChannel.scala 
7b1db3dbbb2c0676f166890f566c14aa248467ab 
  core/src/main/scala/kafka/server/KafkaApis.scala 
35af98f0bc1b6a50bd1d97a30147593f8c6a422d 
  core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala 
fba852afa1b2f46b61e2fd12c38c821ba04e9cc6 

Diff: https://reviews.apache.org/r/32459/diff/


Testing
---


Thanks,

Gwen Shapira



[jira] [Updated] (KAFKA-2044) Support requests and responses from o.a.k.common in KafkaApis

2015-03-24 Thread Gwen Shapira (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2044?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gwen Shapira updated KAFKA-2044:

Attachment: KAFKA-2044.patch

 Support requests and responses from o.a.k.common in KafkaApis
 -

 Key: KAFKA-2044
 URL: https://issues.apache.org/jira/browse/KAFKA-2044
 Project: Kafka
  Issue Type: Improvement
Reporter: Gwen Shapira
Assignee: Gwen Shapira
 Attachments: KAFKA-2044.patch


 As groundwork for KIP-4 and for KAFKA-1927, we'll add some plumbing to 
 support handling of requests and responses from o.a.k.common in KafkaApis.
 This will allow us to add new Api calls just in o.a.k.common and to gradually 
 migrate existing requests and responses.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 31366: Patch for KAFKA-1461

2015-03-24 Thread Guozhang Wang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31366/#review77674
---



core/src/main/scala/kafka/server/AbstractFetcherThread.scala
https://reviews.apache.org/r/31366/#comment125847

Jun has a comment about the case when all partitions become inactive, which
is common when the fetched broker has just gone through leader migration.

We can move the foreach statement before the if statement, and after the
foreach check whether any partitions got added; if not, just back off for
fetchBackoffMs.


- Guozhang Wang


On March 17, 2015, 11:03 p.m., Sriharsha Chintalapani wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/31366/
 ---
 
 (Updated March 17, 2015, 11:03 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1461
 https://issues.apache.org/jira/browse/KAFKA-1461
 
 
 Repository: kafka
 
 
 Description
 ---
 
 KAFKA-1461. Replica fetcher thread does not implement any back-off behavior.
 
 
 Diffs
 -
 
   core/src/main/scala/kafka/server/AbstractFetcherThread.scala 
 e731df4b2a3e44aa3d761713a09b1070aff81430 
   core/src/main/scala/kafka/server/ReplicaFetcherThread.scala 
 96faa7b4ed7c9ba8a3f6f9f114bd94e19b3a7ac0 
 
 Diff: https://reviews.apache.org/r/31366/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Sriharsha Chintalapani
 




[jira] [Commented] (KAFKA-2046) Delete topic still doesn't work

2015-03-24 Thread Joel Koshy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2046?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14378842#comment-14378842
 ] 

Joel Koshy commented on KAFKA-2046:
---

Hey Clark - Onur and I are looking into this.

 Delete topic still doesn't work
 ---

 Key: KAFKA-2046
 URL: https://issues.apache.org/jira/browse/KAFKA-2046
 Project: Kafka
  Issue Type: Bug
Reporter: Clark Haskins
Assignee: Sriharsha Chintalapani

 I just attempted to delete a 128 partition topic with all inbound producers 
 stopped.
 The result was as follows:
 The /admin/delete_topics znode was empty
 the topic under /brokers/topics was removed
 The Kafka topics command showed that the topic was removed
 However, the data on disk on each broker was not deleted and the topic has 
 not yet been re-created by starting up the inbound mirror maker.
 Let me know what additional information is needed



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-2046) Delete topic still doesn't work

2015-03-24 Thread Joel Koshy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2046?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joel Koshy updated KAFKA-2046:
--
Assignee: Onur Karaman  (was: Sriharsha Chintalapani)

 Delete topic still doesn't work
 ---

 Key: KAFKA-2046
 URL: https://issues.apache.org/jira/browse/KAFKA-2046
 Project: Kafka
  Issue Type: Bug
Reporter: Clark Haskins
Assignee: Onur Karaman

 I just attempted to delete a 128 partition topic with all inbound producers 
 stopped.
 The result was as follows:
 The /admin/delete_topics znode was empty
 the topic under /brokers/topics was removed
 The Kafka topics command showed that the topic was removed
 However, the data on disk on each broker was not deleted and the topic has 
 not yet been re-created by starting up the inbound mirror maker.
 Let me know what additional information is needed



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Jenkins build is back to normal : Kafka-trunk #430

2015-03-24 Thread Apache Jenkins Server
See https://builds.apache.org/job/Kafka-trunk/430/changes



[jira] [Commented] (KAFKA-2046) Delete topic still doesn't work

2015-03-24 Thread Sriharsha Chintalapani (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2046?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14378741#comment-14378741
 ] 

Sriharsha Chintalapani commented on KAFKA-2046:
---

[~clarkhaskins] Can you add details on how big the cluster was? Also, do you 
have state-change.log files on the brokers where the Log data was not 
deleted?

 Delete topic still doesn't work
 ---

 Key: KAFKA-2046
 URL: https://issues.apache.org/jira/browse/KAFKA-2046
 Project: Kafka
  Issue Type: Bug
Reporter: Clark Haskins

 I just attempted to delete a 128 partition topic with all inbound producers 
 stopped.
 The result was as follows:
 The /admin/delete_topics znode was empty
 the topic under /brokers/topics was removed
 The Kafka topics command showed that the topic was removed
 However, the data on disk on each broker was not deleted and the topic has 
 not yet been re-created by starting up the inbound mirror maker.
 Let me know what additional information is needed



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Assigned] (KAFKA-2046) Delete topic still doesn't work

2015-03-24 Thread Sriharsha Chintalapani (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2046?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sriharsha Chintalapani reassigned KAFKA-2046:
-

Assignee: Sriharsha Chintalapani

 Delete topic still doesn't work
 ---

 Key: KAFKA-2046
 URL: https://issues.apache.org/jira/browse/KAFKA-2046
 Project: Kafka
  Issue Type: Bug
Reporter: Clark Haskins
Assignee: Sriharsha Chintalapani

 I just attempted to delete a 128 partition topic with all inbound producers 
 stopped.
 The result was as follows:
 The /admin/delete_topics znode was empty
 the topic under /brokers/topics was removed
 The Kafka topics command showed that the topic was removed
 However, the data on disk on each broker was not deleted and the topic has 
 not yet been re-created by starting up the inbound mirror maker.
 Let me know what additional information is needed



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: KAFKA-2042

2015-03-24 Thread Jiangjie Qin
Hi Jun,

This issue does not only affect flush(). It is just with flush() the
probability is much higher.
It will affect the following scenario:
1. Producer starts and refreshes metadata.
2. User calls producer.send() to send message 1 to topic A, but message 1
is still in the accumulator.
3. User calls producer.send() to send message 2 to topic B (topic B is a
new topic, which does not exist on the broker).
4. Message 1 will not get sent out until the user tries to send a message to
topic A again.

If flush() is called at this point, it will block forever.

Jiangjie (Becket) Qin

On 3/24/15, 3:09 PM, Jun Rao j...@confluent.io wrote:

Hi, Guozhang,

The flush() was added to the new producer in trunk, not in 0.8.2, right?

Thanks,

Jun

On Tue, Mar 24, 2015 at 2:42 PM, Guozhang Wang wangg...@gmail.com wrote:

 Hello,

 We found a serious bug while testing flush() calls in the new producer,
 which is summarized in KAFKA-2042.

 In general, when the producer starts up it will try to refresh metadata
 with empty topic list, and hence get all the topic metadata. When
sending
 the message with some topic later, it will hence not cause the topic to
be
 added into metadata's topic list since the metadata is available. When
the
 data is still sitting in the accumulator and a new topic is created,
that
 will cause metadata refresh with just this single topic, hence losing
the
 metadata for any other topics. Under usual scenarios the messages will
be
 sitting in the accumulator until another send() is triggered with the
same
 topic, but with flush() as a blocking call the likelihood of this issue
 being exposed that messages gets blocked forever inside flush() could be
 largely increased.

 I am writing to ask if people think this problem is severe enough that
 requires another bug-fix release.

 -- Guozhang




[jira] [Commented] (KAFKA-2032) ConsumerConfig doesn't validate partition.assignment.strategy values

2015-03-24 Thread Parth Brahmbhatt (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2032?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14378834#comment-14378834
 ] 

Parth Brahmbhatt commented on KAFKA-2032:
-

Created reviewboard https://reviews.apache.org/r/32460/diff/
 against branch origin/trunk

 ConsumerConfig doesn't validate partition.assignment.strategy values
 

 Key: KAFKA-2032
 URL: https://issues.apache.org/jira/browse/KAFKA-2032
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8.1.2
Reporter: Jason Rosenberg
Assignee: Parth Brahmbhatt
 Attachments: KAFKA-2032.patch, KAFKA-2032.patch, 
 KAFKA-2032_2015-03-19_11:42:07.patch, KAFKA-2032_2015-03-19_11:44:48.patch, 
 KAFKA-2032_2015-03-19_11:47:24.patch, KAFKA-2032_2015-03-19_12:19:45.patch


 In the ConsumerConfig class, there are validation checks to make sure that 
 string based configuration properties conform to allowed values.  However, 
 this validation appears to be missing for the partition.assignment.strategy.  
 E.g. there is validation for autooffset.reset and offsets.storage.
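The missing piece is the same kind of "value must be one of a known set" check applied to the other string configs. The actual class is the Scala ConsumerConfig, so the sketch below is only illustrative, with the allowed values assumed to be the old consumer's range and roundrobin strategies:

{code}
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

public class AssignmentStrategyValidationSketch {
    // Assumed allowed values for the old consumer's partition.assignment.strategy.
    private static final Set<String> ALLOWED =
            new HashSet<String>(Arrays.asList("range", "roundrobin"));

    static void validatePartitionAssignmentStrategy(String value) {
        if (!ALLOWED.contains(value)) {
            throw new IllegalArgumentException(
                    "Invalid partition.assignment.strategy '" + value
                            + "', must be one of " + ALLOWED);
        }
    }
}
{code}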



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Review Request 32460: Patch for KAFKA-2032

2015-03-24 Thread Parth Brahmbhatt

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/32460/
---

Review request for kafka.


Bugs: KAFKA-2032
https://issues.apache.org/jira/browse/KAFKA-2032


Repository: kafka


Description
---

KAFKA:2032 added topic config cache.


Diffs
-

  core/src/main/scala/kafka/server/KafkaServer.scala 
dddef938fabae157ed8644536eb1a2f329fb42b7 
  core/src/main/scala/kafka/server/TopicConfigCache.scala PRE-CREATION 
  core/src/main/scala/kafka/server/TopicConfigManager.scala 
47295d40131492aaac786273819b7bc6e22e5486 
  core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala 
93182aeb342729d420d2e7d59a1035994164b7db 
  core/src/test/scala/unit/kafka/server/TopicConfigCacheTest.scala PRE-CREATION 

Diff: https://reviews.apache.org/r/32460/diff/


Testing
---


Thanks,

Parth Brahmbhatt



[jira] [Updated] (KAFKA-2032) ConsumerConfig doesn't validate partition.assignment.strategy values

2015-03-24 Thread Parth Brahmbhatt (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2032?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Parth Brahmbhatt updated KAFKA-2032:

Attachment: KAFKA-2032.patch

 ConsumerConfig doesn't validate partition.assignment.strategy values
 

 Key: KAFKA-2032
 URL: https://issues.apache.org/jira/browse/KAFKA-2032
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8.1.2
Reporter: Jason Rosenberg
Assignee: Parth Brahmbhatt
 Attachments: KAFKA-2032.patch, KAFKA-2032.patch, 
 KAFKA-2032_2015-03-19_11:42:07.patch, KAFKA-2032_2015-03-19_11:44:48.patch, 
 KAFKA-2032_2015-03-19_11:47:24.patch, KAFKA-2032_2015-03-19_12:19:45.patch


 In the ConsumerConfig class, there are validation checks to make sure that 
 string based configuration properties conform to allowed values.  However, 
 this validation appears to be missing for the partition.assignment.strategy.  
 E.g. there is validation for autooffset.reset and offsets.storage.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 31366: Patch for KAFKA-1461

2015-03-24 Thread Sriharsha Chintalapani


 On March 24, 2015, 10:46 p.m., Guozhang Wang wrote:
  core/src/main/scala/kafka/server/AbstractFetcherThread.scala, lines 81-86
  https://reviews.apache.org/r/31366/diff/2/?file=898415#file898415line81
 
  Jun has a comment about the case when all partitions become inactive, 
  which is common when the fetched broker has just gone through leader 
  migration.
  
  We can move the foreach statement before the if statement, and after the 
  foreach check whether any partitions got added; if not, just back off for 
  fetchBackoffMs.

Thanks for the review. Are you looking at something like this? This wouldn't 
handle the case where partitionMap is populated but all of the partitions are 
inactive.

  partitionMap.foreach {
    case ((topicAndPartition, partitionFetchState)) =>
      if (partitionFetchState.isActive)
        fetchRequestBuilder.addFetch(topicAndPartition.topic, topicAndPartition.partition,
          partitionFetchState.offset, fetchSize)
  }
  if (partitionMap.isEmpty)
    partitionMapCond.await(fetchBackOffMs, TimeUnit.MILLISECONDS)

Or do we want to check whether all the current partitions are inactive and then 
back off? It would be expensive to check whether all the partitions are active 
or not in doWork().


- Sriharsha


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31366/#review77674
---


On March 17, 2015, 11:03 p.m., Sriharsha Chintalapani wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/31366/
 ---
 
 (Updated March 17, 2015, 11:03 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1461
 https://issues.apache.org/jira/browse/KAFKA-1461
 
 
 Repository: kafka
 
 
 Description
 ---
 
 KAFKA-1461. Replica fetcher thread does not implement any back-off behavior.
 
 
 Diffs
 -
 
   core/src/main/scala/kafka/server/AbstractFetcherThread.scala 
 e731df4b2a3e44aa3d761713a09b1070aff81430 
   core/src/main/scala/kafka/server/ReplicaFetcherThread.scala 
 96faa7b4ed7c9ba8a3f6f9f114bd94e19b3a7ac0 
 
 Diff: https://reviews.apache.org/r/31366/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Sriharsha Chintalapani
 




[jira] [Commented] (KAFKA-2046) Delete topic still doesn't work

2015-03-24 Thread Neha Narkhede (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2046?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14378783#comment-14378783
 ] 

Neha Narkhede commented on KAFKA-2046:
--

[~clarkhaskins] As discussed previously, the minimum amount of information that 
is needed to troubleshoot any issue, not just delete topic, is:
1. Controller logs (possibly at TRACE)
2. Server logs
3. State change log (DEBUG works)

 Delete topic still doesn't work
 ---

 Key: KAFKA-2046
 URL: https://issues.apache.org/jira/browse/KAFKA-2046
 Project: Kafka
  Issue Type: Bug
Reporter: Clark Haskins
Assignee: Sriharsha Chintalapani

 I just attempted to delete a 128 partition topic with all inbound producers 
 stopped.
 The result was as follows:
 The /admin/delete_topics znode was empty
 the topic under /brokers/topics was removed
 The Kafka topics command showed that the topic was removed
 However, the data on disk on each broker was not deleted and the topic has 
 not yet been re-created by starting up the inbound mirror maker.
 Let me know what additional information is needed



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-2047) Accelarate consumer rebalance in mirror maker.

2015-03-24 Thread Jiangjie Qin (JIRA)
Jiangjie Qin created KAFKA-2047:
---

 Summary: Accelarate consumer rebalance in mirror maker.
 Key: KAFKA-2047
 URL: https://issues.apache.org/jira/browse/KAFKA-2047
 Project: Kafka
  Issue Type: Improvement
Reporter: Jiangjie Qin
Assignee: Jiangjie Qin


In the new mirror maker in KAFKA-1997, the bootstrap rebalance time became 
longer because there are more zookeeper consumer connectors doing rebalance 
serially. Rebalance would be faster if the bootstrap of 
ZookeeperConsumerConnectors are parallelized.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: KAFKA-2042

2015-03-24 Thread Guozhang Wang
Ah that is right, we just need to make sure this ticket goes along with
flush() call in the next release then.

On Tue, Mar 24, 2015 at 3:09 PM, Jun Rao j...@confluent.io wrote:

 Hi, Guozhang,

 The flush() was added to the new producer in trunk, not in 0.8.2, right?

 Thanks,

 Jun

 On Tue, Mar 24, 2015 at 2:42 PM, Guozhang Wang wangg...@gmail.com wrote:

  Hello,
 
  We found a serious bug while testing flush() calls in the new producer,
  which is summarized in KAFKA-2042.
 
  In general, when the producer starts up it will try to refresh metadata
  with empty topic list, and hence get all the topic metadata. When sending
  the message with some topic later, it will hence not cause the topic to
 be
  added into metadata's topic list since the metadata is available. When
 the
  data is still sitting in the accumulator and a new topic is created, that
  will cause metadata refresh with just this single topic, hence losing the
  metadata for any other topics. Under usual scenarios the messages will be
  sitting in the accumulator until another send() is triggered with the
 same
  topic, but with flush() as a blocking call the likelihood of this issue
  being exposed that messages gets blocked forever inside flush() could be
  largely increased.
 
  I am writing to ask if people think this problem is severe enough that
  requires another bug-fix release.
 
  -- Guozhang
 




-- 
-- Guozhang


[jira] [Commented] (KAFKA-2042) New producer metadata update always get all topics.

2015-03-24 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2042?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14378954#comment-14378954
 ] 

Jun Rao commented on KAFKA-2042:


Could you explain a bit more when the new producer will send a TMR with an 
empty topic list? I can see this happen if after the producer is created, no 
message is sent within the window of metadata age. Is that the only case when 
this can happen?

 New producer metadata update always get all topics.
 ---

 Key: KAFKA-2042
 URL: https://issues.apache.org/jira/browse/KAFKA-2042
 Project: Kafka
  Issue Type: Bug
Reporter: Jiangjie Qin
Assignee: Jiangjie Qin
Priority: Blocker
 Fix For: 0.8.3

 Attachments: KAFKA-2042.patch, KAFKA-2042_2015-03-24_13:37:49.patch, 
 KAFKA-2042_2015-03-24_13:57:23.patch


 The new Java producer's metadata.topics is initially empty, so the producer 
 sends a TMR with an empty topic set. The broker takes the empty requested topic 
 set to mean all topics, so metadata.cluster contains all topic metadata. Later on, 
 when a new topic is produced, it gets added into metadata.topics. The 
 next metadata update will then only contain the metadata for this new topic, so 
 metadata.cluster will only have this topic. Since a lot of 
 messages are still in the accumulator but have no metadata in 
 metadata.cluster, if a caller thread does a flush(), the caller thread will 
 block forever because messages sitting in the accumulator without metadata 
 will never be ready to send.
 We should add a check for metadata.topics: if it is empty, no TMR should be 
 sent.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[CANCEL] [VOTE] KIP-15 add a close method with timeout to KafkaProducer

2015-03-24 Thread Jiangjie Qin


On 3/24/15, 1:34 PM, Guozhang Wang wangg...@gmail.com wrote:

+1, let's just start a new thread for this.

On Tue, Mar 24, 2015 at 12:23 PM, Joel Koshy jjkosh...@gmail.com wrote:

 Actually, since there are already votes on this and the KIP has
 changed a bit we should cancel this and start a new thread.

 On Tue, Mar 24, 2015 at 12:19 PM, Jiangjie Qin
 j...@linkedin.com.invalid wrote:
  Push up the thread for voting after discussion on the KIP hangout.
 
  On 3/19/15, 9:03 PM, Jiangjie Qin j...@linkedin.com wrote:
 
 We had some additional discussions on the discussion thread. Pushing
up
 this thread to resume voting.
 
 On 3/11/15, 8:47 PM, Jay Kreps jay.kr...@gmail.com wrote:
 
 Yeah guys, I'd like to second that. I'd really really love to get the
 quality of these to the point where we could broadly solicit user input and
 use them as a permanent document of the alternatives and rationale.

 I know it is a little painful to have process, but I think we all saw what
 happened to the previous clients as public interfaces so I really really
 really want us to just be incredibly thoughtful and disciplined as we make
 changes. I think we all want to avoid another client rewrite.

 To second Joe's question in a more specific way, I think an alternative I
 don't see considered to give close() a bounded time is just to enforce the
 request time on the client side, which will cause all requests to be failed
 after the request timeout expires. This was the same behavior as for flush.
 In the case where the user just wants to ensure close doesn't block forever
 I think that may be sufficient?

 So one alternative might be to just do that request timeout feature and add
 a new producer config that is something like
   abort.on.failure=false
 which causes the producer to hard exit if it can't send a request. Which I
 think is closer to what you want, with this just being a way to implement
 that behavior.

 I'm not sure if this is better or worse, but we should be sure before we
 make the change.

 I also have a concern about
   producer.close(0, TimeUnit.MILLISECONDS)
 not meaning close with a timeout of 0 ms.

 I realize this exists in other java apis, but it is so confusing it even
 confused us into having that recent producer bug because of course all the
 other numbers mean wait that long.

 I'd propose
   close()--block until all completed
   close(0, TimeUnit.MILLISECONDS)--block for 0 ms
   close(5, TimeUnit.MILLISECONDS)--block for 5 ms
   close(-1, TimeUnit.MILLISECONDS)--error because blocking for negative ms
 would mean completing in the past :-)

 -Jay
 
 On Wed, Mar 11, 2015 at 8:31 PM, Joe Stein joe.st...@stealth.ly
 wrote:
 
  Could the KIP confluence please have updated the discussion thread link,
  thanks... could you also remove the template boilerplate at the top *This
  page is meant as a template ..* so we can capture it for the release
  cleanly.

  Also I don't really/fully understand how this is different than
  flush(time); close() and why close has its own timeout also?

  Lastly, what is the forceClose flag? This isn't documented in the public
  interface so it isn't clear how to completely use the feature just by
  reading the KIP.
 
  ~ Joe Stein
  - - - - - - - - - - - - - - - - -
 
http://www.stealth.ly
  - - - - - - - - - - - - - - - - -
 
  On Wed, Mar 11, 2015 at 11:24 PM, Guozhang Wang
wangg...@gmail.com
  wrote:
 
   +1 (binding)
  
   On Wed, Mar 11, 2015 at 8:10 PM, Jiangjie Qin
 j...@linkedin.com.invalid
  
   wrote:
  
   
   
  
 
 
 https://cwiki.apache.org/confluence/display/KAFKA/KIP-15+-+Add+a+close+method+with+a+timeout+in+the+producer
   
   
  
  
   --
   -- Guozhang
  
 
 
 




-- 
-- Guozhang



[jira] [Created] (KAFKA-2046) Delete topic still doesn't work

2015-03-24 Thread Clark Haskins (JIRA)
Clark Haskins created KAFKA-2046:


 Summary: Delete topic still doesn't work
 Key: KAFKA-2046
 URL: https://issues.apache.org/jira/browse/KAFKA-2046
 Project: Kafka
  Issue Type: Bug
Reporter: Clark Haskins


I just attempted to delete a 128 partition topic with all inbound producers 
stopped.

The result was as follows:
The /admin/delete_topics znode was empty
the topic under /brokers/topics was removed
The Kafka topics command showed that the topic was removed

However, the data on disk on each broker was not deleted and the topic has not 
yet been re-created by starting up the inbound mirror maker.

Let me know what additional information is needed





--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1982) change kafka.examples.Producer to use the new java producer

2015-03-24 Thread Pete Wright (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1982?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14378734#comment-14378734
 ] 

Pete Wright commented on KAFKA-1982:


+1 on committing this as it will quicken adoption of the new producer across 
our organization while also allowing us to track upstream releases without 
patches.

 change kafka.examples.Producer to use the new java producer
 ---

 Key: KAFKA-1982
 URL: https://issues.apache.org/jira/browse/KAFKA-1982
 Project: Kafka
  Issue Type: Improvement
Affects Versions: 0.8.3
Reporter: Jun Rao
Assignee: Ashish K Singh
  Labels: newbie
 Attachments: KAFKA-1982.patch, KAFKA-1982_2015-02-24_10:34:51.patch, 
 KAFKA-1982_2015-02-24_20:45:48.patch, KAFKA-1982_2015-02-24_20:48:12.patch, 
 KAFKA-1982_2015-02-27_11:08:34.patch, KAFKA-1982_2015-03-03_17:50:57.patch


 We need to change the example to use the new java producer.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: KAFKA-2042

2015-03-24 Thread Jun Rao
Hi, Guozhang,

The flush() was added to the new producer in trunk, not in 0.8.2, right?

Thanks,

Jun

On Tue, Mar 24, 2015 at 2:42 PM, Guozhang Wang wangg...@gmail.com wrote:

 Hello,

 We found a serious bug while testing flush() calls in the new producer,
 which is summarized in KAFKA-2042.

 In general, when the producer starts up it will try to refresh metadata
 with empty topic list, and hence get all the topic metadata. When sending
 the message with some topic later, it will hence not cause the topic to be
 added into metadata's topic list since the metadata is available. When the
 data is still sitting in the accumulator and a new topic is created, that
 will cause metadata refresh with just this single topic, hence losing the
 metadata for any other topics. Under usual scenarios the messages will be
 sitting in the accumulator until another send() is triggered with the same
 topic, but with flush() as a blocking call the likelihood of this issue
 being exposed that messages gets blocked forever inside flush() could be
 largely increased.

 I am writing to ask if people think this problem is severe enough that
 requires another bug-fix release.

 -- Guozhang



[jira] [Commented] (KAFKA-2046) Delete topic still doesn't work

2015-03-24 Thread Onur Karaman (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2046?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14378944#comment-14378944
 ] 

Onur Karaman commented on KAFKA-2046:
-

I just tried deleting a topic with 128 partitions and noticed that the delete 
topic node and the topic node were still in zk, and all but one replica on the 
brokers had not been deleted.

grep "handling stop replica (delete=false)" kafka-state-change.log produced 
output for all of the partitions. So the controller was able to send a 
StopReplicaRequest to the brokers to transition from OnlineReplica to 
OfflineReplica.

However, grep "handling stop replica (delete=true)" kafka-state-change.log 
revealed only one replica. This was the replica that I noticed had actually 
been deleted from the filesystem. The other replicas never received the 
delete=true StopReplicaRequest. So the transition from OfflineReplica to 
ReplicaDeletionStarted for all the other replicas hangs. A thread dump on the 
controller indicates that it's getting stuck because of a LinkedBlockingQueue 
in ControllerChannelManager:
{code}
delete-topics-thread-xyz...
  java.lang.Thread.State: WAITING (parking)
  ...
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
at 
java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:350)
at 
kafka.controller.ControllerChannelManager.sendRequest(ControllerChannelManager.scala:57)
...
at 
kafka.controller.KafkaController.sendRequest(KafkaController.scala:670)
at 
kafka.controller.ControllerBrokerRequestBatch$$anonfun$sendRequestsToBrokers$3$$anonfun$apply$10.apply(ControllerChannelManager.scala:320)
at 
kafka.controller.ControllerBrokerRequestBatch$$anonfun$sendRequestsToBrokers$3$$anonfun$apply$10.apply(ControllerChannelManager.scala:317)
at scala.collection.immutable.List.foreach(List.scala:318)
at 
kafka.controller.ControllerBrokerRequestBatch$$anonfun$sendRequestsToBrokers$3.apply(ControllerChannelManager.scala:317)
at 
kafka.controller.ControllerBrokerRequestBatch$$anonfun$sendRequestsToBrokers$3.apply(ControllerChannelManager.scala:310)
at 
scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
at 
scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
at 
scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
at 
kafka.controller.ControllerBrokerRequestBatch.sendRequestsToBrokers(ControllerChannelManager.scala:310)
at 
kafka.controller.ReplicaStateMachine.handleStateChanges(ReplicaStateMachine.scala:115)
at 
kafka.controller.TopicDeletionManager$$anonfun$startReplicaDeletion$2.apply(TopicDeletionManager.scala:337)
at 
kafka.controller.TopicDeletionManager$$anonfun$startReplicaDeletion$2.apply(TopicDeletionManager.scala:327)
at scala.collection.immutable.Map$Map1.foreach(Map.scala:109)
at 
kafka.controller.TopicDeletionManager.startReplicaDeletion(TopicDeletionManager.scala:327)
{code}
controller.message.queue.size property is currently set to be very small. I'll 
try bumping this up and see if this addresses the issue.

 Delete topic still doesn't work
 ---

 Key: KAFKA-2046
 URL: https://issues.apache.org/jira/browse/KAFKA-2046
 Project: Kafka
  Issue Type: Bug
Reporter: Clark Haskins
Assignee: Onur Karaman

 I just attempted to delete a 128 partition topic with all inbound producers 
 stopped.
 The result was as follows:
 The /admin/delete_topics znode was empty
 the topic under /brokers/topics was removed
 The Kafka topics command showed that the topic was removed
 However, the data on disk on each broker was not deleted and the topic has 
 not yet been re-created by starting up the inbound mirror maker.
 Let me know what additional information is needed



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-2043) CompressionType is passed in each RecordAccumulator append

2015-03-24 Thread Grant Henke (JIRA)
Grant Henke created KAFKA-2043:
--

 Summary: CompressionType is passed in each RecordAccumulator append
 Key: KAFKA-2043
 URL: https://issues.apache.org/jira/browse/KAFKA-2043
 Project: Kafka
  Issue Type: Bug
  Components: clients
Affects Versions: 0.8.2.0
Reporter: Grant Henke
Assignee: Grant Henke
Priority: Minor


Currently org.apache.kafka.clients.producer.internals.RecordAccumulator append 
method accepts the compressionType on a per record basis. It looks like the 
code would only work on a per batch basis because the CompressionType is only 
used when creating a new RecordBatch. My understanding is this should only 
support setting per batch at most. 

public RecordAppendResult append(TopicPartition tp, byte[] key, byte[] 
value, CompressionType compression, Callback callback) throws 
InterruptedException;

The compression type is a producer
level config. Instead of passing it in for each append, we probably should
just pass it in once during the creation RecordAccumulator.
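A sketch of the direction proposed in the last paragraph: accept the CompressionType once in the accumulator's constructor instead of on every append. The signatures below are abbreviated and hypothetical; see the posted review request for the actual change:

{code}
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.record.CompressionType;

// Abbreviated, hypothetical sketch: compression is a producer-level setting,
// so hold it as a field rather than passing it with each record.
public class RecordAccumulatorSketch {
    private final CompressionType compression;

    public RecordAccumulatorSketch(CompressionType compression /* , batchSize, ... */) {
        this.compression = compression;
    }

    // append no longer takes a CompressionType; the field is used whenever a
    // new RecordBatch has to be created for the partition.
    public void append(TopicPartition tp, byte[] key, byte[] value /* , Callback cb */) {
        // ... find or create a RecordBatch for tp using 'compression' ...
    }
}
{code}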



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Review Request 32440: Patch for KAFKA-2043

2015-03-24 Thread Grant Henke

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/32440/
---

Review request for kafka.


Bugs: KAFKA-2043
https://issues.apache.org/jira/browse/KAFKA-2043


Repository: kafka


Description
---

CompressionType is passed in each RecordAccumulator append


Diffs
-

  clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
feda9c922d7dab17e424f8e6f0aa0a3f968e3729 
  
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
 88b4e4fbf3bf6fb6d2f90551a792b95d4cd51c40 
  
clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
 e379ac89c9a2fbfe750d6b0dec693b7eabb76204 
  
clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
 24274a64885fadd0e9318de2beb232218ddd52cd 

Diff: https://reviews.apache.org/r/32440/diff/


Testing
---


Thanks,

Grant Henke



[jira] [Commented] (KAFKA-2043) CompressionType is passed in each RecordAccumulator append

2015-03-24 Thread Grant Henke (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2043?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14378046#comment-14378046
 ] 

Grant Henke commented on KAFKA-2043:


Created reviewboard https://reviews.apache.org/r/32440/diff/
 against branch origin/trunk

 CompressionType is passed in each RecordAccumulator append
 --

 Key: KAFKA-2043
 URL: https://issues.apache.org/jira/browse/KAFKA-2043
 Project: Kafka
  Issue Type: Bug
  Components: clients
Affects Versions: 0.8.2.0
Reporter: Grant Henke
Assignee: Grant Henke
Priority: Minor
 Attachments: KAFKA-2043.patch


 Currently org.apache.kafka.clients.producer.internals.RecordAccumulator 
 append method accepts the compressionType on a per record basis. It looks 
 like the code would only work on a per batch basis because the 
 CompressionType is only used when creating a new RecordBatch. My 
 understanding is this should only support setting per batch at most. 
 public RecordAppendResult append(TopicPartition tp, byte[] key, byte[] 
 value, CompressionType compression, Callback callback) throws 
 InterruptedException;
 The compression type is a producer
 level config. Instead of passing it in for each append, we probably should
 just pass it in once during the creation RecordAccumulator.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-2043) CompressionType is passed in each RecordAccumulator append

2015-03-24 Thread Grant Henke (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2043?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Grant Henke updated KAFKA-2043:
---
Status: Patch Available  (was: Open)

 CompressionType is passed in each RecordAccumulator append
 --

 Key: KAFKA-2043
 URL: https://issues.apache.org/jira/browse/KAFKA-2043
 Project: Kafka
  Issue Type: Bug
  Components: clients
Affects Versions: 0.8.2.0
Reporter: Grant Henke
Assignee: Grant Henke
Priority: Minor
 Attachments: KAFKA-2043.patch


 Currently org.apache.kafka.clients.producer.internals.RecordAccumulator 
 append method accepts the compressionType on a per record basis. It looks 
 like the code would only work on a per batch basis because the 
 CompressionType is only used when creating a new RecordBatch. My 
 understanding is this should only support setting per batch at most. 
 public RecordAppendResult append(TopicPartition tp, byte[] key, byte[] 
 value, CompressionType compression, Callback callback) throws 
 InterruptedException;
 The compression type is a producer
 level config. Instead of passing it in for each append, we probably should
 just pass it in once during the creation RecordAccumulator.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-2043) CompressionType is passed in each RecordAccumulator append

2015-03-24 Thread Grant Henke (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2043?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Grant Henke updated KAFKA-2043:
---
Attachment: KAFKA-2043.patch

 CompressionType is passed in each RecordAccumulator append
 --

 Key: KAFKA-2043
 URL: https://issues.apache.org/jira/browse/KAFKA-2043
 Project: Kafka
  Issue Type: Bug
  Components: clients
Affects Versions: 0.8.2.0
Reporter: Grant Henke
Assignee: Grant Henke
Priority: Minor
 Attachments: KAFKA-2043.patch


 Currently org.apache.kafka.clients.producer.internals.RecordAccumulator 
 append method accepts the compressionType on a per record basis. It looks 
 like the code would only work on a per batch basis because the 
 CompressionType is only used when creating a new RecordBatch. My 
 understanding is this should only support setting per batch at most. 
 public RecordAppendResult append(TopicPartition tp, byte[] key, byte[] 
 value, CompressionType compression, Callback callback) throws 
 InterruptedException;
 The compression type is a producer
 level config. Instead of passing it in for each append, we probably should
 just pass it in once during the creation RecordAccumulator.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: New Producer Questions/Feedback

2015-03-24 Thread Grant Henke
Here is the jira: https://issues.apache.org/jira/browse/KAFKA-2043

Thanks,
Grant

On Mon, Mar 23, 2015 at 11:53 PM, Jun Rao j...@confluent.io wrote:

 RecordAccumulator is actually not part of the public api since it's
 internal. The public apis are only those in

 http://kafka.apache.org/082/javadoc/index.html?org/apache/kafka/clients/producer/KafkaProducer.html

 Thanks,

 Jun

 On Mon, Mar 23, 2015 at 9:23 PM, Grant Henke ghe...@cloudera.com wrote:

  Thanks for validating that. I was thinking of solving it in the same
  fashion. Though I was unsure if there was/would be a use case to have
  multiple CompressionTypes in the same RecordAccumulator since the API was
  originally created this way.
 
  I would be happy to file a jira and can take on making the change too.
  Since
  RecordAccumulator is part of the public api, should the KIP process be
  followed here as well?
 
  On Mon, Mar 23, 2015 at 10:58 PM, Jun Rao j...@confluent.io wrote:
 
   Hi, Grant,
  
   The append api seems indeed a bit weird. The compression type is a
  producer
   level config. Instead of passing it in for each append, we probably
  should
    just pass it in once during the creation of the RecordAccumulator. Could you
  file
   a jira to track this?
  
   Thanks,
  
   Jun
  
   On Mon, Mar 23, 2015 at 7:16 PM, Grant Henke ghe...@cloudera.com
  wrote:
  
I am reading over the new producer code in an effort to understand
 the
implementation more thoroughly and had some questions/feedback.
   
Currently
 org.apache.kafka.clients.producer.internals.RecordAccumulator
append method accepts the compressionType on a per record basis. It
  looks
like the code would only work on a per batch basis because the
CompressionType is only used when creating a new RecordBatch. My
understanding is this should only support setting per batch at most.
 I
   may
have misread this though. Is there a time where setting per record
  would
make sense?
   
public RecordAppendResult append(TopicPartition tp, byte[] key,
   byte[]
value, CompressionType compression, Callback callback) throws
InterruptedException;
   
Why does org.apache.kafka.common.serialization.Serializer Interface
   require
a topic?  Is there a use case where serialization would change based
 on
topic?
   
   public byte[] serialize(String topic, T data);
   
Thank you,
Grant
   
--
Grant Henke
Solutions Consultant | Cloudera
ghe...@cloudera.com | 920-980-8979
twitter.com/ghenke http://twitter.com/gchenke |
linkedin.com/in/granthenke
   
  
 
 
 
  --
  Grant Henke
  Solutions Consultant | Cloudera
  ghe...@cloudera.com | 920-980-8979
  twitter.com/ghenke http://twitter.com/gchenke |
  linkedin.com/in/granthenke
 




-- 
Grant Henke
Solutions Consultant | Cloudera
ghe...@cloudera.com | 920-980-8979
twitter.com/ghenke http://twitter.com/gchenke | linkedin.com/in/granthenke
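
For illustration, a minimal sketch of the shape the change discussed above could take, assuming the compression type is fixed once at construction time: the CompressionType moves from the append() signature into the accumulator's constructor. Only CompressionType and TopicPartition are real client classes here; the class name, method bodies and the simplified append() signature are placeholders, not the actual producer internals.

{code}
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.record.CompressionType;

// Sketch only: compression becomes accumulator-level state instead of a per-append argument.
final class CompressionAwareAccumulator {

    private final CompressionType compressionType; // fixed once, from the producer config

    CompressionAwareAccumulator(CompressionType compressionType) {
        this.compressionType = compressionType;
    }

    // append() no longer takes a CompressionType; any new batch it creates
    // would use the accumulator-level setting.
    void append(TopicPartition tp, byte[] key, byte[] value) {
        // ... find or create the batch for tp, compressing with this.compressionType ...
    }
}
{code}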


[jira] [Commented] (KAFKA-1688) Add authorization interface and naive implementation

2015-03-24 Thread Parth Brahmbhatt (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1688?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14378064#comment-14378064
 ] 

Parth Brahmbhatt commented on KAFKA-1688:
-

[~prasadm] Thanks for taking the time to review. 
* Personally, I don't like the super user concept, primarily because even though it 
provides convenience it also increases the blast radius of the entire system. 
If a user's credentials are compromised, in the current design only the topics and 
actions that user can perform on a cluster are compromised. However, I think it's fair 
to provide this feature so the users can make that choice on their own. I will 
update the KIP to reflect this as part of the proposal and let others vote for 
it.
Users won't have to grant permissions on non-existing topics in the absence of 
the super user concept. However, they will have to supply permissions during 
topic creation, and they will be allowed to alter these ACLs via the alter topic 
command line tool.
* I can add ALL as an item to the Operation enum. For now, like most other 
permissions, it will only be applicable to topics.

 Add authorization interface and naive implementation
 

 Key: KAFKA-1688
 URL: https://issues.apache.org/jira/browse/KAFKA-1688
 Project: Kafka
  Issue Type: Sub-task
  Components: security
Reporter: Jay Kreps
Assignee: Parth Brahmbhatt
 Fix For: 0.8.3


 Add a PermissionManager interface as described here:
 https://cwiki.apache.org/confluence/display/KAFKA/Security
 (possibly there is a better name?)
 Implement calls to the PermissionsManager in KafkaApis for the main requests 
 (FetchRequest, ProduceRequest, etc). We will need to add a new error code and 
 exception to the protocol to indicate permission denied.
 Add a server configuration to give the class you want to instantiate that 
 implements that interface. That class can define its own configuration 
 properties from the main config file.
 Provide a simple implementation of this interface which just takes a user and 
 ip whitelist and permits those in either of the whitelists to do anything, 
 and denies all others.
 Rather than writing an integration test for this class we can probably just 
 use this class for the TLS and SASL authentication testing.
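
 For illustration, a minimal sketch of the naive whitelist idea described above. The PermissionManager name comes from the description; the method signature and everything else are placeholder assumptions, not the actual interface being designed.

{code}
import java.util.Set;

// Hypothetical shape only; the real interface, method names and parameters are still being designed.
interface PermissionManager {
    boolean isAllowed(String user, String clientIp, String resource, String operation);
}

// Naive implementation: anyone on either whitelist may do anything, everyone else is denied.
final class WhitelistPermissionManager implements PermissionManager {
    private final Set<String> userWhitelist;
    private final Set<String> ipWhitelist;

    WhitelistPermissionManager(Set<String> userWhitelist, Set<String> ipWhitelist) {
        this.userWhitelist = userWhitelist;
        this.ipWhitelist = ipWhitelist;
    }

    @Override
    public boolean isAllowed(String user, String clientIp, String resource, String operation) {
        return userWhitelist.contains(user) || ipWhitelist.contains(clientIp);
    }
}
{code}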



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-2048) java.lang.IllegalMonitorStateException thrown in AbstractFetcherThread when handling error returned from simpleConsumer

2015-03-24 Thread TAO XIAO (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2048?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

TAO XIAO updated KAFKA-2048:

Attachment: KAFKA-2048.patch

 java.lang.IllegalMonitorStateException thrown in AbstractFetcherThread when 
 handling error returned from simpleConsumer
 ---

 Key: KAFKA-2048
 URL: https://issues.apache.org/jira/browse/KAFKA-2048
 Project: Kafka
  Issue Type: Bug
  Components: consumer
Affects Versions: 0.8.2.1
Reporter: TAO XIAO
Assignee: Neha Narkhede
 Attachments: KAFKA-2048.patch


 AbstractFetcherThread will throw java.lang.IllegalMonitorStateException in 
 the catch block of processFetchRequest method. This is because 
 partitionMapLock is not acquired before calling partitionMapCond.await()
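
 For reference, the JDK requires the lock owning a Condition to be held when await() is called; otherwise it throws IllegalMonitorStateException. A minimal generic Java illustration of the failing and the correct pattern is below (the actual fix is in Scala, in AbstractFetcherThread, using the inLock() helper):

{code}
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Generic illustration of this bug class: Condition.await() throws
// IllegalMonitorStateException unless the owning lock is held.
public class ConditionAwaitExample {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition cond = lock.newCondition();

    public void backOffWrong() throws InterruptedException {
        // Throws IllegalMonitorStateException: the lock is not held here.
        cond.await(200, TimeUnit.MILLISECONDS);
    }

    public void backOffRight() throws InterruptedException {
        lock.lock();   // the Scala equivalent is inLock(partitionMapLock) { ... }
        try {
            cond.await(200, TimeUnit.MILLISECONDS);
        } finally {
            lock.unlock();
        }
    }
}
{code}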



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-2049) Add thread that detects JVM pauses

2015-03-24 Thread Gwen Shapira (JIRA)
Gwen Shapira created KAFKA-2049:
---

 Summary: Add thread that detects JVM pauses
 Key: KAFKA-2049
 URL: https://issues.apache.org/jira/browse/KAFKA-2049
 Project: Kafka
  Issue Type: Improvement
Reporter: Gwen Shapira
Assignee: Gwen Shapira


Long JVM pauses can cause Kafka malfunctions (especially when interacting with 
ZK) that can be challenging to debug.

I propose implementing HADOOP-9618 in Kafka:
Add a simple thread which loops on 1-second sleeps, and if the sleep ever takes 
significantly longer than 1 second, log a WARN. This will make GC pauses (and 
other pauses) obvious in logs.
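
A minimal sketch of the kind of detector loop described above; the threshold, logging and thread wiring are placeholders, not the eventual Kafka implementation:

{code}
// Minimal pause-detector sketch: sleep 1s in a loop and warn when the
// observed gap is much larger than requested (a likely GC or OS pause).
public class JvmPauseMonitor implements Runnable {
    private static final long SLEEP_MS = 1000L;
    private static final long WARN_THRESHOLD_MS = 1000L; // placeholder threshold

    private volatile boolean running = true;

    @Override
    public void run() {
        while (running) {
            long start = System.nanoTime();
            try {
                Thread.sleep(SLEEP_MS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            long elapsedMs = (System.nanoTime() - start) / 1_000_000L;
            long pauseMs = elapsedMs - SLEEP_MS;
            if (pauseMs > WARN_THRESHOLD_MS) {
                System.err.println("Detected pause of approximately " + pauseMs + " ms (possible GC pause)");
            }
        }
    }

    public void shutdown() {
        running = false;
    }
}
{code}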



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-2048) java.lang.IllegalMonitorStateException thrown in AbstractFetcherThread when handling error returned from simpleConsumer

2015-03-24 Thread TAO XIAO (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2048?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

TAO XIAO updated KAFKA-2048:

Status: Patch Available  (was: Open)

 java.lang.IllegalMonitorStateException thrown in AbstractFetcherThread when 
 handling error returned from simpleConsumer
 ---

 Key: KAFKA-2048
 URL: https://issues.apache.org/jira/browse/KAFKA-2048
 Project: Kafka
  Issue Type: Bug
  Components: consumer
Affects Versions: 0.8.2.1
Reporter: TAO XIAO
Assignee: Neha Narkhede
 Attachments: KAFKA-2048.patch


 AbstractFetcherThread will throw java.lang.IllegalMonitorStateException in 
 the catch block of processFetchRequest method. This is because 
 partitionMapLock is not acquired before calling partitionMapCond.await()



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-2048) java.lang.IllegalMonitorStateException thrown in AbstractFetcherThread when handling error returned from simpleConsumer

2015-03-24 Thread TAO XIAO (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2048?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

TAO XIAO updated KAFKA-2048:

Attachment: KAFKA-2048.patch

 java.lang.IllegalMonitorStateException thrown in AbstractFetcherThread when 
 handling error returned from simpleConsumer
 ---

 Key: KAFKA-2048
 URL: https://issues.apache.org/jira/browse/KAFKA-2048
 Project: Kafka
  Issue Type: Bug
  Components: consumer
Affects Versions: 0.8.2.1
Reporter: TAO XIAO
Assignee: Neha Narkhede
 Attachments: KAFKA-2048.patch


 AbstractFetcherThread will throw java.lang.IllegalMonitorStateException in 
 the catch block of processFetchRequest method. This is because 
 partitionMapLock is not acquired before calling partitionMapCond.await()



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1293) Mirror maker housecleaning

2015-03-24 Thread Jiangjie Qin (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1293?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14379033#comment-14379033
 ] 

Jiangjie Qin commented on KAFKA-1293:
-

[~mwarhaftig] It looks very closely related to the work in KIP-14. Please feel 
free to own that KIP if you want to.
https://cwiki.apache.org/confluence/display/KAFKA/KIP-14+-+Tools+Standardization
Since this is a public interface change, we need to go through the KIP process. 
You can find the KIP process here:
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals

 Mirror maker housecleaning
 --

 Key: KAFKA-1293
 URL: https://issues.apache.org/jira/browse/KAFKA-1293
 Project: Kafka
  Issue Type: Bug
  Components: tools
Affects Versions: 0.8.1
Reporter: Jay Kreps
Priority: Minor
  Labels: usability
 Attachments: KAFKA-1293.patch


 Mirror maker uses its own convention for command-line arguments, e.g. 
 --num.producers, whereas everywhere else follows the unix convention like 
 --num-producers. This is annoying because when running different tools you 
 have to constantly remember the quirks of whoever wrote that tool.
 Mirror maker should also have a top-level wrapper script in bin/ to make tab 
 completion work and so you don't have to remember the fully qualified class 
 name.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-2048) java.lang.IllegalMonitorStateException thrown in AbstractFetcherThread when handling error returned from simpleConsumer

2015-03-24 Thread TAO XIAO (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2048?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

TAO XIAO updated KAFKA-2048:

Status: Patch Available  (was: Open)

 java.lang.IllegalMonitorStateException thrown in AbstractFetcherThread when 
 handling error returned from simpleConsumer
 ---

 Key: KAFKA-2048
 URL: https://issues.apache.org/jira/browse/KAFKA-2048
 Project: Kafka
  Issue Type: Bug
  Components: consumer
Affects Versions: 0.8.2.1
Reporter: TAO XIAO
Assignee: Neha Narkhede
 Attachments: KAFKA-2048.patch


 AbstractFetcherThread will throw java.lang.IllegalMonitorStateException in 
 the catch block of processFetchRequest method. This is because 
 partitionMapLock is not acquired before calling partitionMapCond.await()



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-2029) Improving controlled shutdown for rolling updates

2015-03-24 Thread Jiangjie Qin (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2029?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14379175#comment-14379175
 ] 

Jiangjie Qin commented on KAFKA-2029:
-

I see. Yes, there is value in putting per-event barriers on the controller to 
achieve better synchronization among brokers. It looks orthogonal to the 
efforts on the broker side. Can you maybe upload the patch using 
kafka-patch-review.py so we have a better view of the changes? Thanks.

 Improving controlled shutdown for rolling updates
 -

 Key: KAFKA-2029
 URL: https://issues.apache.org/jira/browse/KAFKA-2029
 Project: Kafka
  Issue Type: Improvement
  Components: controller
Affects Versions: 0.8.1.1
Reporter: Dmitry Bugaychenko
Assignee: Neha Narkhede
Priority: Critical

 Controlled shutdown as currently implemented can cause numerous problems: 
 deadlocks, local and global data loss, partitions without a leader, etc. In 
 some cases the only way to restore the cluster is to stop it completely using 
 kill -9 and start again.
 Note 1: I'm aware of KAFKA-1305, but the proposed workaround to increase 
 queue size makes things much worse (see discussion there).
 Note 2: The problems described here can occur in any setup, but they are 
 extremely painful in setups with large brokers (36 disks, 1000+ assigned 
 partitions per broker in our case).
 Note 3: These improvements are actually workarounds and it is worth 
 considering a global refactoring of the controller (make it single-threaded, 
 or even get rid of it in favour of ZK leader elections for partitions).
 The problems and improvements are:
 # Controlled shutdown takes a long time (10+ minutes); the broker sends multiple 
 shutdown requests, finally considers the shutdown as failed and proceeds to an 
 unclean shutdown, while the controller gets stuck for a while (holding a lock 
 waiting for free space in the controller-to-broker queue). After the broker 
 starts back up it receives follower requests and erases highwatermarks (with a 
 message that the replica does not exist - the controller hadn't yet sent a 
 request with the replica assignment); then, when the controller starts replicas 
 on the broker, it deletes all local data (due to the missing highwatermarks). 
 Furthermore, the controller starts processing the pending shutdown request and 
 stops replicas on the broker, leaving it in a non-functional state. A solution 
 to the problem might be to increase the time the broker waits for the 
 controller's response to the shutdown request, but this timeout is taken from 
 controller.socket.timeout.ms which is global for all broker-controller 
 communication, and setting it to 30 minutes is dangerous. *Proposed solution: 
 introduce a dedicated config parameter for this timeout with a high default*.
 # If a broker goes down during controlled shutdown and does not come back, the 
 controller gets stuck in a deadlock (one thread owns the lock and tries to add 
 a message to the dead broker's queue, the send thread is in an infinite loop 
 trying to retry the message to the dead broker, and the broker failure handler 
 is waiting for the lock). There are numerous partitions without a leader and 
 the only way out is to kill -9 the controller. *Proposed solution: add a 
 timeout for adding messages to the broker's queue*. 
 ControllerChannelManager.sendRequest:
 {code}
   def sendRequest(brokerId: Int, request: RequestOrResponse, callback: (RequestOrResponse) => Unit = null) {
     brokerLock synchronized {
       val stateInfoOpt = brokerStateInfo.get(brokerId)
       stateInfoOpt match {
         case Some(stateInfo) =>
           // ODKL Patch: prevent infinite hang on trying to send message to a dead broker.
           // TODO: Move timeout to config
           if (!stateInfo.messageQueue.offer((request, callback), 10, TimeUnit.SECONDS)) {
             error("Timed out trying to send message to broker " + brokerId.toString)
             // Do not throw, as it brings the controller into a completely non-functional state
             // ("Controller to broker state change requests batch is not empty while creating a new one")
             //throw new IllegalStateException("Timed out trying to send message to broker " + brokerId.toString)
           }
         case None =>
           warn("Not sending request %s to broker %d, since it is offline.".format(request, brokerId))
       }
     }
   }
 {code}
 # When the broker which is the controller starts to shut down while auto leader 
 rebalance is running, it deadlocks in the end (the shutdown thread owns the lock 
 and waits for the rebalance thread to exit, and the rebalance thread waits for 
 the lock). *Proposed solution: use a bounded wait in the rebalance thread*. 
 KafkaController.scala:
 {code}
   // ODKL Patch to prevent deadlocks in shutdown.
   /**
    * Execute the given function inside the lock
    */
   def inLockIfRunning[T](lock: ReentrantLock)(fun: => T): T = 

[jira] [Updated] (KAFKA-2048) java.lang.IllegalMonitorStateException thrown in AbstractFetcherThread when handling error returned from simpleConsumer

2015-03-24 Thread TAO XIAO (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2048?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

TAO XIAO updated KAFKA-2048:

Attachment: (was: KAFKA-2048.patch)

 java.lang.IllegalMonitorStateException thrown in AbstractFetcherThread when 
 handling error returned from simpleConsumer
 ---

 Key: KAFKA-2048
 URL: https://issues.apache.org/jira/browse/KAFKA-2048
 Project: Kafka
  Issue Type: Bug
  Components: consumer
Affects Versions: 0.8.2.1
Reporter: TAO XIAO
Assignee: Neha Narkhede
 Attachments: KAFKA-2048.patch


 AbstractFetcherThread will throw java.lang.IllegalMonitorStateException in 
 the catch block of processFetchRequest method. This is because 
 partitionMapLock is not acquired before calling partitionMapCond.await()



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-2048) java.lang.IllegalMonitorStateException thrown in AbstractFetcherThread when handling error returned from simpleConsumer

2015-03-24 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2048?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-2048:
-
Resolution: Fixed
Status: Resolved  (was: Patch Available)

 java.lang.IllegalMonitorStateException thrown in AbstractFetcherThread when 
 handling error returned from simpleConsumer
 ---

 Key: KAFKA-2048
 URL: https://issues.apache.org/jira/browse/KAFKA-2048
 Project: Kafka
  Issue Type: Bug
  Components: consumer
Affects Versions: 0.8.2.1
Reporter: TAO XIAO
Assignee: Neha Narkhede
 Attachments: KAFKA-2048.patch


 AbstractFetcherThread will throw java.lang.IllegalMonitorStateException in 
 the catch block of processFetchRequest method. This is because 
 partitionMapLock is not acquired before calling partitionMapCond.await()



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-2048) java.lang.IllegalMonitorStateException thrown in AbstractFetcherThread when handling error returned from simpleConsumer

2015-03-24 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2048?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14379250#comment-14379250
 ] 

Guozhang Wang commented on KAFKA-2048:
--

Thanks for the patch [~xiaotao183], +1 and committed to trunk.

 java.lang.IllegalMonitorStateException thrown in AbstractFetcherThread when 
 handling error returned from simpleConsumer
 ---

 Key: KAFKA-2048
 URL: https://issues.apache.org/jira/browse/KAFKA-2048
 Project: Kafka
  Issue Type: Bug
  Components: consumer
Affects Versions: 0.8.2.1
Reporter: TAO XIAO
Assignee: Neha Narkhede
 Attachments: KAFKA-2048.patch


 AbstractFetcherThread will throw java.lang.IllegalMonitorStateException in 
 the catch block of processFetchRequest method. This is because 
 partitionMapLock is not acquired before calling partitionMapCond.await()



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-2042) New producer metadata update always get all topics.

2015-03-24 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2042?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-2042:
-
Resolution: Fixed
Status: Resolved  (was: Patch Available)

 New producer metadata update always get all topics.
 ---

 Key: KAFKA-2042
 URL: https://issues.apache.org/jira/browse/KAFKA-2042
 Project: Kafka
  Issue Type: Bug
Reporter: Jiangjie Qin
Assignee: Jiangjie Qin
Priority: Blocker
 Fix For: 0.8.3

 Attachments: KAFKA-2042.patch, KAFKA-2042_2015-03-24_13:37:49.patch, 
 KAFKA-2042_2015-03-24_13:57:23.patch


 The new java producer metadata.topics is initially empty, so the producer 
 sends a TMR with an empty topic set. The broker takes the empty requested topic 
 set as all topics, so metadata.cluster contains all topic metadata. Later on, 
 when a new topic is produced, it gets added into metadata.topics. The 
 next metadata update will only contain the metadata for this new topic, so 
 metadata.cluster will only have this topic. Since there can be a lot of 
 messages still sitting in the accumulator that have no metadata in 
 metadata.cluster, if a caller thread does a flush(), the caller thread will 
 block forever because the messages sitting in the accumulator without metadata 
 will never be ready to send.
 We should add a check for metadata.topics: if it is empty, no TMR should be 
 sent.
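
 For illustration, the proposed guard amounts to never sending a topic metadata request with an empty topic set (which the broker interprets as "all topics"). A minimal sketch only; the class and method names are placeholders, not the actual producer internals.

{code}
import java.util.Set;

// Illustrative guard only: the real check lives inside the producer's metadata update path.
final class MetadataUpdateGuard {

    // Returns true only when there is at least one topic to ask about, so an
    // empty topic set is never sent to the broker.
    static boolean shouldSendTopicMetadataRequest(Set<String> topics) {
        return !topics.isEmpty();
    }
}
{code}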



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-2042) New producer metadata update always get all topics.

2015-03-24 Thread Jiangjie Qin (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2042?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14378986#comment-14378986
 ] 

Jiangjie Qin commented on KAFKA-2042:
-

Yes, it depends on whether the topic list is empty or not when we send the 
first TMR.
I might be missing something, but I think the TMR will be sent very soon after 
the producer is instantiated.
In the first NetworkClient.poll(), it checks whether metadata needs an update by 
taking the max of:
timeToNextMetadataUpdate
timeToNextReconnectAttempt
waitForMetadataFetch
All of them will be 0 at startup. That means the TMR will be sent at the 
first poll().


 New producer metadata update always get all topics.
 ---

 Key: KAFKA-2042
 URL: https://issues.apache.org/jira/browse/KAFKA-2042
 Project: Kafka
  Issue Type: Bug
Reporter: Jiangjie Qin
Assignee: Jiangjie Qin
Priority: Blocker
 Fix For: 0.8.3

 Attachments: KAFKA-2042.patch, KAFKA-2042_2015-03-24_13:37:49.patch, 
 KAFKA-2042_2015-03-24_13:57:23.patch


 The new java producer metadata.topics is initially empty, so the producer 
 sends a TMR with an empty topic set. The broker takes the empty requested topic 
 set as all topics, so metadata.cluster contains all topic metadata. Later on, 
 when a new topic is produced, it gets added into metadata.topics. The 
 next metadata update will only contain the metadata for this new topic, so 
 metadata.cluster will only have this topic. Since there can be a lot of 
 messages still sitting in the accumulator that have no metadata in 
 metadata.cluster, if a caller thread does a flush(), the caller thread will 
 block forever because the messages sitting in the accumulator without metadata 
 will never be ready to send.
 We should add a check for metadata.topics: if it is empty, no TMR should be 
 sent.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1461) Replica fetcher thread does not implement any back-off behavior

2015-03-24 Thread Sriharsha Chintalapani (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14379162#comment-14379162
 ] 

Sriharsha Chintalapani commented on KAFKA-1461:
---

[~guozhang] Thanks for the review. Can you please take a look at my reply to 
your comment.

 Replica fetcher thread does not implement any back-off behavior
 ---

 Key: KAFKA-1461
 URL: https://issues.apache.org/jira/browse/KAFKA-1461
 Project: Kafka
  Issue Type: Improvement
  Components: replication
Affects Versions: 0.8.1.1
Reporter: Sam Meder
Assignee: Sriharsha Chintalapani
  Labels: newbie++
 Fix For: 0.8.3

 Attachments: KAFKA-1461.patch, KAFKA-1461.patch, 
 KAFKA-1461_2015-03-11_10:41:26.patch, KAFKA-1461_2015-03-11_18:17:51.patch, 
 KAFKA-1461_2015-03-12_13:54:51.patch, KAFKA-1461_2015-03-17_16:03:33.patch


 The current replica fetcher thread will retry in a tight loop if any error 
 occurs during the fetch call. For example, we've seen cases where the fetch 
 continuously throws a connection refused exception leading to several replica 
 fetcher threads that spin in a pretty tight loop.
 To a much lesser degree this is also an issue in the consumer fetcher thread, 
 although the fact that erroring partitions are removed so a leader can be 
 re-discovered helps some.
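
 For illustration, a generic sketch of the back-off idea: wait a bounded interval after a failed fetch instead of retrying immediately. The fetchOnce() hook and the back-off value are placeholders, not Kafka APIs; the real change is in the Scala fetcher thread and would be driven by a config setting.

{code}
// Generic fetch-loop sketch with a fixed back-off after errors, instead of
// retrying in a tight loop.
public abstract class BackoffFetchLoop {
    private static final long FETCH_BACKOFF_MS = 1000L; // placeholder, would come from config

    protected abstract void fetchOnce() throws Exception;

    public void run() {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                fetchOnce();
            } catch (Exception e) {
                try {
                    Thread.sleep(FETCH_BACKOFF_MS); // back off instead of spinning on the error
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }
}
{code}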



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[DISCUSS] KIP-17 - Add HighwaterMarkOffset to OffsetFetchResponse

2015-03-24 Thread Grant Henke
Here is an initial proposal to add HighwaterMarkOffset to the
OffsetFetchResponse:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-17+-+Add+HighwaterMarkOffset+to+OffsetFetchResponse

I can add a jira and more implementation details if there is 
interest in the initial proposal.

Thanks,
Grant
-- 
Grant Henke
Solutions Consultant | Cloudera
ghe...@cloudera.com | 920-980-8979
twitter.com/ghenke http://twitter.com/gchenke | linkedin.com/in/granthenke


[jira] [Commented] (KAFKA-2042) New producer metadata update always get all topics.

2015-03-24 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2042?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14379015#comment-14379015
 ] 

Jun Rao commented on KAFKA-2042:


If I start a console-producer w/o typing in any message, the producer actually 
doesn't send any metadata request immediately. On initializing the producer, we 
update metadata with the bootstrap broker. This sets lastRefreshMs to the 
current time. So, in NetworkClient.poll(), timeToNextMetadataUpdate will 
actually be the metadata age, which defaults to 300 secs.

In what situation did you discover this problem?

 New producer metadata update always get all topics.
 ---

 Key: KAFKA-2042
 URL: https://issues.apache.org/jira/browse/KAFKA-2042
 Project: Kafka
  Issue Type: Bug
Reporter: Jiangjie Qin
Assignee: Jiangjie Qin
Priority: Blocker
 Fix For: 0.8.3

 Attachments: KAFKA-2042.patch, KAFKA-2042_2015-03-24_13:37:49.patch, 
 KAFKA-2042_2015-03-24_13:57:23.patch


 The new java producer metadata.topics is initially empty, so the producer 
 sends a TMR with an empty topic set. The broker takes the empty requested topic 
 set as all topics, so metadata.cluster contains all topic metadata. Later on, 
 when a new topic is produced, it gets added into metadata.topics. The 
 next metadata update will only contain the metadata for this new topic, so 
 metadata.cluster will only have this topic. Since there can be a lot of 
 messages still sitting in the accumulator that have no metadata in 
 metadata.cluster, if a caller thread does a flush(), the caller thread will 
 block forever because the messages sitting in the accumulator without metadata 
 will never be ready to send.
 We should add a check for metadata.topics: if it is empty, no TMR should be 
 sent.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-2042) New producer metadata update always get all topics.

2015-03-24 Thread Jiangjie Qin (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2042?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14379120#comment-14379120
 ] 

Jiangjie Qin commented on KAFKA-2042:
-

Ah, yes, that's right. I found this issue when starting mirror maker: because a 
large mirror maker cluster might take some time to finish the consumer rebalance, 
no producer.send() was called before the first TMR was sent. So this issue 
probably will not cause problems in normal cases under default settings.

 New producer metadata update always get all topics.
 ---

 Key: KAFKA-2042
 URL: https://issues.apache.org/jira/browse/KAFKA-2042
 Project: Kafka
  Issue Type: Bug
Reporter: Jiangjie Qin
Assignee: Jiangjie Qin
Priority: Blocker
 Fix For: 0.8.3

 Attachments: KAFKA-2042.patch, KAFKA-2042_2015-03-24_13:37:49.patch, 
 KAFKA-2042_2015-03-24_13:57:23.patch


 The new java producer metadata.topics is initially empty, so the producer 
 sends a TMR with an empty topic set. The broker takes the empty requested topic 
 set as all topics, so metadata.cluster contains all topic metadata. Later on, 
 when a new topic is produced, it gets added into metadata.topics. The 
 next metadata update will only contain the metadata for this new topic, so 
 metadata.cluster will only have this topic. Since there can be a lot of 
 messages still sitting in the accumulator that have no metadata in 
 metadata.cluster, if a caller thread does a flush(), the caller thread will 
 block forever because the messages sitting in the accumulator without metadata 
 will never be ready to send.
 We should add a check for metadata.topics: if it is empty, no TMR should be 
 sent.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-2048) java.lang.IllegalMonitorStateException thrown in AbstractFetcherThread when handling error returned from simpleConsumer

2015-03-24 Thread TAO XIAO (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2048?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

TAO XIAO updated KAFKA-2048:

Status: Open  (was: Patch Available)

 java.lang.IllegalMonitorStateException thrown in AbstractFetcherThread when 
 handling error returned from simpleConsumer
 ---

 Key: KAFKA-2048
 URL: https://issues.apache.org/jira/browse/KAFKA-2048
 Project: Kafka
  Issue Type: Bug
  Components: consumer
Affects Versions: 0.8.2.1
Reporter: TAO XIAO
Assignee: Neha Narkhede

 AbstractFetcherThread will throw java.lang.IllegalMonitorStateException in 
 the catch block of processFetchRequest method. This is because 
 partitionMapLock is not acquired before calling partitionMapCond.await()



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-2048) java.lang.IllegalMonitorStateException thrown in AbstractFetcherThread when handling error returned from simpleConsumer

2015-03-24 Thread TAO XIAO (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2048?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

TAO XIAO updated KAFKA-2048:

Attachment: (was: KAFKA-2048.patch)

 java.lang.IllegalMonitorStateException thrown in AbstractFetcherThread when 
 handling error returned from simpleConsumer
 ---

 Key: KAFKA-2048
 URL: https://issues.apache.org/jira/browse/KAFKA-2048
 Project: Kafka
  Issue Type: Bug
  Components: consumer
Affects Versions: 0.8.2.1
Reporter: TAO XIAO
Assignee: Neha Narkhede

 AbstractFetcherThread will throw java.lang.IllegalMonitorStateException in 
 the catch block of processFetchRequest method. This is because 
 partitionMapLock is not acquired before calling partitionMapCond.await()



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Build failed in Jenkins: KafkaPreCommit #43

2015-03-24 Thread Apache Jenkins Server
See https://builds.apache.org/job/KafkaPreCommit/43/changes

Changes:

[wangguoz] KAFKA-2048; Change lock synchronized to inLock() for 
partitionMapCond; reviewed by Guozhang Wang

--
[...truncated 1194 lines...]
kafka.admin.DeleteTopicTest  testDeleteNonExistingTopic PASSED

kafka.admin.DeleteTopicTest  testDeleteTopicWithCleaner PASSED

kafka.api.ProducerSendTest  testSendOffset PASSED

kafka.api.ProducerSendTest  testSerializer PASSED

kafka.api.ProducerSendTest  testClose PASSED

kafka.api.ProducerSendTest  testSendToPartition PASSED

kafka.api.ProducerSendTest  testAutoCreateTopic PASSED

kafka.api.ProducerSendTest  testFlush PASSED

kafka.api.ProducerFailureHandlingTest  testNotEnoughReplicas PASSED

kafka.api.ProducerFailureHandlingTest  testInvalidPartition PASSED

kafka.api.ProducerFailureHandlingTest  testTooLargeRecordWithAckZero PASSED

kafka.api.ProducerFailureHandlingTest  testTooLargeRecordWithAckOne PASSED

kafka.api.ProducerFailureHandlingTest  testNonExistentTopic PASSED

kafka.api.ProducerFailureHandlingTest  testWrongBrokerList PASSED

kafka.api.ProducerFailureHandlingTest  testNoResponse PASSED

kafka.api.ProducerFailureHandlingTest  testSendAfterClosed PASSED

kafka.api.ProducerFailureHandlingTest  testBrokerFailure PASSED

kafka.api.ProducerFailureHandlingTest  testCannotSendToInternalTopic PASSED

kafka.api.ProducerFailureHandlingTest  
testNotEnoughReplicasAfterBrokerShutdown PASSED

kafka.api.ConsumerTest  testSimpleConsumption PASSED

kafka.api.ConsumerTest  testAutoOffsetReset PASSED

kafka.api.ConsumerTest  testSeek PASSED

kafka.api.ConsumerTest  testGroupConsumption PASSED

kafka.api.ConsumerTest  testPositionAndCommit PASSED

kafka.api.ConsumerTest  testPartitionsFor PASSED

kafka.api.ConsumerTest  testConsumptionWithBrokerFailures PASSED

kafka.api.ConsumerTest  testSeekAndCommitWithBrokerFailures FAILED
java.lang.AssertionError: expected:1000 but was:492
at org.junit.Assert.fail(Assert.java:92)
at org.junit.Assert.failNotEquals(Assert.java:689)
at org.junit.Assert.assertEquals(Assert.java:127)
at org.junit.Assert.assertEquals(Assert.java:514)
at org.junit.Assert.assertEquals(Assert.java:498)
at 
kafka.api.ConsumerTest.seekAndCommitWithBrokerFailures(ConsumerTest.scala:201)
at 
kafka.api.ConsumerTest.testSeekAndCommitWithBrokerFailures(ConsumerTest.scala:182)

kafka.api.ConsumerTest  testPartitionReassignmentCallback PASSED

kafka.api.RequestResponseSerializationTest  
testSerializationAndDeserialization PASSED

kafka.api.ApiUtilsTest  testShortStringNonASCII PASSED

kafka.api.ApiUtilsTest  testShortStringASCII PASSED

kafka.api.test.ProducerCompressionTest  testCompression[0] PASSED

kafka.api.test.ProducerCompressionTest  testCompression[1] PASSED

kafka.api.test.ProducerCompressionTest  testCompression[2] PASSED

kafka.api.test.ProducerCompressionTest  testCompression[3] PASSED

kafka.javaapi.consumer.ZookeeperConsumerConnectorTest  testBasic PASSED

kafka.javaapi.message.ByteBufferMessageSetTest  testWrittenEqualsRead PASSED

kafka.javaapi.message.ByteBufferMessageSetTest  testIteratorIsConsistent PASSED

kafka.javaapi.message.ByteBufferMessageSetTest  testSizeInBytes PASSED

kafka.javaapi.message.ByteBufferMessageSetTest  
testIteratorIsConsistentWithCompression PASSED

kafka.javaapi.message.ByteBufferMessageSetTest  testSizeInBytesWithCompression 
PASSED

kafka.javaapi.message.ByteBufferMessageSetTest  testEquals PASSED

kafka.javaapi.message.ByteBufferMessageSetTest  testEqualsWithCompression 
PASSED

kafka.integration.UncleanLeaderElectionTest  testUncleanLeaderElectionEnabled 
PASSED

kafka.integration.UncleanLeaderElectionTest  testUncleanLeaderElectionDisabled 
PASSED

kafka.integration.UncleanLeaderElectionTest  
testUncleanLeaderElectionEnabledByTopicOverride PASSED

kafka.integration.UncleanLeaderElectionTest  
testCleanLeaderElectionDisabledByTopicOverride PASSED

kafka.integration.UncleanLeaderElectionTest  
testUncleanLeaderElectionInvalidTopicOverride PASSED

kafka.integration.FetcherTest  testFetcher PASSED

kafka.integration.RollingBounceTest  testRollingBounce PASSED

kafka.integration.PrimitiveApiTest  testFetchRequestCanProperlySerialize PASSED

kafka.integration.PrimitiveApiTest  testEmptyFetchRequest PASSED

kafka.integration.PrimitiveApiTest  testDefaultEncoderProducerAndFetch PASSED

kafka.integration.PrimitiveApiTest  
testDefaultEncoderProducerAndFetchWithCompression PASSED

kafka.integration.PrimitiveApiTest  testProduceAndMultiFetch PASSED

kafka.integration.PrimitiveApiTest  testMultiProduce PASSED

kafka.integration.PrimitiveApiTest  testConsumerEmptyTopic PASSED

kafka.integration.PrimitiveApiTest  testPipelinedProduceRequests PASSED

kafka.integration.AutoOffsetResetTest  testResetToEarliestWhenOffsetTooHigh 
PASSED

kafka.integration.AutoOffsetResetTest  testResetToEarliestWhenOffsetTooLow 
PASSED

kafka.integration.AutoOffsetResetTest  

Re: MirrorMaker improvements

2015-03-24 Thread Guozhang Wang
Thanks for sharing this Vlad, this is a great read!

I am particularly interested in the last point about one-to-one mapping in
MM, since you also mentioned that you use Kafka MM as the async replication
layer for your geo-replicated k-v store. One approach that we are pursuing
here to support active-active is to use an aggregate cluster that mirrors
from multiple local clusters in different data centers. But this approach
breaks the one-to-one mapping since it requires multiple sources to pipe to
a single destination. How did you tackle this problem at Turn if you are
also using active-active?

Guozhang

On Tue, Mar 24, 2015 at 12:18 PM, vlad...@gmail.com vlad...@gmail.com
wrote:

 Dear all,

 I had a short discussion with Jay yesterday at the ACM meetup and he
 suggested writing an email regarding a few possible MirrorMaker
 improvements.

 At Turn, we have been using MirrorMaker for a few months now to
 asynchronously replicate our key/value store data between our datacenters.
 In a way, our system is similar to Linkedin's Databus, but it uses Kafka
 clusters and MirrorMaker as its building blocks. Our overall message rate
 peaks at about 650K/sec and, when pushing data over high bandwidth-delay
 product links, we have found some minor bottlenecks.

 The MirrorMaker process uses a standard consumer to pull data from a remote
 datacenter. This implies that it opens a single TCP connection to each of
 the remote brokers and muxes requests for different topics and partitions
 over this connection. While this is a good thing in terms of keeping the
 congestion window open, over long-RTT links with a rather high loss rate
 the congestion window will cap out, in our case at just a few Mbps. Since the
 overall line bandwidth is much higher, this means that we have to start
 multiple MirrorMaker processes (somewhere in the hundreds) in order to
 fully use the line capacity. Being able to pool multiple TCP
 connections from a single consumer to a broker would solve this
 complication.

 The standard consumer also uses the remote ZooKeeper in order to manage the
 consumer group. While consumer group management is moving closer to the
 brokers, it might make sense to move the group management to the local
 datacenter, since that would avoid using the long-distance connection for
 this purpose.

 Another possible improvement assumes a further constraint, namely that the
 number of partitions for a topic in both datacenters is the same. In my
 opinion, this is a sane constraint, since it preserves the Kafka ordering
 guarantees (per partition), instead of a simple guarantee per key. This
 kind of guarantee can, for example, be useful in a system that compares
 partition contents to reach eventual consistency using Merkle trees. If the
 number of partitions is equal, then offsets have the same meaning for the
 same partition in both clusters, since the data for both partitions is
 identical before the offset. This allows a simple consumer to just inquire
 the local broker and the remote broker for their current offsets and, in
 case the remote broker is ahead, copy the extra data to the local cluster.
 Since the consumer offsets are no longer bound to the specific partitioning
 of a single remote cluster, the consumer could pull from one of any number
 of remote clusters, BitTorrent-style, if their offsets are ahead of the
 local offset. The group management problem would reduce to assigning local
 partitions to different MirrorMaker processes, so the group management
 could be done locally also in this situation.

 Regards,
 Vlad

 PS: Sorry if this is a double posting! The original posting did not appear
 in the archives for a while.




-- 
-- Guozhang
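
For illustration, a sketch of the partition-for-partition catch-up idea from the thread above, under the equal-partition-count assumption: because partitioning is identical on both sides, offsets mean the same thing, so only the remote tail needs to be copied. fetchLogEndOffset() and copyRange() are hypothetical helpers, not Kafka client calls.

{code}
// Sketch of the "offsets mean the same thing" catch-up idea, assuming source and
// destination have identical partition counts for the topic.
public abstract class PartitionCatchUp {

    // Hypothetical helpers; a real implementation would wrap actual consumer/producer calls.
    protected abstract long fetchLogEndOffset(String cluster, String topic, int partition);

    protected abstract void copyRange(String topic, int partition, long fromOffset, long toOffset);

    public void catchUp(String topic, int partitionCount, String localCluster, String remoteCluster) {
        for (int p = 0; p < partitionCount; p++) {
            long localEnd = fetchLogEndOffset(localCluster, topic, p);
            long remoteEnd = fetchLogEndOffset(remoteCluster, topic, p);
            if (remoteEnd > localEnd) {
                // Everything before localEnd is already present locally;
                // only the tail [localEnd, remoteEnd) needs to be copied.
                copyRange(topic, p, localEnd, remoteEnd);
            }
        }
    }
}
{code}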


[jira] [Commented] (KAFKA-1856) Add PreCommit Patch Testing

2015-03-24 Thread Joe Stein (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1856?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14379331#comment-14379331
 ] 

Joe Stein commented on KAFKA-1856:
--

Testing file 
[KAFKA-1856_2015-02-04_15%3A44%3A47.patch|https://issues.apache.org/jira/secure/attachment/12696611/KAFKA-1856_2015-02-04_15%3A44%3A47.patch]
 against branch trunk took 0:25:20.725178.

{color:red}Overall:{color} -1 due to 2 errors

{color:red}ERROR:{color} Some unit tests failed (report)
{color:red}ERROR:{color} Failed unit test: 
{{unit.kafka.consumer.PartitionAssignorTest  testRangePartitionAssignor FAILED
}}
{color:green}SUCCESS:{color} Gradle bootstrap was successful
{color:green}SUCCESS:{color} Clean was successful
{color:green}SUCCESS:{color} Patch applied correctly
{color:green}SUCCESS:{color} Patch add/modify test case
{color:green}SUCCESS:{color} Gradle bootstrap was successful
{color:green}SUCCESS:{color} Patch compiled
{color:green}SUCCESS:{color} Checked style for Main
{color:green}SUCCESS:{color} Checked style for Test

This message is automatically generated.

 Add PreCommit Patch Testing
 ---

 Key: KAFKA-1856
 URL: https://issues.apache.org/jira/browse/KAFKA-1856
 Project: Kafka
  Issue Type: Task
Reporter: Ashish K Singh
Assignee: Ashish K Singh
 Attachments: KAFKA-1845.result.txt, KAFKA-1856.patch, 
 KAFKA-1856_2015-01-18_21:43:56.patch, KAFKA-1856_2015-02-04_14:57:05.patch, 
 KAFKA-1856_2015-02-04_15:44:47.patch


 h1. Kafka PreCommit Patch Testing - *Don't wait for it to break*
 h2. Motivation
 *With great power comes great responsibility* - Uncle Ben. As the Kafka user 
 list is growing, a mechanism to ensure the quality of the product is required. 
 Quality becomes hard to measure and maintain in an open source project because 
 of a wide community of contributors. Luckily, Kafka is not the first open source 
 project and can benefit from the learnings of prior projects.
 PreCommit tests are the tests that are run for each patch that gets attached 
 to an open JIRA. Based on the test results, the test execution framework (test 
 bot) gives the patch a +1 or -1. Having PreCommit tests takes the load off 
 committers to look at or test each patch.
 h2. Tests in Kafka
 h3. Unit and Integration Tests
 [Unit and Integration 
 tests|https://cwiki.apache.org/confluence/display/KAFKA/Kafka+0.9+Unit+and+Integration+Tests]
  are cardinal to help contributors avoid breaking existing functionality 
 while adding new functionality or fixing older issues. These tests, at least 
 the ones relevant to the changes, must be run by contributors before 
 attaching a patch to a JIRA.
 h3. System Tests
 [System 
 tests|https://cwiki.apache.org/confluence/display/KAFKA/Kafka+System+Tests] 
 are much wider tests that, unlike unit tests, focus on end-to-end scenarios 
 and not some specific method or class.
 h2. Apache PreCommit tests
 Apache provides a mechanism to automatically build a project and run a series 
 of tests whenever a patch is uploaded to a JIRA. Based on test execution, the 
 test framework will comment with a +1 or -1 on the JIRA.
 You can read more about the framework here:
 http://wiki.apache.org/general/PreCommitBuilds
 h2. Plan
 # Create a test-patch.py script (similar to the one used in Flume, Sqoop and 
 other projects) that will take a jira as a parameter, apply the patch on the 
 appropriate branch, build the project, run tests and report results. This 
 script should be committed into the Kafka code-base. To begin with, this will 
 only run unit tests. We can add code sanity checks, system_tests, etc. in the 
 future.
 # Create a jenkins job for running the test (as described in 
 http://wiki.apache.org/general/PreCommitBuilds) and validate that it works 
 manually. This must be done by a committer with Jenkins access.
 # Ask someone with access to https://builds.apache.org/job/PreCommit-Admin/ 
 to add Kafka to the list of projects PreCommit-Admin triggers.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Assigned] (KAFKA-1973) Remove the accidentally created LogCleanerManager.scala.orig

2015-03-24 Thread Grant Henke (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1973?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Grant Henke reassigned KAFKA-1973:
--

Assignee: Grant Henke

 Remove the accidentally created LogCleanerManager.scala.orig
 

 Key: KAFKA-1973
 URL: https://issues.apache.org/jira/browse/KAFKA-1973
 Project: Kafka
  Issue Type: Bug
Reporter: Jiangjie Qin
Assignee: Grant Henke
 Attachments: KAFKA-1973.patch


 It seems there is a LogCleanerManager.scala.orig in the trunk now. Need to 
 remove it.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-2044) Support requests and responses from o.a.k.common in KafkaApis

2015-03-24 Thread Gwen Shapira (JIRA)
Gwen Shapira created KAFKA-2044:
---

 Summary: Support requests and responses from o.a.k.common in 
KafkaApis
 Key: KAFKA-2044
 URL: https://issues.apache.org/jira/browse/KAFKA-2044
 Project: Kafka
  Issue Type: Improvement
Reporter: Gwen Shapira
Assignee: Gwen Shapira


As groundwork for KIP-4 and for KAFKA-1927, we'll add some plumbing to support 
handling of requests and responses from o.a.k.common in KafkaApis.

This will allow us to add new Api calls just in o.a.k.common and to gradually 
migrate existing requests and responses.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-1927) Replace requests in kafka.api with requests in org.apache.kafka.common.requests

2015-03-24 Thread Gwen Shapira (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1927?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gwen Shapira updated KAFKA-1927:

Attachment: KAFKA-1927.patch

 Replace requests in kafka.api with requests in 
 org.apache.kafka.common.requests
 ---

 Key: KAFKA-1927
 URL: https://issues.apache.org/jira/browse/KAFKA-1927
 Project: Kafka
  Issue Type: Improvement
Reporter: Jay Kreps
Assignee: Gwen Shapira
 Fix For: 0.8.3

 Attachments: KAFKA-1927.patch


 The common package introduced a better way of defining requests using a new 
 protocol definition DSL and also includes wrapper objects for these.
 We should switch KafkaApis over to use these request definitions and consider 
 the scala classes deprecated (we probably need to retain some of them for a 
 while for the scala clients).
 This will be a big improvement because
 1. We will have each request now defined in only one place (Protocol.java)
 2. We will have built-in support for multi-version requests
 3. We will have much better error messages (no more cryptic underflow errors)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Review Request 32448: Patch for KAFKA-1927

2015-03-24 Thread Gwen Shapira

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/32448/
---

Review request for kafka.


Bugs: KAFKA-1927
https://issues.apache.org/jira/browse/KAFKA-1927


Repository: kafka


Description
---

support requests and responses using Common api in core modules (missing files)


Diffs
-

  core/src/main/scala/kafka/api/HeartbeatRequestAndHeader.scala 
f168d9fc99ce51b8b41b7f7db2a06f371b1a44e5 
  core/src/main/scala/kafka/api/HeartbeatResponseAndHeader.scala 
9a71faae3138af1b4fb48125db619ddc3ad13c5a 
  core/src/main/scala/kafka/api/JoinGroupRequestAndHeader.scala 
3651e8603dd0ed0d2ea059786c68cf0722aa094b 
  core/src/main/scala/kafka/api/JoinGroupResponseAndHeader.scala 
d0f07e0cbbdacf9ff8287e901ecabde3921bbab3 
  core/src/main/scala/kafka/api/RequestKeys.scala 
c24c0345feedc7b9e2e9f40af11bfa1b8d328c43 
  core/src/main/scala/kafka/network/BoundedByteBufferSend.scala 
55ecac285e00abf38d7131368bb46b4c4010dc87 
  core/src/main/scala/kafka/network/RequestChannel.scala 
7b1db3dbbb2c0676f166890f566c14aa248467ab 
  core/src/main/scala/kafka/server/KafkaApis.scala 
35af98f0bc1b6a50bd1d97a30147593f8c6a422d 
  core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala 
fba852afa1b2f46b61e2fd12c38c821ba04e9cc6 

Diff: https://reviews.apache.org/r/32448/diff/


Testing
---


Thanks,

Gwen Shapira



[jira] [Commented] (KAFKA-1927) Replace requests in kafka.api with requests in org.apache.kafka.common.requests

2015-03-24 Thread Gwen Shapira (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1927?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14378231#comment-14378231
 ] 

Gwen Shapira commented on KAFKA-1927:
-

Created reviewboard https://reviews.apache.org/r/32448/diff/
 against branch trunk

 Replace requests in kafka.api with requests in 
 org.apache.kafka.common.requests
 ---

 Key: KAFKA-1927
 URL: https://issues.apache.org/jira/browse/KAFKA-1927
 Project: Kafka
  Issue Type: Improvement
Reporter: Jay Kreps
Assignee: Gwen Shapira
 Fix For: 0.8.3

 Attachments: KAFKA-1927.patch


 The common package introduced a better way of defining requests using a new 
 protocol definition DSL and also includes wrapper objects for these.
 We should switch KafkaApis over to use these request definitions and consider 
 the scala classes deprecated (we probably need to retain some of them for a 
 while for the scala clients).
 This will be a big improvement because
 1. We will have each request now defined in only one place (Protocol.java)
 2. We will have built-in support for multi-version requests
 3. We will have much better error messages (no more cryptic underflow errors)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1684) Implement TLS/SSL authentication

2015-03-24 Thread Sriharsha Chintalapani (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1684?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14378335#comment-14378335
 ] 

Sriharsha Chintalapani commented on KAFKA-1684:
---

[~gwenshap] if you have a patch available for KAFKA-1928, can you please upload 
it? I can modify my SSL and Kerberos patches according to the new code. 

 Implement TLS/SSL authentication
 

 Key: KAFKA-1684
 URL: https://issues.apache.org/jira/browse/KAFKA-1684
 Project: Kafka
  Issue Type: Sub-task
  Components: security
Affects Versions: 0.9.0
Reporter: Jay Kreps
Assignee: Sriharsha Chintalapani
 Attachments: KAFKA-1684.patch, KAFKA-1684.patch


 Add an SSL port to the configuration and advertise this as part of the 
 metadata request.
 If the SSL port is configured the socket server will need to add a second 
 Acceptor thread to listen on it. Connections accepted on this port will need 
 to go through the SSL handshake prior to being registered with a Processor 
 for request processing.
 SSL requests and responses may need to be wrapped or unwrapped using the 
 SSLEngine that was initialized by the acceptor. This wrapping and unwrapping 
 is very similar to what will need to be done for SASL-based authentication 
 schemes. We should have a uniform interface that covers both of these and we 
 will need to store the instance in the session with the request. The socket 
 server will have to use this object when reading and writing requests. We 
 will need to take care with the FetchRequests as the current 
 FileChannel.transferTo mechanism will be incompatible with wrap/unwrap so we 
 can only use this optimization for unencrypted sockets that don't require 
 userspace translation (wrapping).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] kafka pull request: 0.8.1 migrator

2015-03-24 Thread syyang
GitHub user syyang opened a pull request:

https://github.com/apache/kafka/pull/52

0.8.1 migrator



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/uber/kafka 0.8.1-migrator

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/52.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #52


commit 405a04d958b1e7a8c997610d761e94c52b782c66
Author: Norbert Hu norb...@uber.com
Date:   2015-02-11T05:32:45Z

Create debian package for 0.8.1

commit 647fac5824d6e1e5b644770505d6e6f8bc9e5d61
Author: Norbert Hu norb...@uber.com
Date:   2015-02-11T05:38:19Z

Use scala 2.8.0

commit a2767a3e12350351bdd45c88e420936c1d2e4ed9
Author: Norbert Hu norb...@uber.com
Date:   2015-02-11T19:03:56Z

Install 0.8.1 debian package as leaf

commit 0c66bb6d60d7f6a9db8f7866149348766a9e9f7c
Author: Norbert Hu norb...@uber.com
Date:   2015-02-15T03:40:51Z

Add jmxtrans-agent to debian package

This makes exporting jmx metrics via the deployed debian package much
easier. See https://github.com/jmxtrans/jmxtrans-agent

NOTE: the inclusion of the jmxtrans-agent jar is mostly for the
migration tool as well as the mirror maker, which doesn't expose an
easy way to export metrics to graphite

commit 505e2d01d96513217c914f34fa0021c64d09d258
Author: Norbert Hu norb...@uber.com
Date:   2015-03-12T00:15:17Z

Kafka migrator tool is buggy with JDK7

See {T63657}

commit b45239268609b761e07d994764165307466cf470
Author: Norbert Hu norb...@uber.com
Date:   2015-03-14T06:42:12Z

Revert Kafka migrator tool is buggy with JDK7

This reverts commit 505e2d01d96513217c914f34fa0021c64d09d258.

commit b96e540a5602adf0b85e83b843d8f850c046bbe5
Author: Norbert Hu norb...@uber.com
Date:   2015-03-18T20:07:21Z

Add arcanist config

commit d2d137d320e4776b3a9dbde4b630a4ebc0d5b331
Author: Norbert Hu norb...@uber.com
Date:   2015-03-18T21:05:04Z

Add arc land branch

commit 998bc66fa3a368b37aa2e31f5e167b4cfe9c62b1
Author: Norbert Hu norb...@uber.com
Date:   2015-03-18T21:12:01Z

Update debian comment

Reviewers: grk

Reviewed By: grk

Differential Revision: https://code.uberinternal.com/D83314

commit b65536b94705b0b890eb510df5610ab0e05df7e9
Author: Seung-Yeoul Yang syy...@uber.com
Date:   2015-03-19T00:57:10Z

[kafka] better error recovery in migrator tool (part 1)

Summary:
* Add a new gradle project for the migrator tool
* Update .gitignore
* KafkaMigrationTool is just copy-pasted.

Test Plan:
  ./gradlew test

Reviewers: grk, praveen, vinoth, csoman, norbert

Reviewed By: norbert

Subscribers: bigpunk, nharkins

Maniphest Tasks: T67641

Differential Revision: https://code.uberinternal.com/D83463

commit ce43cf232fa02769ce774c46aa4b391c5224f471
Author: Seung-Yeoul Yang syy...@uber.com
Date:   2015-03-19T01:12:08Z

[kafka] better error recovery in migration tool (part 2)

Summary:
* check topic whiltelist/blacklist before running the migration tool
* fix corrupted offsets upon detection
* fail fast on erorrs and sigterm

Test plan:
* Added unit tests
* Manually tested that corrupted offsets are fixed

Reviewers: norbert, grk, praveen, csoman, vinoth

Subscribers: nharkins, bigpunk

Maniphest Tasks: T67641

Differential Revision: https://code.uberinternal.com/D84682

commit d6a25ef7e74f5cf74346700e80e05e6a9f6370c4
Author: Seung-Yeoul Yang syy...@uber.com
Date:   2015-03-19T01:12:08Z

[kafka] better error recovery in migration tool (part 2)

Summary:
* check topic whiltelist/blacklist before running the migration tool
* fix corrupted offsets upon detection
* fail fast on erorrs and sigterm

Test plan:
* Added unit tests
* Manually tested that corrupted offsets are fixed

Reviewers: norbert, grk, praveen, csoman, vinoth

Subscribers: nharkins, bigpunk

Maniphest Tasks: T67641

Differential Revision: https://code.uberinternal.com/D84682

commit d257ad6bef663693c3de96562189fdf83a444ed8
Author: Seung-Yeoul Yang syy...@uber.com
Date:   2015-03-24T01:13:32Z

foobar

commit ccb63bd8940f9c02416e4a66925d7d048f1fc3a8
Author: Seung-Yeoul Yang syy...@uber.com
Date:   2015-03-24T01:13:32Z

foobar




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] kafka pull request: 0.8.1 migrator

2015-03-24 Thread syyang
Github user syyang closed the pull request at:

https://github.com/apache/kafka/pull/52


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (KAFKA-1688) Add authorization interface and naive implementation

2015-03-24 Thread Don Bosco Durai (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1688?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14378100#comment-14378100
 ] 

Don Bosco Durai commented on KAFKA-1688:


[~prasadm], 

I agree with you, we should support ALL or equivalent keyword like (*) in the 
default implementation. 

I remember the hierarchical topics subject being brought up during one of the google hangout 
discussions around authorization. I don't think there were any resolutions 
around it.

Does it make sense to make this a custom implementation feature? So for 
OOTB, it would be just the topic name, but anyone who wants to implement 
hierarchical privileges can parse the topic name and use . or any other 
supported character as a delimiter and provide namespace/database-like 
permissions.

FYI, it seems, hierarchical Topics was discussed back in 2012 
https://cwiki.apache.org/confluence/display/KAFKA/Hierarchical+Topics
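
To illustrate the delimiter-parsing idea, here is a minimal sketch of what such a custom implementation could do with a dotted topic name (the class and method names below are made up for the example, not part of the proposed PermissionManager interface):
{code}
import java.util.ArrayList;
import java.util.List;

public class TopicScopes {
    // "analytics.clicks.raw" -> ["analytics", "analytics.clicks", "analytics.clicks.raw"]
    public static List<String> scopesFor(String topic, char delimiter) {
        List<String> scopes = new ArrayList<String>();
        int idx = -1;
        while ((idx = topic.indexOf(delimiter, idx + 1)) != -1) {
            scopes.add(topic.substring(0, idx));
        }
        scopes.add(topic); // the full topic name is always the most specific scope
        return scopes;
    }
}
{code}
A custom authorizer could then look up ACLs from the most specific scope outward, falling back to namespace-level entries.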


 Add authorization interface and naive implementation
 

 Key: KAFKA-1688
 URL: https://issues.apache.org/jira/browse/KAFKA-1688
 Project: Kafka
  Issue Type: Sub-task
  Components: security
Reporter: Jay Kreps
Assignee: Parth Brahmbhatt
 Fix For: 0.8.3


 Add a PermissionManager interface as described here:
 https://cwiki.apache.org/confluence/display/KAFKA/Security
 (possibly there is a better name?)
 Implement calls to the PermissionsManager in KafkaApis for the main requests 
 (FetchRequest, ProduceRequest, etc). We will need to add a new error code and 
 exception to the protocol to indicate permission denied.
 Add a server configuration to give the class you want to instantiate that 
 implements that interface. That class can define its own configuration 
 properties from the main config file.
 Provide a simple implementation of this interface which just takes a user and 
 ip whitelist and permits those in either of the whitelists to do anything, 
 and denies all others.
 Rather than writing an integration test for this class we can probably just 
 use this class for the TLS and SASL authentication testing.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 32440: Patch for KAFKA-2043

2015-03-24 Thread Mayuresh Gharat

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/32440/#review77593
---



clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
https://reviews.apache.org/r/32440/#comment125718

Since it's a producer-level config, is this change needed? We can keep it as 
an instance variable. Also, since the compression type does not change, the 
private final makes it more clear. What do you think?


- Mayuresh Gharat


On March 24, 2015, 3:51 p.m., Grant Henke wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/32440/
 ---
 
 (Updated March 24, 2015, 3:51 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-2043
 https://issues.apache.org/jira/browse/KAFKA-2043
 
 
 Repository: kafka
 
 
 Description
 ---
 
 CompressionType is passed in each RecordAccumulator append
 
 
 Diffs
 -
 
   clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
 feda9c922d7dab17e424f8e6f0aa0a3f968e3729 
   
 clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
  88b4e4fbf3bf6fb6d2f90551a792b95d4cd51c40 
   
 clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
  e379ac89c9a2fbfe750d6b0dec693b7eabb76204 
   
 clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
  24274a64885fadd0e9318de2beb232218ddd52cd 
 
 Diff: https://reviews.apache.org/r/32440/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Grant Henke
 




[jira] [Updated] (KAFKA-2042) New producer metadata update always get all topics.

2015-03-24 Thread Jiangjie Qin (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2042?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiangjie Qin updated KAFKA-2042:

Priority: Blocker  (was: Major)

 New producer metadata update always get all topics.
 ---

 Key: KAFKA-2042
 URL: https://issues.apache.org/jira/browse/KAFKA-2042
 Project: Kafka
  Issue Type: Bug
Reporter: Jiangjie Qin
Assignee: Jiangjie Qin
Priority: Blocker
 Attachments: KAFKA-2042.patch


 The new java producer metadata.topics is initially empty, so the producer 
 sends a TMR with an empty topic set. The broker takes the empty requested topic 
 set as all topics, so metadata.cluster contains all topic metadata. Later on, 
 when a new topic is produced, it gets added into metadata.topics. The 
 next metadata update will then only contain the metadata for this new topic, so 
 metadata.cluster will only have this topic. Since a lot of 
 messages are still in the accumulator but have no metadata in 
 metadata.cluster, a caller thread that does a flush() will 
 block forever, because messages sitting in the accumulator without metadata 
 will never be ready to send.
 We should add a check for metadata.topics: if it is empty, no TMR should be 
 sent.
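 A minimal sketch of the proposed check (the names below are placeholders, not the actual patch):
{code}
import java.util.Set;

public class MetadataUpdateGuard {
    // An empty topic set would be interpreted by the broker as "give me all
    // topics", so only refresh when we actually track at least one topic.
    public static boolean shouldSendTopicMetadataRequest(Set<String> metadataTopics) {
        return !metadataTopics.isEmpty();
    }
}
{code}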



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Assigned] (KAFKA-1991) Clean ControllerStats initialization

2015-03-24 Thread Grant Henke (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1991?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Grant Henke reassigned KAFKA-1991:
--

Assignee: Grant Henke

 Clean ControllerStats initialization
 

 Key: KAFKA-1991
 URL: https://issues.apache.org/jira/browse/KAFKA-1991
 Project: Kafka
  Issue Type: Improvement
  Components: core
Affects Versions: 0.8.2.0
Reporter: Grant Henke
Assignee: Grant Henke
Priority: Trivial
 Attachments: KAFKA-1991.patch


 This is just a trivial clean up. Values defined by an object are instantiated 
 lazily and are initialized the first time the object is used. 
 This could cause confusion and, down the road, issues about when/what metrics 
 are initialized in the ControllerStats object. KafkaServer.scala makes a call 
 to each value to initialize it, but Scala is not actually behaving that way.
 The change matches the BrokerTopicStats implementation
 example:
 scala> object ControllerStats {
  |   val uncleanLeaderElectionRate = {
  |     println("initializing uncleanLeaderElectionRate")
  |     uncleanLeaderElectionRate
  |   }
  |   val leaderElectionTimer = {
  |     println("initializing leaderElectionTimer")
  |     leaderElectionTimer
  |   }
  | }
 defined object ControllerStats
 scala> ControllerStats.uncleanLeaderElectionRate
 initializing uncleanLeaderElectionRate
 initializing leaderElectionTimer
 res7: String = uncleanLeaderElectionRate



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 32440: Patch for KAFKA-2043

2015-03-24 Thread Mayuresh Gharat


 On March 24, 2015, 5 p.m., Mayuresh Gharat wrote:
  clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java, 
  line 134
  https://reviews.apache.org/r/32440/diff/1/?file=904417#file904417line134
 
  Since its a Producer level config, is this change needed. We can keep 
  it as an instance variable. Also since the compression type does not 
  change, the private final makes it more clear. What do you think?
 
 Grant Henke wrote:
 I don't mind leaving the instance level config. However, since it is not 
 used anywhere but the constructor I don't see the value in it. If we want to 
 mark it as final we can in the constructor and have the same clarity. The 
 only reason I didn't do it initially is that the other code did not seem to 
 follow the style of putting final on everything. (Note: I would prefer to put 
 final on everything)

Completely agree that it's not used much. Not having it as an instance-level 
config is not going to affect the functionality, just that someone would prefer 
the configs to be together.


- Mayuresh


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/32440/#review77593
---


On March 24, 2015, 3:51 p.m., Grant Henke wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/32440/
 ---
 
 (Updated March 24, 2015, 3:51 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-2043
 https://issues.apache.org/jira/browse/KAFKA-2043
 
 
 Repository: kafka
 
 
 Description
 ---
 
 CompressionType is passed in each RecordAccumulator append
 
 
 Diffs
 -
 
   clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
 feda9c922d7dab17e424f8e6f0aa0a3f968e3729 
   
 clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
  88b4e4fbf3bf6fb6d2f90551a792b95d4cd51c40 
   
 clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
  e379ac89c9a2fbfe750d6b0dec693b7eabb76204 
   
 clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
  24274a64885fadd0e9318de2beb232218ddd52cd 
 
 Diff: https://reviews.apache.org/r/32440/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Grant Henke
 




[jira] [Commented] (KAFKA-1461) Replica fetcher thread does not implement any back-off behavior

2015-03-24 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14378397#comment-14378397
 ] 

Guozhang Wang commented on KAFKA-1461:
--

Sorry for the delay, I will take a look at 31366 today.

 Replica fetcher thread does not implement any back-off behavior
 ---

 Key: KAFKA-1461
 URL: https://issues.apache.org/jira/browse/KAFKA-1461
 Project: Kafka
  Issue Type: Improvement
  Components: replication
Affects Versions: 0.8.1.1
Reporter: Sam Meder
Assignee: Sriharsha Chintalapani
  Labels: newbie++
 Fix For: 0.8.3

 Attachments: KAFKA-1461.patch, KAFKA-1461.patch, 
 KAFKA-1461_2015-03-11_10:41:26.patch, KAFKA-1461_2015-03-11_18:17:51.patch, 
 KAFKA-1461_2015-03-12_13:54:51.patch, KAFKA-1461_2015-03-17_16:03:33.patch


 The current replica fetcher thread will retry in a tight loop if any error 
 occurs during the fetch call. For example, we've seen cases where the fetch 
 continuously throws a connection refused exception leading to several replica 
 fetcher threads that spin in a pretty tight loop.
 To a much lesser degree this is also an issue in the consumer fetcher thread, 
 although the fact that erroring partitions are removed so a leader can be 
 re-discovered helps some.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Comment Edited] (KAFKA-2029) Improving controlled shutdown for rolling updates

2015-03-24 Thread Dmitry Bugaychenko (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2029?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14378433#comment-14378433
 ] 

Dmitry Bugaychenko edited comment on KAFKA-2029 at 3/24/15 7:34 PM:


We tried prioritization of controller messages, but it didn't help. 
Communication with a single broker is synchronous, but different brokers might 
handle requests at different speeds - as a result, with a large queue one broker 
can get way behind another one while the controller thinks it is doing fine. 
By adding tracking we ensure that *all* brokers have done the leadership movement, 
thus no one can get behind the others by more than one partition. 

The fix for controller message prioritization was in RequestChannel.scala:
{code}
...
class RequestChannel(val numProcessors: Int, val queueSize: Int) extends 
KafkaMetricsGroup {
...
  private val requestQueue = new 
LinkedBlockingDeque[RequestChannel.Request](queueSize)
...
  /** Send a request to be handled, potentially blocking until there is room in 
the queue for the request */
  def sendRequest(request: RequestChannel.Request) {
if (request.requestId == RequestKeys.LeaderAndIsrKey || request.requestId 
== RequestKeys.StopReplicaKey) {
  // ODKL Patch: prioritize controller requests over data requests.
  requestQueue.putFirst(request)
  info(Escalated controller request:  + 
request.requestObj.describe(details = true))
} else {
  requestQueue.putLast(request)
}
  }
...
  /** Get the next request or block until there is one */
  def receiveRequest(): RequestChannel.Request =
requestQueue.takeFirst()
...
{code}

It increased GC overhead but hasn't improved the speed of partition movement - 
it looks like the network request processing is not the bottleneck.


was (Author: dmitrybugaychenko):
We tried prioritization of controller messages, but it din't help. 
Communication with a single broker is synchronous, but different brokers might 
handle requests on different speed - as a result with a large queue one broker 
can get way behind another one while controller thinks it is doing fine. 
Addiding tracked we ensure taht *all* brokers done the leadership movement, 
thus no one can get behind others to more than one partition. 

The fix for controller messages prioritization were in RequestChannel.scala:
{code}
...
class RequestChannel(val numProcessors: Int, val queueSize: Int) extends 
KafkaMetricsGroup {
...
  private val requestQueue = new 
LinkedBlockingDeque[RequestChannel.Request](queueSize)
...
  /** Send a request to be handled, potentially blocking until there is room in 
the queue for the request */
  def sendRequest(request: RequestChannel.Request) {
if (request.requestId == RequestKeys.LeaderAndIsrKey || request.requestId 
== RequestKeys.StopReplicaKey) {
  // ODKL Patch: prioritize controller requests over data requests.
  requestQueue.putFirst(request)
  info(Escalated controller request:  + 
request.requestObj.describe(details = true))
} else {
  requestQueue.putLast(request)
}
  }
...
  /** Get the next request or block until there is one */
  def receiveRequest(): RequestChannel.Request =
requestQueue.takeFirst()
...
{code}

It increased GC overhead but didn't improved the speed of partitions movement - 
it looks like the network request processing is not the botlleneck.

 Improving controlled shutdown for rolling updates
 -

 Key: KAFKA-2029
 URL: https://issues.apache.org/jira/browse/KAFKA-2029
 Project: Kafka
  Issue Type: Improvement
  Components: controller
Affects Versions: 0.8.1.1
Reporter: Dmitry Bugaychenko
Assignee: Neha Narkhede
Priority: Critical

 Controlled shutdown as implemented currently can cause numerous problems: 
 deadlocks, local and global data loss, partitions without a leader, etc. In 
 some cases the only way to restore the cluster is to stop it completely using 
 kill -9 and start again.
 Note 1: I'm aware of KAFKA-1305, but the proposed workaround to increase 
 queue size makes things much worse (see discussion there).
 Note 2: The problems described here can occur in any setup, but they are 
 extremely painful in a setup with large brokers (36 disks, 1000+ assigned 
 partitions per broker in our case).
 Note 3: These improvements are actually workarounds and it is worth 
 considering a global refactoring of the controller (make it single-threaded, or even 
 get rid of it in favour of ZK leader elections for partitions).
 The problems and improvements are:
 # Controlled shutdown takes a long time (10+ minutes): the broker sends multiple 
 shutdown requests and finally considers it as failed and proceeds to unclean 
 shutdown, and the controller gets stuck for a while (holding a lock waiting for free 
 space in 

[jira] [Commented] (KAFKA-2029) Improving controlled shutdown for rolling updates

2015-03-24 Thread Dmitry Bugaychenko (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2029?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14378433#comment-14378433
 ] 

Dmitry Bugaychenko commented on KAFKA-2029:
---

We tried prioritization of controller messages, but it didn't help. 
Communication with a single broker is synchronous, but different brokers might 
handle requests at different speeds - as a result, with a large queue one broker 
can get way behind another one while the controller thinks it is doing fine. 
By adding tracking we ensure that *all* brokers have done the leadership movement, 
thus no one can get behind the others by more than one partition. 

The fix for controller message prioritization was in RequestChannel.scala:
{code}
...
class RequestChannel(val numProcessors: Int, val queueSize: Int) extends 
KafkaMetricsGroup {
...
  private val requestQueue = new 
LinkedBlockingDeque[RequestChannel.Request](queueSize)
...
  /** Send a request to be handled, potentially blocking until there is room in 
the queue for the request */
  def sendRequest(request: RequestChannel.Request) {
if (request.requestId == RequestKeys.LeaderAndIsrKey || request.requestId 
== RequestKeys.StopReplicaKey) {
  // ODKL Patch: prioritize controller requests over data requests.
  requestQueue.putFirst(request)
  info(Escalated controller request:  + 
request.requestObj.describe(details = true))
} else {
  requestQueue.putLast(request)
}
  }
...
  /** Get the next request or block until there is one */
  def receiveRequest(): RequestChannel.Request =
requestQueue.takeFirst()
...
{code}

It increased GC overhead but didn't improve the speed of partition movement - 
it looks like the network request processing is not the bottleneck.

 Improving controlled shutdown for rolling updates
 -

 Key: KAFKA-2029
 URL: https://issues.apache.org/jira/browse/KAFKA-2029
 Project: Kafka
  Issue Type: Improvement
  Components: controller
Affects Versions: 0.8.1.1
Reporter: Dmitry Bugaychenko
Assignee: Neha Narkhede
Priority: Critical

 Controlled shutdown as implemented currently can cause numerous problems: 
 deadlocks, local and global data loss, partitions without a leader, etc. In 
 some cases the only way to restore the cluster is to stop it completely using 
 kill -9 and start again.
 Note 1: I'm aware of KAFKA-1305, but the proposed workaround to increase 
 queue size makes things much worse (see discussion there).
 Note 2: The problems described here can occur in any setup, but they are 
 extremely painful in a setup with large brokers (36 disks, 1000+ assigned 
 partitions per broker in our case).
 Note 3: These improvements are actually workarounds and it is worth 
 considering a global refactoring of the controller (make it single-threaded, or even 
 get rid of it in favour of ZK leader elections for partitions).
 The problems and improvements are:
 # Controlled shutdown takes a long time (10+ minutes): the broker sends multiple 
 shutdown requests and finally considers it as failed and proceeds to unclean 
 shutdown, and the controller gets stuck for a while (holding a lock waiting for free 
 space in the controller-to-broker queue). After the broker starts back up it receives 
 a followers request and erases its highwatermarks (with a message that the replica 
 does not exist - the controller hadn't yet sent a request with the replica 
 assignment); then, when the controller starts replicas on the broker, it deletes all 
 local data (due to the missing highwatermarks). Furthermore, the controller starts 
 processing the pending shutdown request and stops replicas on the broker, leaving 
 it in a non-functional state. A solution to the problem might be to increase the 
 time the broker waits for the controller's response to the shutdown request, but this 
 timeout is taken from controller.socket.timeout.ms which is global for all 
 broker-controller communication, and setting it to 30 minutes is dangerous. 
 *Proposed solution: introduce a dedicated config parameter for this timeout 
 with a high default*.
 # If a broker goes down during controlled shutdown and does not come back, the 
 controller gets stuck in a deadlock (one thread owns the lock and tries to add a 
 message to the dead broker's queue, the send thread is in an infinite loop trying to 
 retry the message to the dead broker, and the broker failure handler is waiting 
 for the lock). There are numerous partitions without a leader and the only way 
 out is to kill -9 the controller. *Proposed solution: add a timeout for adding a 
 message to the broker's queue*. ControllerChannelManager.sendRequest:
 {code}
   def sendRequest(brokerId: Int, request: RequestOrResponse, callback: (RequestOrResponse) => Unit = null) {
     brokerLock synchronized {
       val stateInfoOpt = brokerStateInfo.get(brokerId)
       stateInfoOpt match {
         case Some(stateInfo) =>
  

Re: [VOTE] KIP-15 add a close method with timeout to KafkaProducer

2015-03-24 Thread Jiangjie Qin
Push up the thread for voting after discussion on the KIP hangout.

On 3/19/15, 9:03 PM, Jiangjie Qin j...@linkedin.com wrote:

We had some additional discussions on the discussion thread. Pushing up
this thread to resume voting.

On 3/11/15, 8:47 PM, Jay Kreps jay.kr...@gmail.com wrote:

Yeah guys, I'd like to second that. I'd really really love to get the
quality of these to the point where we could broadly solicit user input
and
use them as a permanent document of the alternatives and rationale.

I know it is a little painful to have process, but I think we all saw
what
happened to the previous clients as public interfaces so I really really
really want us to just be incredibly thoughtful and disciplined as we
make
changes. I think we all want to avoid another client rewrite.

To second Joe's question in a more specific way, I think an alternative I
don't see considered to give close() a bounded time is just to enforce
the
request time on the client side, which will cause all requests to be
failed
after the request timeout expires. This was the same behavior as for
flush.
In the case where the user just wants to ensure close doesn't block
forever
I think that may be sufficient?

So one alternative might be to just do that request timeout feature and
add
a new producer config that is something like
  abort.on.failure=false
which causes the producer to hard exit if it can't send a request. Which
I
think is closer to what you want, with this just being a way to implement
that behavior.

I'm not sure if this is better or worse, but we should be sure before we
make the change.

I also have a concern about
  producer.close(0, TimeUnit.MILLISECONDS)
not meaning close with a timeout of 0 ms.

I realize this exists in other java apis, but it is so confusing it even
confused us into having that recent producer bug because of course all
the
other numbers mean wait that long.

I'd propose
  close()--block until all completed
  close(0, TimeUnit.MILLISECONDS)--block for 0 ms
  close(5, TimeUnit.MILLISECONDS)--block for 5 ms
  close(-1, TimeUnit.MILLISECONDS)--error because blocking for negative
ms
would mean completing in the past :-)
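
A quick usage sketch of those proposed semantics from the caller's side (illustrative only; the close(long, TimeUnit) overload is the addition under discussion, not an existing API):

import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.producer.KafkaProducer;

class CloseSemanticsSketch {
    // Sketch of the proposed semantics from the caller's point of view.
    static void shutdown(KafkaProducer<byte[], byte[]> producer, long waitMs) {
        if (waitMs > 0)
            producer.close(waitMs, TimeUnit.MILLISECONDS); // wait up to waitMs for pending sends
        else
            producer.close(0, TimeUnit.MILLISECONDS);      // proposed meaning: do not block at all
    }
}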

-Jay

On Wed, Mar 11, 2015 at 8:31 PM, Joe Stein joe.st...@stealth.ly wrote:

 Could the KIP confluence please have updated the discussion thread
link,
 thanks... could you also remove the template boilerplate at the top
*This
 page is meant as a template ..* so we can capture it for the release
 cleanly.

 Also I don't really/fully understand how this is different than
 flush(time); close() and why close has its own timeout also?

 Lastly, what is the forceClose flag? This isn't documented in the
public
 interface so it isn't clear how to completely use the feature just by
 reading the KIP.

 ~ Joe Stein
 - - - - - - - - - - - - - - - - -

   http://www.stealth.ly
 - - - - - - - - - - - - - - - - -

 On Wed, Mar 11, 2015 at 11:24 PM, Guozhang Wang wangg...@gmail.com
 wrote:

  +1 (binding)
 
  On Wed, Mar 11, 2015 at 8:10 PM, Jiangjie Qin
j...@linkedin.com.invalid
 
  wrote:
 
  
  
 
 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-15+-+Add+a+close+m
e
thod+with+a+timeout+in+the+producer
  
  
 
 
  --
  -- Guozhang
 





Fwd: MirrorMaker improvements

2015-03-24 Thread vlad...@gmail.com
Dear all,

I had a short discussion with Jay yesterday at the ACM meetup and he
suggested writing an email regarding a few possible MirrorMaker
improvements.

At Turn, we have been using MirrorMaker for a few months now to
asynchronously replicate our key/value store data between our datacenters.
In a way, our system is similar to Linkedin's Databus, but it uses Kafka
clusters and MirrorMaker as its building blocks. Our overall message rate
peaks at about 650K/sec and, when pushing data over high bandwidth delay
product links, we have found some minor bottlenecks.

The MirrorMaker process uses a standard consumer to pull data from a remote
datacenter. This implies that it opens a single TCP connection to each of
the remote brokers and muxes requests for different topics and partitions
over this connection. While this is a good thing in terms of maintaining
the congestion window open, over long RTT lines with rather high loss rate
the congestion window will cap, in our case at just a few Mbps. While the
overall line bandwidth is much higher, this means that we have to start
multiple MirrorMaker processes (somewhere in the hundreds), in order to
completely use the line capacity. Being able to pool multiple TCP
connections from a single consumer to a broker would solve this
complication.

The standard consumer also uses the remote ZooKeeper in order to manage the
consumer group. While consumer group management is moving closer to the
brokers, it might make sense to move the group management to the local
datacenter, since that would avoid using the long-distance connection for
this purpose.

Another possible improvement assumes a further constraint, namely that the
number of partitions for a topic in both datacenters is the same. In my
opinion, this is a sane constraint, since it preserves the Kafka ordering
guarantees (per partition), instead of a simple guarantee per key. This
kind of guarantee can be for example useful in a system that compares
partition contents to reach eventual consistency using Merkle trees. If the
number of partitions is equal, then offsets have the same meaning for the
same partition in both clusters, since the data for both partitions is
identical before the offset. This allows a simple consumer to just inquire
the local broker and the remote broker for their current offsets and, in
case the remote broker is ahead, copy the extra data to the local cluster.
Since the consumer offsets are no longer bound to the specific partitioning
of a single remote cluster, the consumer could pull from one of any number
of remote clusters, BitTorrent-style, if their offsets are ahead of the
local offset. The group management problem would reduce to assigning local
partitions to different MirrorMaker processes, so the group management
could be done locally also in this situation.
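
To make the offset-comparison idea concrete, here is a minimal sketch (the Cluster and Copier interfaces below are placeholders assumed for illustration, not Kafka APIs):

public class PartitionSyncSketch {

    interface Cluster {
        long logEndOffset(String topic, int partition);
    }

    interface Copier {
        // copy messages in [fromOffset, toOffset) for the partition, remote -> local
        void copyRange(String topic, int partition, long fromOffset, long toOffset);
    }

    // With equal partition counts, offsets line up, so syncing a partition
    // reduces to comparing two numbers and copying the missing suffix.
    static void syncPartition(String topic, int partition,
                              Cluster local, Cluster remote, Copier copier) {
        long localEnd = local.logEndOffset(topic, partition);
        long remoteEnd = remote.logEndOffset(topic, partition);
        if (remoteEnd > localEnd)
            copier.copyRange(topic, partition, localEnd, remoteEnd);
    }
}

The same comparison works against any remote cluster whose offsets are ahead, which is what enables the BitTorrent-style pull described above.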

Regards,
Vlad

PS: Sorry if this is a double posting! The original posting did not appear
in the archives for a while.


Re: KIP discussion Mar 24 at 11am PST

2015-03-24 Thread Jun Rao
Just to keep everyone posted. The following is a summary of what's being
discussed in the KIP hangout today.

KIP-4 (admin commands):
* Gwen is uploading a patch in KAFKA-1927 (refactoring requests) so that we
can get unblocked on adding new requests.
* We will combine DescribeTopic and TopicMetadata in the future.
* We will leave the admin requests async for now.
* We will not add a VerifyReassignPartitionRequest for now. We can do that
later when we improve the verification process.
* We need to discuss a bit more on how to expose the controller info to the
client.
* Andrii will send out more details on the KIP thread.

KIP-15 (close):
* If close() or close with a non-zero timeout is called from the send
thread, we will log it as an error.
* Jiangjie will follow up on the KIP thread.

KIP-13 (quota):
* Need a separate discussion on whether to use the new metrics package on
the broker on the mailing list.
* There are a few other details being discussed and Aditya will follow up on
the KIP thread.

Thanks,

Jun


On Fri, Mar 20, 2015 at 2:44 PM, Jun Rao j...@confluent.io wrote:

 Hi, Everyone,

 We plan to have a KIP discussion on Google hangout on Mar 24 at 11am PST.
 If you are interested in participating and have not already received a
 calendar invitation, please let me know. The following is the agenda.

 KIP-4 (admin commands): 10 mins
 * status of KAFKA-1927 (refactoring requests). which blocks this KIP
 * status of KAFKA-1634 (improve OffsetCommitRequest), which blocks
 KAFKA-1927
 * any remaining issues for discussion

 KIP-15 (close): 10 mins
 * semantics of close() and close(timeout)

 KIP-13 (quota):
 * protocol change to reflect the state of throttling
 * dependency on using the new metrics package
 * dependency KIP-5 (broker configuration)

 Thanks,

 Jun



[jira] [Comment Edited] (KAFKA-2029) Improving controlled shutdown for rolling updates

2015-03-24 Thread Dmitry Bugaychenko (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2029?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14378433#comment-14378433
 ] 

Dmitry Bugaychenko edited comment on KAFKA-2029 at 3/24/15 7:35 PM:


We tried prioritization of controller messages, but it didn't help. 
Communication with a single broker is synchronous, but different brokers might 
handle requests at different speeds - as a result, with a large queue one broker 
can get way behind another one while the controller thinks it is doing fine. 
By adding tracking we ensure that *all* brokers have done the leadership movement, 
thus no one can get behind the others by more than one partition. 

The fix for controller message prioritization was in RequestChannel.scala:
{code}
...
class RequestChannel(val numProcessors: Int, val queueSize: Int) extends 
KafkaMetricsGroup {
...
  private val requestQueue = new 
LinkedBlockingDeque[RequestChannel.Request](queueSize)
...
  /** Send a request to be handled, potentially blocking until there is room in 
the queue for the request */
  def sendRequest(request: RequestChannel.Request) {
if (request.requestId == RequestKeys.LeaderAndIsrKey || request.requestId 
== RequestKeys.StopReplicaKey) {
  // ODKL Patch: prioritize controller requests over data requests.
  requestQueue.putFirst(request)
  info(Escalated controller request:  + 
request.requestObj.describe(details = true))
} else {
  requestQueue.putLast(request)
}
  }
...
  /** Get the next request or block until there is one */
  def receiveRequest(): RequestChannel.Request =
requestQueue.takeFirst()
...
{code}

It increased GC overhead but hasn't improved the speed of partition movement - 
it looks like the network request processing is not the bottleneck.


was (Author: dmitrybugaychenko):
We tried prioritization of controller messages, but it din't help. 
Communication with a single broker is synchronous, but different brokers might 
handle requests on different speed - as a result with a large queue one broker 
can get way behind another one while controller thinks it is doing fine. 
Addiding tracked we ensure taht *all* brokers done the leadership movement, 
thus no one can get behind others to more than one partition. 

The fix for controller messages prioritization were in RequestChannel.scala:
{code}
...
class RequestChannel(val numProcessors: Int, val queueSize: Int) extends 
KafkaMetricsGroup {
...
  private val requestQueue = new 
LinkedBlockingDeque[RequestChannel.Request](queueSize)
...
  /** Send a request to be handled, potentially blocking until there is room in 
the queue for the request */
  def sendRequest(request: RequestChannel.Request) {
if (request.requestId == RequestKeys.LeaderAndIsrKey || request.requestId 
== RequestKeys.StopReplicaKey) {
  // ODKL Patch: prioritize controller requests over data requests.
  requestQueue.putFirst(request)
  info(Escalated controller request:  + 
request.requestObj.describe(details = true))
} else {
  requestQueue.putLast(request)
}
  }
...
  /** Get the next request or block until there is one */
  def receiveRequest(): RequestChannel.Request =
requestQueue.takeFirst()
...
{code}

It increased GC overhead but hasn't improved the speed of partitions movement - 
it looks like the network request processing is not the botlleneck.

 Improving controlled shutdown for rolling updates
 -

 Key: KAFKA-2029
 URL: https://issues.apache.org/jira/browse/KAFKA-2029
 Project: Kafka
  Issue Type: Improvement
  Components: controller
Affects Versions: 0.8.1.1
Reporter: Dmitry Bugaychenko
Assignee: Neha Narkhede
Priority: Critical

 Controlled shutdown as implemented currently can cause numerous problems: 
 deadlocks, local and global data loss, partitions without a leader, etc. In 
 some cases the only way to restore the cluster is to stop it completely using 
 kill -9 and start again.
 Note 1: I'm aware of KAFKA-1305, but the proposed workaround to increase 
 queue size makes things much worse (see discussion there).
 Note 2: The problems described here can occur in any setup, but they are 
 extremely painful in a setup with large brokers (36 disks, 1000+ assigned 
 partitions per broker in our case).
 Note 3: These improvements are actually workarounds and it is worth 
 considering a global refactoring of the controller (make it single-threaded, or even 
 get rid of it in favour of ZK leader elections for partitions).
 The problems and improvements are:
 # Controlled shutdown takes a long time (10+ minutes): the broker sends multiple 
 shutdown requests and finally considers it as failed and proceeds to unclean 
 shutdown, and the controller gets stuck for a while (holding a lock waiting for free 
 space in 

MirrorMaker improvements

2015-03-24 Thread vlad...@gmail.com
Dear all,

I had a short discussion with Jay yesterday at the ACM meetup and he
suggested writing an email regarding a few possible MirrorMaker
improvements.

At Turn, we have been using MirrorMaker for a few months now to
asynchronously replicate our key/value store data between our datacenters.
In a way, our system is similar to Linkedin's Databus, but it uses Kafka
clusters and MirrorMaker as its building blocks. Our overall message rate
peaks at about 650K/sec and, when pushing data over high bandwidth delay
product links, we have found some minor bottlenecks.

The MirrorMaker process uses a standard consumer to pull data from a remote
datacenter. This implies that it opens a single TCP connection to each of
the remote brokers and muxes requests for different topics and partitions
over this connection. While this is a good thing in terms of maintaining
the congestion window open, over long RTT lines with rather high loss rate
the congestion window will cap, in our case at just a few Mbps. While the
overall line bandwidth is much higher, this means that we have to start
multiple MirrorMaker processes (somewhere in the hundreds), in order to
completely use the line capacity. Being able to pool multiple TCP
connections from a single consumer to a broker would solve this
complication.

The standard consumer also uses the remote ZooKeeper in order to manage the
consumer group. While consumer group management is moving closer to the
brokers, it might make sense to move the group management to the local
datacenter, since that would avoid using the long-distance connection for
this purpose.

Another possible improvement assumes a further constraint, namely that the
number of partitions for a topic in both datacenters is the same. In my
opinion, this is a sane constraint, since it preserves the Kafka ordering
guarantees (per partition), instead of a simple guarantee per key. This
kind of guarantee can be for example useful in a system that compares
partition contents to reach eventual consistency using Merkle trees. If the
number of partitions is equal, then offsets have the same meaning for the
same partition in both clusters, since the data for both partitions is
identical before the offset. This allows a simple consumer to just inquire
the local broker and the remote broker for their current offsets and, in
case the remote broker is ahead, copy the extra data to the local cluster.
Since the consumer offsets are no longer bound to the specific partitioning
of a single remote cluster, the consumer could pull from one of any number
of remote clusters, BitTorrent-style, if their offsets are ahead of the
local offset. The group management problem would reduce to assigning local
partitions to different MirrorMaker processes, so the group management
could be done locally also in this situation.

Regards,
Vlad


[jira] [Created] (KAFKA-2045) Memory Management on the consumer

2015-03-24 Thread Guozhang Wang (JIRA)
Guozhang Wang created KAFKA-2045:


 Summary: Memory Management on the consumer
 Key: KAFKA-2045
 URL: https://issues.apache.org/jira/browse/KAFKA-2045
 Project: Kafka
  Issue Type: Bug
Reporter: Guozhang Wang


We need to add the memory management on the new consumer like we did in the new 
producer. This would probably include:

1. byte buffer re-usage for fetch response partition data.

2. byte buffer re-usage for on-the-fly de-compression.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [VOTE] KIP-15 add a close method with timeout to KafkaProducer

2015-03-24 Thread Joel Koshy
Actually, since there are already votes on this and the KIP has
changed a bit we should cancel this and start a new thread.

On Tue, Mar 24, 2015 at 12:19 PM, Jiangjie Qin
j...@linkedin.com.invalid wrote:
 Push up the thread for voting after discussion on the KIP hangout.

 On 3/19/15, 9:03 PM, Jiangjie Qin j...@linkedin.com wrote:

We had some additional discussions on the discussion thread. Pushing up
this thread to resume voting.

On 3/11/15, 8:47 PM, Jay Kreps jay.kr...@gmail.com wrote:

Yeah guys, I'd like to second that. I'd really really love to get the
quality of these to the point where we could broadly solicit user input
and
use them as a permanent document of the alternatives and rationale.

I know it is a little painful to have process, but I think we all saw
what
happened to the previous clients as public interfaces so I really really
really want us to just be incredibly thoughtful and disciplined as we
make
changes. I think we all want to avoid another client rewrite.

To second Joe's question in a more specific way, I think an alternative I
don't see considered to give close() a bounded time is just to enforce
the
request time on the client side, which will cause all requests to be
failed
after the request timeout expires. This was the same behavior as for
flush.
In the case where the user just wants to ensure close doesn't block
forever
I think that may be sufficient?

So one alternative might be to just do that request timeout feature and
add
a new producer config that is something like
  abort.on.failure=false
which causes the producer to hard exit if it can't send a request. Which
I
think is closer to what you want, with this just being a way to implement
that behavior.

I'm not sure if this is better or worse, but we should be sure before we
make the change.

I also have a concern about
  producer.close(0, TimeUnit.MILLISECONDS)
not meaning close with a timeout of 0 ms.

I realize this exists in other java apis, but it is so confusing it even
confused us into having that recent producer bug because of course all
the
other numbers mean wait that long.

I'd propose
  close()--block until all completed
  close(0, TimeUnit.MILLISECONDS)--block for 0 ms
  close(5, TimeUnit.MILLISECONDS)--block for 5 ms
  close(-1, TimeUnit.MILLISECONDS)--error because blocking for negative
ms
would mean completing in the past :-)

-Jay

On Wed, Mar 11, 2015 at 8:31 PM, Joe Stein joe.st...@stealth.ly wrote:

 Could the KIP confluence please have updated the discussion thread
link,
 thanks... could you also remove the template boilerplate at the top
*This
 page is meant as a template ..* so we can capture it for the release
 cleanly.

 Also I don't really/fully understand how this is different than
 flush(time); close() and why close has its own timeout also?

 Lastly, what is the forceClose flag? This isn't documented in the
public
 interface so it isn't clear how to completely use the feature just by
 reading the KIP.

 ~ Joe Stein
 - - - - - - - - - - - - - - - - -

   http://www.stealth.ly
 - - - - - - - - - - - - - - - - -

 On Wed, Mar 11, 2015 at 11:24 PM, Guozhang Wang wangg...@gmail.com
 wrote:

  +1 (binding)
 
  On Wed, Mar 11, 2015 at 8:10 PM, Jiangjie Qin
j...@linkedin.com.invalid
 
  wrote:
 
  
  
 

https://cwiki.apache.org/confluence/display/KAFKA/KIP-15+-+Add+a+close+m
e
thod+with+a+timeout+in+the+producer
  
  
 
 
  --
  -- Guozhang
 





Re: C++ Client Library -- libkafka-asio

2015-03-24 Thread svante karlsson
@Ewen added license.txt (boost v1.0)

thanks
svante



2015-03-24 2:15 GMT+01:00 Ewen Cheslack-Postava e...@confluent.io:

 You don't get edit permission by default, you need to get one of the admins
 to give you permission.

 @Daniel, I've added libkafka-asio.

 @svante I started to add csi-kafka, but couldn't find a license?


 On Sun, Mar 22, 2015 at 8:29 AM, svante karlsson s...@csi.se wrote:

  Cool, Looks nice. I was looking for something similar a year ago. We also
  ended up rolling our own. https://github.com/bitbouncer/csi-kafka
 
 
  Have you got any performance figures?
 
  /svante
 
  2015-03-22 14:29 GMT+01:00 Daniel Joos dan...@joosweb.de:
 
   Hello there,
  
   I'm currently working on a C++ client library, implementing the Kafka
   protocol using Boost ASIO.
   You can find the source code and some examples on github:
   https://github.com/danieljoos/libkafka-asio
  
   I tried to add it to the Clients section of the Kafka wiki, but
 either
    I'm too blind to see the Edit button, or I just don't have enough
   permissions to edit the page ;-)
   In case you like the library, it would be very nice, if someone with
   sufficient permissions for the wiki could add it there.
  
   Thanks.
   Best regards,
  
   Daniel
  
  
 



 --
 Thanks,
 Ewen



Re: Review Request 32434: Patch for KAFKA-2042

2015-03-24 Thread Guozhang Wang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/32434/#review77634
---


As discussed offline, I think it is better to make the fix at the app level 
(KafkaProducer, Metadata) rather than forbidding sending an empty MetadataRequest 
in NetworkClient.

- Guozhang Wang


On March 24, 2015, 8:17 a.m., Jiangjie Qin wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/32434/
 ---
 
 (Updated March 24, 2015, 8:17 a.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-2042
 https://issues.apache.org/jira/browse/KAFKA-2042
 
 
 Repository: kafka
 
 
 Description
 ---
 
 Patch for KAFKA-2042. Do not update metadata for empty topic set in new 
 producer
 
 
 Diffs
 -
 
   clients/src/main/java/org/apache/kafka/clients/NetworkClient.java 
 f4295025c28e2842244dc775052b7a3d30fb9d11 
 
 Diff: https://reviews.apache.org/r/32434/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Jiangjie Qin
 




Re: [VOTE] KIP-15 add a close method with timeout to KafkaProducer

2015-03-24 Thread Guozhang Wang
+1, let's just start a new thread for this.

On Tue, Mar 24, 2015 at 12:23 PM, Joel Koshy jjkosh...@gmail.com wrote:

 Actually, since there are already votes on this and the KIP has
 changed a bit we should cancel this and start a new thread.

 On Tue, Mar 24, 2015 at 12:19 PM, Jiangjie Qin
 j...@linkedin.com.invalid wrote:
  Push up the thread for voting after discussion on the KIP hangout.
 
  On 3/19/15, 9:03 PM, Jiangjie Qin j...@linkedin.com wrote:
 
 We had some additional discussions on the discussion thread. Pushing up
 this thread to resume voting.
 
 On 3/11/15, 8:47 PM, Jay Kreps jay.kr...@gmail.com wrote:
 
 Yeah guys, I'd like to second that. I'd really really love to get the
 quality of these to the point where we could broadly solicit user input
 and
 use them as a permanent document of the alternatives and rationale.
 
 I know it is a little painful to have process, but I think we all saw
 what
 happened to the previous clients as public interfaces so I really really
 really want us to just be incredibly thoughtful and disciplined as we
 make
 changes. I think we all want to avoid another client rewrite.
 
 To second Joe's question in a more specific way, I think an alternative
 I
 don't see considered to give close() a bounded time is just to enforce
 the
 request time on the client side, which will cause all requests to be
 failed
 after the request timeout expires. This was the same behavior as for
 flush.
 In the case where the user just wants to ensure close doesn't block
 forever
 I think that may be sufficient?
 
 So one alternative might be to just do that request timeout feature and
 add
 a new producer config that is something like
   abort.on.failure=false
 which causes the producer to hard exit if it can't send a request. Which
 I
 think is closer to what you want, with this just being a way to
 implement
 that behavior.
 
 I'm not sure if this is better or worse, but we should be sure before we
 make the change.
 
 I also have a concern about
   producer.close(0, TimeUnit.MILLISECONDS)
 not meaning close with a timeout of 0 ms.
 
 I realize this exists in other java apis, but it is so confusing it even
 confused us into having that recent producer bug because of course all
 the
 other numbers mean wait that long.
 
 I'd propose
   close()--block until all completed
   close(0, TimeUnit.MILLISECONDS)--block for 0 ms
   close(5, TimeUnit.MILLISECONDS)--block for 5 ms
   close(-1, TimeUnit.MILLISECONDS)--error because blocking for negative
 ms
 would mean completing in the past :-)
 
 -Jay
 
 On Wed, Mar 11, 2015 at 8:31 PM, Joe Stein joe.st...@stealth.ly
 wrote:
 
  Could the KIP confluence please have updated the discussion thread
 link,
  thanks... could you also remove the template boilerplate at the top
 *This
  page is meant as a template ..* so we can capture it for the release
  cleanly.
 
  Also I don't really/fully understand how this is different than
  flush(time); close() and why close has its own timeout also?
 
  Lastly, what is the forceClose flag? This isn't documented in the
 public
  interface so it isn't clear how to completely use the feature just by
  reading the KIP.
 
  ~ Joe Stein
  - - - - - - - - - - - - - - - - -
 
http://www.stealth.ly
  - - - - - - - - - - - - - - - - -
 
  On Wed, Mar 11, 2015 at 11:24 PM, Guozhang Wang wangg...@gmail.com
  wrote:
 
   +1 (binding)
  
   On Wed, Mar 11, 2015 at 8:10 PM, Jiangjie Qin
 j...@linkedin.com.invalid
  
   wrote:
  
   
   
  
 
 
 https://cwiki.apache.org/confluence/display/KAFKA/KIP-15+-+Add+a+close+m
 e
 thod+with+a+timeout+in+the+producer
   
   
  
  
   --
   -- Guozhang
  
 
 
 




-- 
-- Guozhang


[jira] [Updated] (KAFKA-2042) New producer metadata update always get all topics.

2015-03-24 Thread Jiangjie Qin (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2042?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiangjie Qin updated KAFKA-2042:

Attachment: KAFKA-2042_2015-03-24_13:37:49.patch

 New producer metadata update always get all topics.
 ---

 Key: KAFKA-2042
 URL: https://issues.apache.org/jira/browse/KAFKA-2042
 Project: Kafka
  Issue Type: Bug
Reporter: Jiangjie Qin
Assignee: Jiangjie Qin
Priority: Blocker
 Attachments: KAFKA-2042.patch, KAFKA-2042_2015-03-24_13:37:49.patch


 The new java producer metadata.topics is initially empty, so the producer 
 sends a TMR with an empty topic set. The broker takes the empty requested topic 
 set as all topics, so metadata.cluster contains all topic metadata. Later on, 
 when a new topic is produced, it gets added into metadata.topics. The 
 next metadata update will then only contain the metadata for this new topic, so 
 metadata.cluster will only have this topic. Since a lot of 
 messages are still in the accumulator but have no metadata in 
 metadata.cluster, a caller thread that does a flush() will 
 block forever, because messages sitting in the accumulator without metadata 
 will never be ready to send.
 We should add a check for metadata.topics: if it is empty, no TMR should be 
 sent.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 32434: Patch for KAFKA-2042

2015-03-24 Thread Jiangjie Qin

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/32434/
---

(Updated March 24, 2015, 8:37 p.m.)


Review request for kafka.


Bugs: KAFKA-2042
https://issues.apache.org/jira/browse/KAFKA-2042


Repository: kafka


Description (updated)
---

Move the change to KafkaProducer after talking to Guozhang offline.


Diffs (updated)
-

  clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
feda9c922d7dab17e424f8e6f0aa0a3f968e3729 

Diff: https://reviews.apache.org/r/32434/diff/


Testing
---


Thanks,

Jiangjie Qin



[jira] [Commented] (KAFKA-2042) New producer metadata update always get all topics.

2015-03-24 Thread Jiangjie Qin (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2042?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14378551#comment-14378551
 ] 

Jiangjie Qin commented on KAFKA-2042:
-

Updated reviewboard https://reviews.apache.org/r/32434/diff/
 against branch origin/trunk

 New producer metadata update always get all topics.
 ---

 Key: KAFKA-2042
 URL: https://issues.apache.org/jira/browse/KAFKA-2042
 Project: Kafka
  Issue Type: Bug
Reporter: Jiangjie Qin
Assignee: Jiangjie Qin
Priority: Blocker
 Attachments: KAFKA-2042.patch, KAFKA-2042_2015-03-24_13:37:49.patch


 The new java producer metadata.topics is initially empty, so the producer 
 sends a TMR with an empty topic set. The broker takes the empty requested topic 
 set as all topics, so metadata.cluster contains all topic metadata. Later on, 
 when a new topic is produced, it gets added into metadata.topics. The 
 next metadata update will then only contain the metadata for this new topic, so 
 metadata.cluster will only have this topic. Since a lot of 
 messages are still in the accumulator but have no metadata in 
 metadata.cluster, a caller thread that does a flush() will 
 block forever, because messages sitting in the accumulator without metadata 
 will never be ready to send.
 We should add a check for metadata.topics: if it is empty, no TMR should be 
 sent.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-2042) New producer metadata update always get all topics.

2015-03-24 Thread Jiangjie Qin (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2042?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14378586#comment-14378586
 ] 

Jiangjie Qin commented on KAFKA-2042:
-

Updated reviewboard https://reviews.apache.org/r/32434/diff/
 against branch origin/trunk

 New producer metadata update always get all topics.
 ---

 Key: KAFKA-2042
 URL: https://issues.apache.org/jira/browse/KAFKA-2042
 Project: Kafka
  Issue Type: Bug
Reporter: Jiangjie Qin
Assignee: Jiangjie Qin
Priority: Blocker
 Attachments: KAFKA-2042.patch, KAFKA-2042_2015-03-24_13:37:49.patch, 
 KAFKA-2042_2015-03-24_13:57:23.patch


 The new java producer metadata.topics is initially empty, so the producer 
 sends a TMR with an empty topic set. The broker takes the empty requested topic 
 set as all topics, so metadata.cluster contains all topic metadata. Later on, 
 when a new topic is produced, it gets added into metadata.topics. The 
 next metadata update will then only contain the metadata for this new topic, so 
 metadata.cluster will only have this topic. Since a lot of 
 messages are still in the accumulator but have no metadata in 
 metadata.cluster, a caller thread that does a flush() will 
 block forever, because messages sitting in the accumulator without metadata 
 will never be ready to send.
 We should add a check for metadata.topics: if it is empty, no TMR should be 
 sent.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 32434: Patch for KAFKA-2042

2015-03-24 Thread Jiangjie Qin

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/32434/
---

(Updated March 24, 2015, 8:57 p.m.)


Review request for kafka.


Bugs: KAFKA-2042
https://issues.apache.org/jira/browse/KAFKA-2042


Repository: kafka


Description (updated)
---

Move the change to KafkaProducer after talking to Guozhang offline.


A less expensive fix


Diffs (updated)
-

  clients/src/main/java/org/apache/kafka/clients/Metadata.java 
c8bde8b732defa20819730d87303a9a80d01116f 
  clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
feda9c922d7dab17e424f8e6f0aa0a3f968e3729 

Diff: https://reviews.apache.org/r/32434/diff/


Testing
---


Thanks,

Jiangjie Qin



[jira] [Commented] (KAFKA-2045) Memory Management on the consumer

2015-03-24 Thread Jay Kreps (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2045?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14378476#comment-14378476
 ] 

Jay Kreps commented on KAFKA-2045:
--

There are really two issues:
1. Bounding fetch size while still guaranteeing that you eventually get data 
from each partition
2. Pooling and reusing byte buffers

I actually think (1) is really pressing, but (2) is just an optimization that 
may or may not have high payoff.

(1) is what leads to the huge memory allocations and sudden OOM when a consumer 
falls behind and then suddenly has lots of data or when partition assignment 
changes.

For (1) I think we need to figure out whether this is (a) some heuristic in the 
consumer which decides to only do fetches for a subset of topic/partitions or 
(b) a new parameter in the fetch request that gives a total bound on the 
request size. I think we discussed this a while back and agreed on (b), but I 
can't remember now. The argument if I recall was that that was the only way for 
the server to monitor all the subscribed topics and avoid blocking on an empty 
topic while non-empty partitions have data.

Bounding the allocations should help performance a lot too.

If we do this bounding then I think reuse will be a lot easier too, since each 
response will use at most that many bytes and you could potentially even just 
statically allocate the byte buffer for each partition and reuse it.
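
As an illustration of the kind of heuristic in (a), here is a sketch that bounds the total fetch size by rotating through a subset of partitions each round (names made up for the example, not the consumer's actual logic):
{code}
import java.util.ArrayList;
import java.util.List;

public class BoundedFetchPlanner {
    private int nextIndex = 0;

    // Pick the partitions to include in this fetch round, giving each one
    // bytesPerPartition and keeping the total roughly under totalBoundBytes
    // (at least one partition is always included). The rotating index
    // guarantees every partition is eventually served.
    public List<String> nextFetchSubset(List<String> partitions,
                                        int totalBoundBytes,
                                        int bytesPerPartition) {
        List<String> subset = new ArrayList<String>();
        if (partitions.isEmpty() || bytesPerPartition <= 0)
            return subset;
        nextIndex = nextIndex % partitions.size();
        int maxPartitions = Math.max(1, totalBoundBytes / bytesPerPartition);
        int count = Math.min(maxPartitions, partitions.size());
        for (int i = 0; i < count; i++) {
            subset.add(partitions.get(nextIndex));
            nextIndex = (nextIndex + 1) % partitions.size();
        }
        return subset;
    }
}
{code}
Option (b) would instead push the bound into the fetch request itself so the broker does the limiting.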

 Memory Management on the consumer
 -

 Key: KAFKA-2045
 URL: https://issues.apache.org/jira/browse/KAFKA-2045
 Project: Kafka
  Issue Type: Sub-task
Reporter: Guozhang Wang

 We need to add the memory management on the new consumer like we did in the 
 new producer. This would probably include:
 1. byte buffer re-usage for fetch response partition data.
 2. byte buffer re-usage for on-the-fly de-compression.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-1927) Replace requests in kafka.api with requests in org.apache.kafka.common.requests

2015-03-24 Thread Gwen Shapira (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1927?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gwen Shapira updated KAFKA-1927:

Status: Patch Available  (was: Open)

 Replace requests in kafka.api with requests in 
 org.apache.kafka.common.requests
 ---

 Key: KAFKA-1927
 URL: https://issues.apache.org/jira/browse/KAFKA-1927
 Project: Kafka
  Issue Type: Improvement
Reporter: Jay Kreps
Assignee: Gwen Shapira
 Fix For: 0.8.3

 Attachments: KAFKA-1927.patch


 The common package introduced a better way of defining requests using a new 
 protocol definition DSL and also includes wrapper objects for these.
 We should switch KafkaApis over to use these request definitions and consider 
 the scala classes deprecated (we probably need to retain some of them for a 
 while for the scala clients).
 This will be a big improvement because
 1. We will have each request now defined in only one place (Protocol.java)
 2. We will have built-in support for multi-version requests
 3. We will have much better error messages (no more cryptic underflow errors)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1927) Replace requests in kafka.api with requests in org.apache.kafka.common.requests

2015-03-24 Thread Gwen Shapira (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1927?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14378236#comment-14378236
 ] 

Gwen Shapira commented on KAFKA-1927:
-

The uploaded patch is:
1. Preliminary and untested, just to show what I had in mind. 
2. Actually belongs in KAFKA-2044...

 Replace requests in kafka.api with requests in 
 org.apache.kafka.common.requests
 ---

 Key: KAFKA-1927
 URL: https://issues.apache.org/jira/browse/KAFKA-1927
 Project: Kafka
  Issue Type: Improvement
Reporter: Jay Kreps
Assignee: Gwen Shapira
 Fix For: 0.8.3

 Attachments: KAFKA-1927.patch


 The common package introduced a better way of defining requests using a new 
 protocol definition DSL and also includes wrapper objects for these.
 We should switch KafkaApis over to use these request definitions and consider 
 the scala classes deprecated (we probably need to retain some of them for a 
 while for the scala clients).
 This will be a big improvement because
 1. We will have each request now defined in only one place (Protocol.java)
 2. We will have built-in support for multi-version requests
 3. We will have much better error messages (no more cryptic underflow errors)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [Discussion] Using Client Requests and Responses in Server

2015-03-24 Thread Gwen Shapira
Hi,

I uploaded a (very) preliminary patch with my idea.

One thing that's missing:
RequestResponse had a handleError method that all requests implemented,
typically generating an appropriate error Response for the request and sending
it along. It's used by KafkaApis to handle all protocol errors for valid
requests that are not handled elsewhere.
AbstractRequestResponse doesn't have such a method.

I can, of course, add it.
But before I jump into this, I'm wondering if there was another plan for
handling Api errors.
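
For concreteness, roughly what I'd add if we go that route (just a sketch; the
name and signature below are placeholders, not existing o.a.k.common API):

import org.apache.kafka.common.requests.AbstractRequestResponse;

// Placeholder sketch: each new-style request would know how to build the matching
// error response, so KafkaApis could answer any protocol error generically instead
// of pattern-matching on the request type.
public interface RequestErrorHandling {
    AbstractRequestResponse getErrorResponse(int versionId, Throwable e);
}

Whether something like this belongs on the request classes themselves or stays
server-side is exactly the part I'd like input on.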

Gwen

On Mon, Mar 23, 2015 at 6:16 PM, Jun Rao j...@confluent.io wrote:

 I think what you are saying is that in RequestChannel, we can start
 generating header/body for new request types and leave requestObj null. For
 existing requests, header/body will be null initially. Gradually, we can
 migrate each type of request by populating header/body instead of
 requestObj. This makes sense to me since it serves two purposes: (1) not
 polluting the code base with duplicated request/response objects for new
 types of requests, and (2) allowing the refactoring of existing requests to
 be done in smaller pieces.

 Could you try that approach and perhaps just migrate one existing request
 type (e.g. HeartBeatRequest) as an example? We probably need to rewind the
 buffer after reading the requestId when deserializing the header (since the
 header includes the request id).
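
 Something along these lines is what I mean (illustrative only; the wrapper
 class is just scaffolding):

 import java.nio.ByteBuffer;

 public class HeaderPeek {
     // Peek at the api key for routing, then restore the position so that
     // RequestHeader parsing (which re-reads the api key) starts at the beginning.
     public static short peekApiKey(ByteBuffer buffer) {
         int start = buffer.position();
         short apiKey = buffer.getShort();
         buffer.position(start);
         return apiKey;
     }
 }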

 Thanks,

 Jun

 On Mon, Mar 23, 2015 at 4:52 PM, Gwen Shapira gshap...@cloudera.com
 wrote:

  I'm thinking of a different approach that will not fix everything, but
  will allow adding new requests without code duplication (and therefore
  unblock KIP-4):
 
  RequestChannel.request currently takes a buffer and parses it into an old
  request object. Since the objects are byte-compatible, we should be able to
  parse existing requests into both old and new objects. New requests will
  only be parsed into new objects.
 
  Basically:
  val requestId = buffer.getShort()
  if (requestId in keyToNameAndDeserializerMap) {
    requestObj = RequestKeys.deserializerForKey(requestId)(buffer)
    header: RequestHeader = RequestHeader.parse(buffer)
    body: Struct = ProtoUtils.currentRequestSchema(apiKey).read(buffer).asInstanceOf[Struct]
  } else {
    requestObj = null
    header: RequestHeader = RequestHeader.parse(buffer)
    body: Struct = ProtoUtils.currentRequestSchema(apiKey).read(buffer).asInstanceOf[Struct]
  }
 
  This way existing KafkaApis will keep working as normal. The new Apis can
  implement just the new header/body requests.
  We'll do the same on the send-side: BoundedByteBufferSend can have a
  constructor that takes header/body instead of just a response object.
 
  Does that make sense?
 
  Once we have this in, we can move to:
  * Adding the missing request/response to the client code
  * Replacing requests that can be replaced
 
  It will also make life easier by having us review and test smaller chunks
  of work (the existing patch is *huge*, touches nearly every core component,
  and I'm not done yet...)
 
  Gwen
 
 
 
 
  On Sun, Mar 22, 2015 at 10:24 PM, Jay Kreps jay.kr...@gmail.com wrote:
 
   Ack, yeah, forgot about that.
  
   It's not just a difference of wrappers. The server side actually sends the
   bytes lazily using FileChannel.transferTo. We need to make it possible to
   carry over that optimization. In some sense what we want to be able to do
   is set a value to a Send instead of a ByteBuffer.
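
   Something shaped like this is what I have in mind (hypothetical sketch, not
   the existing o.a.k.common Send interface):

   import java.io.IOException;
   import java.nio.channels.GatheringByteChannel;

   // Hypothetical: a response field holds "something writable to a channel"
   // rather than a materialized ByteBuffer, so a file-backed message set can
   // still go out via FileChannel.transferTo without being copied onto the heap.
   public interface LazySend {
       long writeTo(GatheringByteChannel channel) throws IOException;
       boolean completed();
   }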
  
   Let me try to add that support to the protocol definition stuff, will
   probably take me a few days to free up time.
  
   -Jay
  
   On Sun, Mar 22, 2015 at 7:44 PM, Gwen Shapira gshap...@cloudera.com
   wrote:
  
In case anyone is still following this thread, I need a bit of help :)
   
The old FetchResponse.PartitionData included a MessageSet object.
The new FetchResponse.PartitionData includes a ByteBuffer.
   
However, when we read from logs, we return a MessageSet, and as far as I
can see, these can't be converted to ByteBuffers (at least not without
copying their data).
   
Did anyone consider how to reconcile the MessageSets with the new
FetchResponse objects?
   
Gwen
   
   
On Sat, Mar 21, 2015 at 6:54 PM, Gwen Shapira gshap...@cloudera.com
 
wrote:
   
 Note: I'm also treating ZkUtils as if it were a public API (i.e. converting
 objects that are returned into o.a.k.common equivalents but not changing
 ZkUtils itself).
 I know it's not public, but I suspect I'm not the only developer here who
 has tons of external code that uses it.

 Gwen

 On Wed, Mar 18, 2015 at 5:48 PM, Gwen Shapira 
 gshap...@cloudera.com
  
 wrote:

 We can't rip them out completely, unfortunately - the SimpleConsumer uses
 them.

 So we'll need conversion at some point. I'll try to make the
 conversion point just before hitting a public API that we can't
 modify, and hopefully it 

Re: Review Request 32440: Patch for KAFKA-2043

2015-03-24 Thread Grant Henke


 On March 24, 2015, 5 p.m., Mayuresh Gharat wrote:
  clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java, 
  line 134
  https://reviews.apache.org/r/32440/diff/1/?file=904417#file904417line134
 
  Since it's a Producer-level config, is this change needed? We can keep 
  it as an instance variable. Also, since the compression type does not 
  change, the private final makes it clearer. What do you think?

I don't mind leaving the instance-level config. However, since it is not used 
anywhere but the constructor, I don't see the value in it. If we want to mark it 
as final, we can do that in the constructor and have the same clarity. The only 
reason I didn't initially is that the other code did not seem to follow the style 
of putting final on everything. (Note: I would prefer to put final on everything.)
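
To illustrate the final-local point with a toy example (placeholder names, not
the actual KafkaProducer code):

// Toy sketch: a final local in the constructor gives the same "assigned once"
// clarity as a private final field, without keeping state that nothing outside
// the constructor ever reads.
public class CompressionWiringSketch {
    private final StringBuilder accumulator;

    public CompressionWiringSketch(String configuredCompression) {
        final String compressionType =
                configuredCompression == null ? "none" : configuredCompression;
        this.accumulator = new StringBuilder(compressionType);
    }
}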


- Grant


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/32440/#review77593
---


On March 24, 2015, 3:51 p.m., Grant Henke wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/32440/
 ---
 
 (Updated March 24, 2015, 3:51 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-2043
 https://issues.apache.org/jira/browse/KAFKA-2043
 
 
 Repository: kafka
 
 
 Description
 ---
 
 CompressionType is passed in each RecordAccumulator append
 
 
 Diffs
 -
 
   clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
 feda9c922d7dab17e424f8e6f0aa0a3f968e3729 
   
 clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
  88b4e4fbf3bf6fb6d2f90551a792b95d4cd51c40 
   
 clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
  e379ac89c9a2fbfe750d6b0dec693b7eabb76204 
   
 clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
  24274a64885fadd0e9318de2beb232218ddd52cd 
 
 Diff: https://reviews.apache.org/r/32440/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Grant Henke
 



