[jira] [Commented] (KAFKA-2044) Support requests and responses from o.a.k.common in KafkaApis
[ https://issues.apache.org/jira/browse/KAFKA-2044?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14378814#comment-14378814 ] Gwen Shapira commented on KAFKA-2044: - Created reviewboard https://reviews.apache.org/r/32459/diff/ against branch trunk Support requests and responses from o.a.k.common in KafkaApis - Key: KAFKA-2044 URL: https://issues.apache.org/jira/browse/KAFKA-2044 Project: Kafka Issue Type: Improvement Reporter: Gwen Shapira Assignee: Gwen Shapira Attachments: KAFKA-2044.patch As groundwork for KIP-4 and for KAFKA-1927, we'll add some plumbing to support handling of requests and responses from o.a.k.common in KafkaApis. This will allow us to add new Api calls just in o.a.k.common and to gradually migrate existing requests and responses. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Jenkins build is back to normal : KafkaPreCommit #42
See https://builds.apache.org/job/KafkaPreCommit/42/changes
KAFKA-2042
Hello, We found a serious bug while testing flush() calls in the new producer, which is summarized in KAFKA-2042. In general, when the producer starts up it will try to refresh metadata with an empty topic list, and hence get all the topic metadata. When a message is later sent to some topic, the topic will not be added into the metadata's topic list, since its metadata is already available. If the data is still sitting in the accumulator when a new topic is created, that will cause a metadata refresh with just this single topic, hence losing the metadata for all other topics. Under usual scenarios the messages will sit in the accumulator until another send() is triggered with the same topic, but with flush() as a blocking call, the likelihood that this issue is exposed (messages getting blocked forever inside flush()) is greatly increased. I am writing to ask if people think this problem is severe enough that it requires another bug-fix release. -- Guozhang
Re: [VOTE] KIP-15 add a close with timeout to new producer
+1. On Tue, Mar 24, 2015 at 2:15 PM, Jiangjie Qin j...@linkedin.com.invalid wrote: https://cwiki.apache.org/confluence/display/KAFKA/KIP-15+-+Add+a+close+method+with+a+timeout+in+the+producer As a short summary, the new interface will be as follows: close(long timeout, TimeUnit timeUnit) 1. When timeout > 0, it will try to wait up to timeout for the sender thread to complete all the requests, then join the sender thread. If the sender thread is not able to finish its work before the timeout, the method force-closes the producer by failing all the incomplete requests, then joins the sender thread. 2. When timeout = 0, it will be a non-blocking call: it just initiates the force close and DOES NOT join the sender thread. If close(timeout) is called from a callback, an error message will be logged, and the producer sender thread will block forever. -- Guozhang
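The two branches of the summary above can be illustrated with a minimal, self-contained Java sketch. All names here (Kip15CloseSketch, workDone, finishWork) are invented for the example; the latch stands in for "the sender drained the accumulator" and this is not the real KafkaProducer implementation:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Minimal sketch of the KIP-15 close(timeout) branches, assuming a toy
// "sender" thread that finishes when the latch is counted down.
class Kip15CloseSketch {
    private final CountDownLatch workDone = new CountDownLatch(1);
    private volatile boolean forceClosed = false;
    private final Thread sender;

    Kip15CloseSketch() {
        sender = new Thread(() -> {
            try {
                workDone.await();   // stand-in for completing all requests
            } catch (InterruptedException ignored) {
                // force close: give up on remaining work
            }
        });
        sender.start();
    }

    void finishWork() { workDone.countDown(); }

    /** Returns true if all requests completed, false if we had to force close. */
    boolean close(long timeout, TimeUnit unit) {
        try {
            if (timeout > 0) {
                sender.join(unit.toMillis(timeout));  // wait up to timeout
                if (sender.isAlive()) {               // timed out:
                    forceClosed = true;               // "fail incomplete requests"
                    sender.interrupt();
                    sender.join();                    // still join the sender thread
                }
                return !forceClosed;
            }
            forceClosed = true;                       // timeout == 0: non-blocking,
            sender.interrupt();                       // initiate force close, no join
            return false;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

The real implementation also has to guard against close() being invoked on the sender thread itself (the callback case mentioned above), since joining your own thread never returns.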
Re: [Discussion] Using Client Requests and Responses in Server
OK, I posted a working patch on KAFKA-2044 and https://reviews.apache.org/r/32459/diff/. There are a few decisions there that can be up for discussion (the factory method on AbstractRequestResponse, the new handleErrors in the request API), but as far as support for o.a.k.common requests in core goes, it does what it needs to do. Please review! Gwen On Tue, Mar 24, 2015 at 10:59 AM, Gwen Shapira gshap...@cloudera.com wrote: Hi, I uploaded a (very) preliminary patch with my idea. One thing that's missing: RequestResponse had a handleError method that all requests implemented, typically generating an appropriate error Response for the request and sending it along. It's used by KafkaApis to handle all protocol errors for valid requests that are not handled elsewhere. AbstractRequestResponse doesn't have such a method. I can, of course, add it. But before I jump into this, I'm wondering if there was another plan for handling Api errors. Gwen On Mon, Mar 23, 2015 at 6:16 PM, Jun Rao j...@confluent.io wrote: I think what you are saying is that in RequestChannel, we can start generating header/body for new request types and leave requestObj null. For existing requests, header/body will be null initially. Gradually, we can migrate each type of request by populating header/body instead of requestObj. This makes sense to me since it serves two purposes: (1) not polluting the code base with duplicated request/response objects for new types of requests, and (2) allowing the refactoring of existing requests to be done in smaller pieces. Could you try that approach and perhaps just migrate one existing request type (e.g. HeartBeatRequest) as an example? We probably need to rewind the buffer after reading the requestId when deserializing the header (since the header includes the request id).
Thanks, Jun On Mon, Mar 23, 2015 at 4:52 PM, Gwen Shapira gshap...@cloudera.com wrote: I'm thinking of a different approach that will not fix everything, but will allow adding new requests without code duplication (and therefore unblock KIP-4): RequestChannel.request currently takes a buffer and parses it into an old request object. Since the objects are byte-compatible, we should be able to parse existing requests into both old and new objects. New requests will only be parsed into new objects. Basically:

  val requestId = buffer.getShort()
  if (RequestKeys.keyToNameAndDeserializerMap.contains(requestId)) {
    val requestObj = RequestKeys.deserializerForKey(requestId)(buffer)
    val header: RequestHeader = RequestHeader.parse(buffer)
    val body: Struct = ProtoUtils.currentRequestSchema(apiKey).read(buffer).asInstanceOf[Struct]
  } else {
    val requestObj = null
    val header: RequestHeader = RequestHeader.parse(buffer)
    val body: Struct = ProtoUtils.currentRequestSchema(apiKey).read(buffer).asInstanceOf[Struct]
  }

This way the existing KafkaApis will keep working as normal. The new Apis can implement just the new header/body requests. We'll do the same on the send side: BoundedByteBufferSend can have a constructor that takes header/body instead of just a response object. Does that make sense? Once we have this in, we can move to: * Adding the missing request/response to the client code * Replacing requests that can be replaced It will also make life easier by having us review and test smaller chunks of work (the existing patch is *huge*, touches nearly every core component, and I'm not done yet...) Gwen On Sun, Mar 22, 2015 at 10:24 PM, Jay Kreps jay.kr...@gmail.com wrote: Ack, yeah, forgot about that. It's not just a difference of wrappers. The server side actually sends the bytes lazily using FileChannel.transferTo. We need to make it possible to carry over that optimization. In some sense what we want to be able to do is set a value to a Send instead of a ByteBuffer.
Let me try to add that support to the protocol definition stuff; it will probably take me a few days to free up time. -Jay On Sun, Mar 22, 2015 at 7:44 PM, Gwen Shapira gshap...@cloudera.com wrote: In case anyone is still following this thread, I need a bit of help :) The old FetchResponse.PartitionData included a MessageSet object. The new FetchResponse.PartitionData includes a ByteBuffer. However, when we read from logs, we return a MessageSet, and as far as I can see, these can't be converted to ByteBuffers (at least not without copying their data). Did anyone consider how to reconcile the MessageSets with the new FetchResponse objects? Gwen On Sat, Mar 21, 2015 at 6:54 PM, Gwen Shapira gshap...@cloudera.com wrote: Note: I'm also treating ZkUtils as if it were a public API (i.e. converting objects that are returned into o.a.k.common equivalents but not changing ZkUtils itself). I know it's not public, but I suspect I'm not the only
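The dual-parse idea discussed in this thread, including the buffer rewind Jun mentions, can be sketched roughly as follows. DualParseSketch, LEGACY_IDS, and classify are invented stand-ins for illustration, not actual Kafka classes; the point is only the control flow and the mark/reset:

```java
import java.nio.ByteBuffer;
import java.util.Set;

// Sketch: peek at the request id, then parse legacy requests into both the
// old and the new representation, and new requests into the new one only.
// The mark/reset is the "rewind": the new-style header re-reads the request
// id from the start of the buffer.
class DualParseSketch {
    static final Set<Short> LEGACY_IDS = Set.of((short) 0, (short) 1);

    static String classify(ByteBuffer buffer) {
        buffer.mark();
        short requestId = buffer.getShort();
        buffer.reset();              // rewind so header parsing sees the id again
        if (LEGACY_IDS.contains(requestId)) {
            // here: deserialize the old-style requestObj, rewind again, and
            // also parse the new-style header/body from the same bytes
            return "legacy+new";
        }
        // here: parse only the new-style header/body
        return "new-only";
    }
}
```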
Re: Review Request 32434: Patch for KAFKA-2042
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/32434/#review77675 --- Ship it! Ship It! - Guozhang Wang On March 24, 2015, 8:57 p.m., Jiangjie Qin wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/32434/ --- (Updated March 24, 2015, 8:57 p.m.) Review request for kafka. Bugs: KAFKA-2042 https://issues.apache.org/jira/browse/KAFKA-2042 Repository: kafka Description --- Move the change to KafkaProducer after talking to Guozhang offline. A less expensive fix Diffs - clients/src/main/java/org/apache/kafka/clients/Metadata.java c8bde8b732defa20819730d87303a9a80d01116f clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java feda9c922d7dab17e424f8e6f0aa0a3f968e3729 Diff: https://reviews.apache.org/r/32434/diff/ Testing --- Thanks, Jiangjie Qin
[jira] [Updated] (KAFKA-2042) New producer metadata update always gets all topics.
[ https://issues.apache.org/jira/browse/KAFKA-2042?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang updated KAFKA-2042: - Fix Version/s: 0.8.3 New producer metadata update always gets all topics. --- Key: KAFKA-2042 URL: https://issues.apache.org/jira/browse/KAFKA-2042 Project: Kafka Issue Type: Bug Reporter: Jiangjie Qin Assignee: Jiangjie Qin Priority: Blocker Fix For: 0.8.3 Attachments: KAFKA-2042.patch, KAFKA-2042_2015-03-24_13:37:49.patch, KAFKA-2042_2015-03-24_13:57:23.patch The new java producer metadata.topics is initially empty, so the producer sends a TMR with an empty topic set. The broker takes the empty requested topic set to mean all topics, so metadata.cluster contains all topic metadata. Later on, when a new topic is produced to, it gets added into metadata.topics. The next metadata update will only contain the metadata for this new topic, so metadata.cluster will only have this topic. Since a lot of messages are still in the accumulator but have no metadata in metadata.cluster, if a caller thread does a flush(), it will block forever, because messages sitting in the accumulator without metadata will never be ready to send. We should add a check for metadata.topics: if it is empty, no TMR should be sent. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-2042) New producer metadata update always gets all topics.
[ https://issues.apache.org/jira/browse/KAFKA-2042?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14378828#comment-14378828 ] Guozhang Wang commented on KAFKA-2042: -- Thanks for the patch, committed to trunk. New producer metadata update always gets all topics. --- Key: KAFKA-2042 URL: https://issues.apache.org/jira/browse/KAFKA-2042 Project: Kafka Issue Type: Bug Reporter: Jiangjie Qin Assignee: Jiangjie Qin Priority: Blocker Fix For: 0.8.3 Attachments: KAFKA-2042.patch, KAFKA-2042_2015-03-24_13:37:49.patch, KAFKA-2042_2015-03-24_13:57:23.patch The new java producer metadata.topics is initially empty, so the producer sends a TMR with an empty topic set. The broker takes the empty requested topic set to mean all topics, so metadata.cluster contains all topic metadata. Later on, when a new topic is produced to, it gets added into metadata.topics. The next metadata update will only contain the metadata for this new topic, so metadata.cluster will only have this topic. Since a lot of messages are still in the accumulator but have no metadata in metadata.cluster, if a caller thread does a flush(), it will block forever, because messages sitting in the accumulator without metadata will never be ready to send. We should add a check for metadata.topics: if it is empty, no TMR should be sent. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-2035) Add a topic config cache.
[ https://issues.apache.org/jira/browse/KAFKA-2035?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14378925#comment-14378925 ] Parth Brahmbhatt commented on KAFKA-2035: - Posted a review https://reviews.apache.org/r/32460/ Add a topic config cache. - Key: KAFKA-2035 URL: https://issues.apache.org/jira/browse/KAFKA-2035 Project: Kafka Issue Type: Task Reporter: Parth Brahmbhatt Assignee: Parth Brahmbhatt Currently the topic config is all about Log configuration, so we have a TopicConfigManager which takes in a Log instance and keeps updating that instance's config as and when the topic config is updated. The topic config update notifications are sent by the Controller using zk watchers. I propose to introduce a TopicConfigCache which will be updated by TopicConfigManager on any config change. The Log instance and any other component (like the authorizer mentioned in KAFKA-1688) will hold a reference to the TopicConfigCache, through which they will access the topic configs. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
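A minimal sketch of what such a cache might look like. This is a hypothetical shape, not the actual patch (the real design is in the linked review): the config manager pushes updates on change notifications, and readers such as Log or an authorizer pull from the cache:

```java
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical TopicConfigCache sketch. TopicConfigManager would call
// updateTopicConfig when a ZK change notification fires; any component
// holding a reference reads through getTopicConfig.
class TopicConfigCache {
    private final Map<String, Properties> configs = new ConcurrentHashMap<>();

    // called by the config manager on a config change
    void updateTopicConfig(String topic, Properties props) {
        configs.put(topic, props);
    }

    // called by Log, the authorizer, etc.; unknown topics get empty defaults
    Properties getTopicConfig(String topic) {
        return configs.getOrDefault(topic, new Properties());
    }
}
```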
[jira] [Updated] (KAFKA-2047) Accelerate consumer rebalance in mirror maker.
[ https://issues.apache.org/jira/browse/KAFKA-2047?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jiangjie Qin updated KAFKA-2047: Attachment: KAFKA-2047.patch Accelerate consumer rebalance in mirror maker. -- Key: KAFKA-2047 URL: https://issues.apache.org/jira/browse/KAFKA-2047 Project: Kafka Issue Type: Improvement Reporter: Jiangjie Qin Assignee: Jiangjie Qin Attachments: KAFKA-2047.patch In the new mirror maker in KAFKA-1997, the bootstrap rebalance time became longer because there are more zookeeper consumer connectors doing rebalance serially. Rebalance would be faster if the bootstrap of the ZookeeperConsumerConnectors were parallelized. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-2047) Accelerate consumer rebalance in mirror maker.
[ https://issues.apache.org/jira/browse/KAFKA-2047?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14378935#comment-14378935 ] Jiangjie Qin commented on KAFKA-2047: - Created reviewboard https://reviews.apache.org/r/32465/diff/ against branch origin/trunk Accelerate consumer rebalance in mirror maker. -- Key: KAFKA-2047 URL: https://issues.apache.org/jira/browse/KAFKA-2047 Project: Kafka Issue Type: Improvement Reporter: Jiangjie Qin Assignee: Jiangjie Qin Attachments: KAFKA-2047.patch In the new mirror maker in KAFKA-1997, the bootstrap rebalance time became longer because there are more zookeeper consumer connectors doing rebalance serially. Rebalance would be faster if the bootstrap of the ZookeeperConsumerConnectors were parallelized. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-2047) Accelerate consumer rebalance in mirror maker.
[ https://issues.apache.org/jira/browse/KAFKA-2047?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jiangjie Qin updated KAFKA-2047: Status: Patch Available (was: Open) Accelerate consumer rebalance in mirror maker. -- Key: KAFKA-2047 URL: https://issues.apache.org/jira/browse/KAFKA-2047 Project: Kafka Issue Type: Improvement Reporter: Jiangjie Qin Assignee: Jiangjie Qin Attachments: KAFKA-2047.patch In the new mirror maker in KAFKA-1997, the bootstrap rebalance time became longer because there are more zookeeper consumer connectors doing rebalance serially. Rebalance would be faster if the bootstrap of the ZookeeperConsumerConnectors were parallelized. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Review Request 32465: Patch for KAFKA-2047
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/32465/ --- Review request for kafka. Bugs: KAFKA-2047 https://issues.apache.org/jira/browse/KAFKA-2047 Repository: kafka Description --- Fix for KAFKA-2047: accelerate bootstrap consumer rebalance in mirror maker. Diffs - core/src/main/scala/kafka/tools/MirrorMaker.scala 4f3c4c872e144195bb4b742b802fa3b931edb534 Diff: https://reviews.apache.org/r/32465/diff/ Testing --- Thanks, Jiangjie Qin
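The parallelization idea behind this patch could look roughly like the following sketch. ToyConnector and bootstrapAll are invented stand-ins (the real change is in MirrorMaker.scala against ZookeeperConsumerConnector), shown only to illustrate serial-to-parallel bootstrap:

```java
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Sketch of the KAFKA-2047 idea: start the consumer connectors in parallel
// instead of serially, so their rebalances overlap.
class ParallelBootstrapSketch {
    interface ToyConnector { void bootstrap(); }

    static void bootstrapAll(List<ToyConnector> connectors) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, connectors.size()));
        for (ToyConnector c : connectors)
            pool.submit(c::bootstrap);     // each connector rebalances concurrently
        pool.shutdown();
        pool.awaitTermination(1, TimeUnit.MINUTES);
    }
}
```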
[jira] [Updated] (KAFKA-2044) Support requests and responses from o.a.k.common in KafkaApis
[ https://issues.apache.org/jira/browse/KAFKA-2044?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gwen Shapira updated KAFKA-2044: Status: Patch Available (was: Open) Support requests and responses from o.a.k.common in KafkaApis - Key: KAFKA-2044 URL: https://issues.apache.org/jira/browse/KAFKA-2044 Project: Kafka Issue Type: Improvement Reporter: Gwen Shapira Assignee: Gwen Shapira Attachments: KAFKA-2044.patch As groundwork for KIP-4 and for KAFKA-1927, we'll add some plumbing to support handling of requests and responses from o.a.k.common in KafkaApis. This will allow us to add new Api calls just in o.a.k.common and to gradually migrate existing requests and responses. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Review Request 32459: Patch for KAFKA-2044
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/32459/ --- Review request for kafka. Bugs: KAFKA-2044 https://issues.apache.org/jira/browse/KAFKA-2044 Repository: kafka Description --- support requests and responses using Common api in core modules (missing files) added error handling and factory method for requests Diffs - clients/src/main/java/org/apache/kafka/common/requests/AbstractRequestResponse.java 37aff6c0fd2ec2da8551aa74b166ca49b224ddd3 clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java 6943878116a97c02b758d273d93976019688830e clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java 1ebc188742fd65e5e744003b4579324874fd81a9 core/src/main/scala/kafka/api/HeartbeatRequestAndHeader.scala f168d9fc99ce51b8b41b7f7db2a06f371b1a44e5 core/src/main/scala/kafka/api/HeartbeatResponseAndHeader.scala 9a71faae3138af1b4fb48125db619ddc3ad13c5a core/src/main/scala/kafka/api/JoinGroupRequestAndHeader.scala 3651e8603dd0ed0d2ea059786c68cf0722aa094b core/src/main/scala/kafka/api/JoinGroupResponseAndHeader.scala d0f07e0cbbdacf9ff8287e901ecabde3921bbab3 core/src/main/scala/kafka/api/RequestKeys.scala c24c0345feedc7b9e2e9f40af11bfa1b8d328c43 core/src/main/scala/kafka/network/BoundedByteBufferSend.scala 55ecac285e00abf38d7131368bb46b4c4010dc87 core/src/main/scala/kafka/network/RequestChannel.scala 7b1db3dbbb2c0676f166890f566c14aa248467ab core/src/main/scala/kafka/server/KafkaApis.scala 35af98f0bc1b6a50bd1d97a30147593f8c6a422d core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala fba852afa1b2f46b61e2fd12c38c821ba04e9cc6 Diff: https://reviews.apache.org/r/32459/diff/ Testing --- Thanks, Gwen Shapira
[jira] [Updated] (KAFKA-2044) Support requests and responses from o.a.k.common in KafkaApis
[ https://issues.apache.org/jira/browse/KAFKA-2044?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gwen Shapira updated KAFKA-2044: Attachment: KAFKA-2044.patch Support requests and responses from o.a.k.common in KafkaApis - Key: KAFKA-2044 URL: https://issues.apache.org/jira/browse/KAFKA-2044 Project: Kafka Issue Type: Improvement Reporter: Gwen Shapira Assignee: Gwen Shapira Attachments: KAFKA-2044.patch As groundwork for KIP-4 and for KAFKA-1927, we'll add some plumbing to support handling of requests and responses from o.a.k.common in KafkaApis. This will allow us to add new Api calls just in o.a.k.common and to gradually migrate existing requests and responses. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: Review Request 31366: Patch for KAFKA-1461
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/31366/#review77674 --- core/src/main/scala/kafka/server/AbstractFetcherThread.scala https://reviews.apache.org/r/31366/#comment125847 Jun has a comment about the case when all partitions get inactive, which is common when the fetched broker has just gone through leader migration. We can move the foreach statement before the if statement, and after the foreach check if any partitions were added; if not, just back off for fetchBackoffMs. - Guozhang Wang On March 17, 2015, 11:03 p.m., Sriharsha Chintalapani wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/31366/ --- (Updated March 17, 2015, 11:03 p.m.) Review request for kafka. Bugs: KAFKA-1461 https://issues.apache.org/jira/browse/KAFKA-1461 Repository: kafka Description --- KAFKA-1461. Replica fetcher thread does not implement any back-off behavior. Diffs - core/src/main/scala/kafka/server/AbstractFetcherThread.scala e731df4b2a3e44aa3d761713a09b1070aff81430 core/src/main/scala/kafka/server/ReplicaFetcherThread.scala 96faa7b4ed7c9ba8a3f6f9f114bd94e19b3a7ac0 Diff: https://reviews.apache.org/r/31366/diff/ Testing --- Thanks, Sriharsha Chintalapani
[jira] [Commented] (KAFKA-2046) Delete topic still doesn't work
[ https://issues.apache.org/jira/browse/KAFKA-2046?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14378842#comment-14378842 ] Joel Koshy commented on KAFKA-2046: --- Hey Clark - Onur and I are looking into this. Delete topic still doesn't work --- Key: KAFKA-2046 URL: https://issues.apache.org/jira/browse/KAFKA-2046 Project: Kafka Issue Type: Bug Reporter: Clark Haskins Assignee: Sriharsha Chintalapani I just attempted to delete a 128-partition topic with all inbound producers stopped. The result was as follows: the /admin/delete_topics znode was empty, the topic under /brokers/topics was removed, and the Kafka topics command showed that the topic was removed. However, the data on disk on each broker was not deleted, and the topic has not yet been re-created by starting up the inbound mirror maker. Let me know what additional information is needed. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-2046) Delete topic still doesn't work
[ https://issues.apache.org/jira/browse/KAFKA-2046?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Joel Koshy updated KAFKA-2046: -- Assignee: Onur Karaman (was: Sriharsha Chintalapani) Delete topic still doesn't work --- Key: KAFKA-2046 URL: https://issues.apache.org/jira/browse/KAFKA-2046 Project: Kafka Issue Type: Bug Reporter: Clark Haskins Assignee: Onur Karaman I just attempted to delete a 128-partition topic with all inbound producers stopped. The result was as follows: the /admin/delete_topics znode was empty, the topic under /brokers/topics was removed, and the Kafka topics command showed that the topic was removed. However, the data on disk on each broker was not deleted, and the topic has not yet been re-created by starting up the inbound mirror maker. Let me know what additional information is needed. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Jenkins build is back to normal : Kafka-trunk #430
See https://builds.apache.org/job/Kafka-trunk/430/changes
[jira] [Commented] (KAFKA-2046) Delete topic still doesn't work
[ https://issues.apache.org/jira/browse/KAFKA-2046?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14378741#comment-14378741 ] Sriharsha Chintalapani commented on KAFKA-2046: --- [~clarkhaskins] can you add details on how big the cluster was, and do you have state-change.log files on the brokers where the Log data was not deleted? Delete topic still doesn't work --- Key: KAFKA-2046 URL: https://issues.apache.org/jira/browse/KAFKA-2046 Project: Kafka Issue Type: Bug Reporter: Clark Haskins I just attempted to delete a 128-partition topic with all inbound producers stopped. The result was as follows: the /admin/delete_topics znode was empty, the topic under /brokers/topics was removed, and the Kafka topics command showed that the topic was removed. However, the data on disk on each broker was not deleted, and the topic has not yet been re-created by starting up the inbound mirror maker. Let me know what additional information is needed. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Assigned] (KAFKA-2046) Delete topic still doesn't work
[ https://issues.apache.org/jira/browse/KAFKA-2046?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sriharsha Chintalapani reassigned KAFKA-2046: - Assignee: Sriharsha Chintalapani Delete topic still doesn't work --- Key: KAFKA-2046 URL: https://issues.apache.org/jira/browse/KAFKA-2046 Project: Kafka Issue Type: Bug Reporter: Clark Haskins Assignee: Sriharsha Chintalapani I just attempted to delete a 128-partition topic with all inbound producers stopped. The result was as follows: the /admin/delete_topics znode was empty, the topic under /brokers/topics was removed, and the Kafka topics command showed that the topic was removed. However, the data on disk on each broker was not deleted, and the topic has not yet been re-created by starting up the inbound mirror maker. Let me know what additional information is needed. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: KAFKA-2042
Hi Jun, This issue does not only affect flush(); it is just that with flush() the probability is much higher. It will affect the following scenario: 1. The producer starts and refreshes metadata. 2. The user calls producer.send() to send message 1 to topic A, but message 1 stays in the accumulator. 3. The user calls producer.send() to send message 2 to topic B (topic B is a new topic, which does not exist on the broker). 4. Message 1 will not get sent out until the user tries to send a message to topic A again. If a flush() is called at this point, it will block forever. Jiangjie (Becket) Qin On 3/24/15, 3:09 PM, Jun Rao j...@confluent.io wrote: Hi, Guozhang, The flush() was added to the new producer in trunk, not in 0.8.2, right? Thanks, Jun On Tue, Mar 24, 2015 at 2:42 PM, Guozhang Wang wangg...@gmail.com wrote: Hello, We found a serious bug while testing flush() calls in the new producer, which is summarized in KAFKA-2042. In general, when the producer starts up it will try to refresh metadata with empty topic list, and hence get all the topic metadata. When sending the message with some topic later, it will hence not cause the topic to be added into metadata's topic list since the metadata is available. When the data is still sitting in the accumulator and a new topic is created, that will cause metadata refresh with just this single topic, hence losing the metadata for any other topics. Under usual scenarios the messages will be sitting in the accumulator until another send() is triggered with the same topic, but with flush() as a blocking call the likelihood of this issue being exposed that messages gets blocked forever inside flush() could be largely increased. I am writing to ask if people think this problem is severe enough that requires another bug-fix release. -- Guozhang
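The four steps above can be reproduced with a toy metadata model. Everything here is illustrative (Kafka2042Sketch and its fields are invented, not the real o.a.k.clients Metadata/Cluster classes); it only mimics the broker's "empty requested set means all topics" behavior and the single-topic refresh that loses the rest:

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Toy model of the KAFKA-2042 scenario. The startup refresh (empty set)
// returns everything; a later refresh requested for only a newly seen topic
// drops the metadata for every other topic, which is what leaves flush()
// blocked on the records still in the accumulator.
class Kafka2042Sketch {
    final Set<String> brokerTopics = new HashSet<>();     // topics existing on the broker
    final Set<String> topics = new HashSet<>();           // stand-in for metadata.topics
    final Map<String, String> cluster = new HashMap<>();  // metadata.cluster: topic -> leader

    void refresh() {
        cluster.clear();
        // an empty requested set means "all topics" on the broker side
        Set<String> requested = topics.isEmpty() ? brokerTopics : topics;
        for (String t : requested)
            if (brokerTopics.contains(t))
                cluster.put(t, "broker-0");
    }

    void send(String topic) {
        // only topics with no cached metadata get added and trigger a refresh;
        // otherwise the record just waits in the accumulator
        if (!cluster.containsKey(topic)) {
            topics.add(topic);
            refresh();
        }
    }
}
```

Running the scenario: refresh at startup (topic A known), send to A (no-op on metadata), create topic B, send to B; afterwards the cluster metadata contains only B, so a record for A can never become sendable.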
[jira] [Commented] (KAFKA-2032) ConsumerConfig doesn't validate partition.assignment.strategy values
[ https://issues.apache.org/jira/browse/KAFKA-2032?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14378834#comment-14378834 ] Parth Brahmbhatt commented on KAFKA-2032: - Created reviewboard https://reviews.apache.org/r/32460/diff/ against branch origin/trunk ConsumerConfig doesn't validate partition.assignment.strategy values Key: KAFKA-2032 URL: https://issues.apache.org/jira/browse/KAFKA-2032 Project: Kafka Issue Type: Bug Affects Versions: 0.8.1.2 Reporter: Jason Rosenberg Assignee: Parth Brahmbhatt Attachments: KAFKA-2032.patch, KAFKA-2032.patch, KAFKA-2032_2015-03-19_11:42:07.patch, KAFKA-2032_2015-03-19_11:44:48.patch, KAFKA-2032_2015-03-19_11:47:24.patch, KAFKA-2032_2015-03-19_12:19:45.patch In the ConsumerConfig class, there are validation checks to make sure that string based configuration properties conform to allowed values. However, this validation appears to be missing for the partition.assignment.strategy. E.g. there is validation for autooffset.reset and offsets.storage. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
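A sketch of the kind of check the ticket asks for. The helper below is hypothetical (not the actual ConsumerConfig code); "range" and "roundrobin" are the strategy names the old consumer accepted, and the pattern mirrors how other string-valued configs are validated at construction time:

```java
import java.util.Set;

// Hypothetical sketch: reject unknown partition.assignment.strategy values
// up front instead of failing later (or silently) at rebalance time.
class AssignmentStrategyValidation {
    static final Set<String> VALID = Set.of("range", "roundrobin");

    static void validatePartitionAssignmentStrategy(String value) {
        if (!VALID.contains(value))
            throw new IllegalArgumentException("Wrong value " + value
                + " of partition.assignment.strategy; valid values are " + VALID);
    }
}
```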
Review Request 32460: Patch for KAFKA-2032
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/32460/ --- Review request for kafka. Bugs: KAFKA-2032 https://issues.apache.org/jira/browse/KAFKA-2032 Repository: kafka Description --- KAFKA:2032 added topic config cache. Diffs - core/src/main/scala/kafka/server/KafkaServer.scala dddef938fabae157ed8644536eb1a2f329fb42b7 core/src/main/scala/kafka/server/TopicConfigCache.scala PRE-CREATION core/src/main/scala/kafka/server/TopicConfigManager.scala 47295d40131492aaac786273819b7bc6e22e5486 core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala 93182aeb342729d420d2e7d59a1035994164b7db core/src/test/scala/unit/kafka/server/TopicConfigCacheTest.scala PRE-CREATION Diff: https://reviews.apache.org/r/32460/diff/ Testing --- Thanks, Parth Brahmbhatt
[jira] [Updated] (KAFKA-2032) ConsumerConfig doesn't validate partition.assignment.strategy values
[ https://issues.apache.org/jira/browse/KAFKA-2032?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Parth Brahmbhatt updated KAFKA-2032: Attachment: KAFKA-2032.patch ConsumerConfig doesn't validate partition.assignment.strategy values Key: KAFKA-2032 URL: https://issues.apache.org/jira/browse/KAFKA-2032 Project: Kafka Issue Type: Bug Affects Versions: 0.8.1.2 Reporter: Jason Rosenberg Assignee: Parth Brahmbhatt Attachments: KAFKA-2032.patch, KAFKA-2032.patch, KAFKA-2032_2015-03-19_11:42:07.patch, KAFKA-2032_2015-03-19_11:44:48.patch, KAFKA-2032_2015-03-19_11:47:24.patch, KAFKA-2032_2015-03-19_12:19:45.patch In the ConsumerConfig class, there are validation checks to make sure that string based configuration properties conform to allowed values. However, this validation appears to be missing for the partition.assignment.strategy. E.g. there is validation for autooffset.reset and offsets.storage. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: Review Request 31366: Patch for KAFKA-1461
On March 24, 2015, 10:46 p.m., Guozhang Wang wrote: core/src/main/scala/kafka/server/AbstractFetcherThread.scala, lines 81-86 https://reviews.apache.org/r/31366/diff/2/?file=898415#file898415line81 Jun has a comment about the case when all partitions get inactive, which is common when the fetched broker has just gone through leader migration. We can move the foreach statement before the if statement, and after the foreach check if any partitions were added; if not, just back off for fetchBackoffMs. Thanks for the review. Are you looking at something like this? This wouldn't handle the case where partitionMap is populated but all of its partitions are inactive:

  partitionMap.foreach { case (topicAndPartition, partitionFetchState) =>
    if (partitionFetchState.isActive)
      fetchRequestBuilder.addFetch(topicAndPartition.topic, topicAndPartition.partition,
        partitionFetchState.offset, fetchSize)
  }
  if (partitionMap.isEmpty)
    partitionMapCond.await(fetchBackOffMs, TimeUnit.MILLISECONDS)

Or do we want to check whether all the current partitions are inactive and then back off? It would be expensive to check whether all the partitions are active or not in doWork. - Sriharsha --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/31366/#review77674 --- On March 17, 2015, 11:03 p.m., Sriharsha Chintalapani wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/31366/ --- (Updated March 17, 2015, 11:03 p.m.) Review request for kafka. Bugs: KAFKA-1461 https://issues.apache.org/jira/browse/KAFKA-1461 Repository: kafka Description --- KAFKA-1461. Replica fetcher thread does not implement any back-off behavior. Diffs - core/src/main/scala/kafka/server/AbstractFetcherThread.scala e731df4b2a3e44aa3d761713a09b1070aff81430 core/src/main/scala/kafka/server/ReplicaFetcherThread.scala 96faa7b4ed7c9ba8a3f6f9f114bd94e19b3a7ac0 Diff: https://reviews.apache.org/r/31366/diff/ Testing --- Thanks, Sriharsha Chintalapani
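Guozhang's suggestion, build the fetch list first and back off when nothing was added, can be sketched as follows. FetchState and the String partition key are simplified stand-ins for the Scala TopicAndPartition/PartitionFetchState types, so this is only a shape, not the actual patch:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Sketch: collect the active partitions first, then let the caller back off
// when nothing was collected. This covers both "the map is empty" and "the
// map is populated but every partition is inactive" with one check.
class FetcherBackoffSketch {
    record FetchState(long offset, boolean active) {}

    // Returns the partitions to fetch; when empty, the fetcher thread should
    // wait fetchBackOffMs (e.g. partitionMapCond.await(...)) before retrying.
    static List<String> buildFetchList(Map<String, FetchState> partitionMap) {
        List<String> toFetch = new ArrayList<>();
        for (Map.Entry<String, FetchState> e : partitionMap.entrySet())
            if (e.getValue().active())
                toFetch.add(e.getKey());
        return toFetch;
    }
}
```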
[jira] [Commented] (KAFKA-2046) Delete topic still doesn't work
[ https://issues.apache.org/jira/browse/KAFKA-2046?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14378783#comment-14378783 ] Neha Narkhede commented on KAFKA-2046: -- [~clarkhaskins] As discussed previously, the minimum amount of information needed to troubleshoot any issue, not just delete topic, is: 1. Controller logs (possibly at TRACE) 2. Server logs 3. State change log (DEBUG works) Delete topic still doesn't work --- Key: KAFKA-2046 URL: https://issues.apache.org/jira/browse/KAFKA-2046 Project: Kafka Issue Type: Bug Reporter: Clark Haskins Assignee: Sriharsha Chintalapani I just attempted to delete a 128-partition topic with all inbound producers stopped. The result was as follows: the /admin/delete_topics znode was empty, the topic under /brokers/topics was removed, and the Kafka topics command showed that the topic was removed. However, the data on disk on each broker was not deleted, and the topic has not yet been re-created by starting up the inbound mirror maker. Let me know what additional information is needed. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (KAFKA-2047) Accelerate consumer rebalance in mirror maker.
Jiangjie Qin created KAFKA-2047: --- Summary: Accelerate consumer rebalance in mirror maker. Key: KAFKA-2047 URL: https://issues.apache.org/jira/browse/KAFKA-2047 Project: Kafka Issue Type: Improvement Reporter: Jiangjie Qin Assignee: Jiangjie Qin In the new mirror maker in KAFKA-1997, the bootstrap rebalance time became longer because there are more zookeeper consumer connectors doing rebalance serially. Rebalance would be faster if the bootstrap of the ZookeeperConsumerConnectors is parallelized. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: KAFKA-2042
Ah that is right, we just need to make sure this ticket goes along with flush() call in the next release then. On Tue, Mar 24, 2015 at 3:09 PM, Jun Rao j...@confluent.io wrote: Hi, Guozhang, The flush() was added to the new producer in trunk, not in 0.8.2, right? Thanks, Jun On Tue, Mar 24, 2015 at 2:42 PM, Guozhang Wang wangg...@gmail.com wrote: Hello, We found a serious bug while testing flush() calls in the new producer, which is summarized in KAFKA-2042. In general, when the producer starts up it will try to refresh metadata with empty topic list, and hence get all the topic metadata. When sending the message with some topic later, it will hence not cause the topic to be added into metadata's topic list since the metadata is available. When the data is still sitting in the accumulator and a new topic is created, that will cause metadata refresh with just this single topic, hence losing the metadata for any other topics. Under usual scenarios the messages will be sitting in the accumulator until another send() is triggered with the same topic, but with flush() as a blocking call the likelihood of this issue being exposed that messages gets blocked forever inside flush() could be largely increased. I am writing to ask if people think this problem is severe enough that requires another bug-fix release. -- Guozhang -- -- Guozhang
[jira] [Commented] (KAFKA-2042) New producer metadata update always get all topics.
[ https://issues.apache.org/jira/browse/KAFKA-2042?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14378954#comment-14378954 ] Jun Rao commented on KAFKA-2042: Could you explain a bit more when the new producer will send a TMR with an empty topic list? I can see this happen if, after the producer is created, no message is sent within the window of metadata age. Is that the only case when this can happen? New producer metadata update always get all topics. --- Key: KAFKA-2042 URL: https://issues.apache.org/jira/browse/KAFKA-2042 Project: Kafka Issue Type: Bug Reporter: Jiangjie Qin Assignee: Jiangjie Qin Priority: Blocker Fix For: 0.8.3 Attachments: KAFKA-2042.patch, KAFKA-2042_2015-03-24_13:37:49.patch, KAFKA-2042_2015-03-24_13:57:23.patch The new java producer's metadata.topics is initially empty, so the producer sends a TMR with an empty topic set. The broker takes the empty requested topic set as all topics, so metadata.cluster contains all topic metadata. Later on, when a new topic is produced, it gets added to metadata.topics. The next metadata update will only contain the metadata for this new topic, so metadata.cluster will only have this topic. Since a lot of messages are still in the accumulator but have no metadata in metadata.cluster, if a caller thread does a flush(), the caller thread will block forever because messages sitting in the accumulator without metadata will never be ready to send. We should add a check for metadata.topics: if it is empty, no TMR should be sent. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
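The check proposed at the end of the ticket can be sketched as a guard on the topic set. MetadataTopics below is a hypothetical, simplified stand-in for the producer's internal metadata bookkeeping, not the real class.

```java
import java.util.HashSet;
import java.util.Set;

// Sketch of the proposed fix: before any topic has been produced to, the
// topic set is empty and no TopicMetadataRequest should be sent, because
// the broker interprets an empty requested topic set as "all topics".
// Once topics accumulate, refreshes are scoped to exactly that set.
class MetadataTopics {
    private final Set<String> topics = new HashSet<>();

    // Called when a record is first sent to a topic.
    void add(String topic) {
        topics.add(topic);
    }

    // Returns the topic set for the next metadata request, or null to
    // signal that no request should be sent yet.
    Set<String> topicsForNextRequest() {
        return topics.isEmpty() ? null : new HashSet<>(topics);
    }
}
```

With this guard the accidental "all topics" fetch never happens, so a later single-topic refresh cannot silently drop metadata the accumulator still depends on.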
[CANCEL] [VOTE] KIP-15 add a close method with timeout to KafkaProducer
On 3/24/15, 1:34 PM, Guozhang Wang wangg...@gmail.com wrote: +1, let's just start a new thread for this. On Tue, Mar 24, 2015 at 12:23 PM, Joel Koshy jjkosh...@gmail.com wrote: Actually, since there are already votes on this and the KIP has changed a bit we should cancel this and start a new thread. On Tue, Mar 24, 2015 at 12:19 PM, Jiangjie Qin j...@linkedin.com.invalid wrote: Push up the thread for voting after discussion on the KIP hangout. On 3/19/15, 9:03 PM, Jiangjie Qin j...@linkedin.com wrote: We had some additional discussions on the discussion thread. Pushing up this thread to resume voting. On 3/11/15, 8:47 PM, Jay Kreps jay.kr...@gmail.com wrote: Yeah guys, I'd like to second that. I'd really really love to get the quality of these to the point where we could broadly solicit user input and use them as a permanent document of the alternatives and rationale. I know it is a little painful to have process, but I think we all saw what happened to the previous clients as public interfaces so I really really really want us to just be incredibly thoughtful and disciplined as we make changes. I think we all want to avoid another client rewrite. To second Joe's question in a more specific way, I think an alternative I don't see considered to give close() a bounded time is just to enforce the request time on the client side, which will cause all requests to be failed after the request timeout expires. This was the same behavior as for flush. In the case where the user just wants to ensure close doesn't block forever I think that may be sufficient? So one alternative might be to just do that request timeout feature and add a new producer config that is something like abort.on.failure=false which causes the producer to hard exit if it can't send a request. Which I think is closer to what you want, with this just being a way to implement that behavior. I'm not sure if this is better or worse, but we should be sure before we make the change. 
I also have a concern about producer.close(0, TimeUnit.MILLISECONDS) not meaning close with a timeout of 0 ms. I realize this exists in other java apis, but it is so confusing it even confused us into having that recent producer bug because of course all the other numbers mean wait that long. I'd propose close()--block until all completed close(0, TimeUnit.MILLISECONDS)--block for 0 ms close(5, TimeUnit.MILLISECONDS)--block for 5 ms close(-1, TimeUnit.MILLISECONDS)--error because blocking for negative ms would mean completing in the past :-) -Jay On Wed, Mar 11, 2015 at 8:31 PM, Joe Stein joe.st...@stealth.ly wrote: Could the KIP confluence please have updated the discussion thread link, thanks... could you also remove the template boilerplate at the top *This page is meant as a template ..* so we can capture it for the release cleanly. Also I don't really/fully understand how this is different than flush(time); close() and why close has its own timeout also? Lastly, what is the forceClose flag? This isn't documented in the public interface so it isn't clear how to completely use the feature just by reading the KIP. ~ Joe Stein - - - - - - - - - - - - - - - - - http://www.stealth.ly - - - - - - - - - - - - - - - - - On Wed, Mar 11, 2015 at 11:24 PM, Guozhang Wang wangg...@gmail.com wrote: +1 (binding) On Wed, Mar 11, 2015 at 8:10 PM, Jiangjie Qin j...@linkedin.com.invalid wrote: https://cwiki.apache.org/confluence/display/KAFKA/KIP-15+-+Add+a+close+method+with+a+timeout+in+the+producer -- -- Guozhang -- -- Guozhang
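The semantics Jay proposes — the argument is a maximum block time, 0 means do not block at all, and a negative value is rejected — can be sketched as a small normalization helper. CloseTimeout and its method names are hypothetical illustrations, not the producer's actual API.

```java
import java.util.concurrent.TimeUnit;

// Sketch of the proposed close-timeout semantics: the (timeout, unit) pair
// is converted to a millisecond bound on how long close may block, with
// 0 meaning "block for 0 ms" and negative values treated as an error,
// unlike APIs where 0 means "wait forever".
class CloseTimeout {
    static long toMillis(long timeout, TimeUnit unit) {
        if (timeout < 0)
            throw new IllegalArgumentException(
                "timeout must be non-negative, got " + timeout);
        return unit.toMillis(timeout);
    }

    // The no-argument close() blocks until all in-flight sends complete;
    // modeled here as an effectively unbounded wait.
    static long noArgCloseMillis() {
        return Long.MAX_VALUE;
    }
}
```

Keeping the "0 means 0" rule consistent with all other values avoids the special case that the thread says already caused one producer bug.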
[jira] [Created] (KAFKA-2046) Delete topic still doesn't work
Clark Haskins created KAFKA-2046: Summary: Delete topic still doesn't work Key: KAFKA-2046 URL: https://issues.apache.org/jira/browse/KAFKA-2046 Project: Kafka Issue Type: Bug Reporter: Clark Haskins I just attempted to delete a 128-partition topic with all inbound producers stopped. The result was as follows: the /admin/delete_topics znode was empty, the topic under /brokers/topics was removed, and the Kafka topics command showed that the topic was removed. However, the data on disk on each broker was not deleted and the topic has not yet been re-created by starting up the inbound mirror maker. Let me know what additional information is needed. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1982) change kafka.examples.Producer to use the new java producer
[ https://issues.apache.org/jira/browse/KAFKA-1982?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14378734#comment-14378734 ] Pete Wright commented on KAFKA-1982: +1 on committing this as it will quicken adoption of the new producer across our organization while also allowing us to track upstream releases without patches. change kafka.examples.Producer to use the new java producer --- Key: KAFKA-1982 URL: https://issues.apache.org/jira/browse/KAFKA-1982 Project: Kafka Issue Type: Improvement Affects Versions: 0.8.3 Reporter: Jun Rao Assignee: Ashish K Singh Labels: newbie Attachments: KAFKA-1982.patch, KAFKA-1982_2015-02-24_10:34:51.patch, KAFKA-1982_2015-02-24_20:45:48.patch, KAFKA-1982_2015-02-24_20:48:12.patch, KAFKA-1982_2015-02-27_11:08:34.patch, KAFKA-1982_2015-03-03_17:50:57.patch We need to change the example to use the new java producer. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: KAFKA-2042
Hi, Guozhang, The flush() was added to the new producer in trunk, not in 0.8.2, right? Thanks, Jun On Tue, Mar 24, 2015 at 2:42 PM, Guozhang Wang wangg...@gmail.com wrote: Hello, We found a serious bug while testing flush() calls in the new producer, which is summarized in KAFKA-2042. In general, when the producer starts up it will try to refresh metadata with empty topic list, and hence get all the topic metadata. When sending the message with some topic later, it will hence not cause the topic to be added into metadata's topic list since the metadata is available. When the data is still sitting in the accumulator and a new topic is created, that will cause metadata refresh with just this single topic, hence losing the metadata for any other topics. Under usual scenarios the messages will be sitting in the accumulator until another send() is triggered with the same topic, but with flush() as a blocking call the likelihood of this issue being exposed that messages gets blocked forever inside flush() could be largely increased. I am writing to ask if people think this problem is severe enough that requires another bug-fix release. -- Guozhang
[jira] [Commented] (KAFKA-2046) Delete topic still doesn't work
[ https://issues.apache.org/jira/browse/KAFKA-2046?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14378944#comment-14378944 ] Onur Karaman commented on KAFKA-2046: - I just tried deleting a topic with 128 partitions and noticed that the delete topic node and the topic node were still in zk, and all but one replica on the brokers had not been deleted. grep "handling stop replica (delete=false)" kafka-state-change.log produced output for all of the partitions. So the controller was able to send a StopReplicaRequest to the brokers to transition from OnlineReplica to OfflineReplica. However, grep "handling stop replica (delete=true)" kafka-state-change.log revealed only one replica. This was the replica that I noticed had actually been deleted from the filesystem. The other replicas never received the delete=true StopReplicaRequest, so the transition from OfflineReplica to ReplicaDeletionStarted for all the other replicas hangs. A thread dump on the controller indicates that it's getting stuck because of a LinkedBlockingQueue in ControllerChannelManager: {code} delete-topics-thread-xyz... java.lang.Thread.State: WAITING (parking) ... at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039) at java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:350) at kafka.controller.ControllerChannelManager.sendRequest(ControllerChannelManager.scala:57) ... 
at kafka.controller.KafkaController.sendRequest(KafkaController.scala:670) at kafka.controller.ControllerBrokerRequestBatch$$anonfun$sendRequestsToBrokers$3$$anonfun$apply$10.apply(ControllerChannelManager.scala:320) at kafka.controller.ControllerBrokerRequestBatch$$anonfun$sendRequestsToBrokers$3$$anonfun$apply$10.apply(ControllerChannelManager.scala:317) at scala.collection.immutable.List.foreach(List.scala:318) at kafka.controller.ControllerBrokerRequestBatch$$anonfun$sendRequestsToBrokers$3.apply(ControllerChannelManager.scala:317) at kafka.controller.ControllerBrokerRequestBatch$$anonfun$sendRequestsToBrokers$3.apply(ControllerChannelManager.scala:310) at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98) at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98) at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226) at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39) at scala.collection.mutable.HashMap.foreach(HashMap.scala:98) at kafka.controller.ControllerBrokerRequestBatch.sendRequestsToBrokers(ControllerChannelManager.scala:310) at kafka.controller.ReplicaStateMachine.handleStateChanges(ReplicaStateMachine.scala:115) at kafka.controller.TopicDeletionManager$$anonfun$startReplicaDeletion$2.apply(TopicDeletionManager.scala:337) at kafka.controller.TopicDeletionManager$$anonfun$startReplicaDeletion$2.apply(TopicDeletionManager.scala:327) at scala.collection.immutable.Map$Map1.foreach(Map.scala:109) at kafka.controller.TopicDeletionManager.startReplicaDeletion(TopicDeletionManager.scala:327) {code} controller.message.queue.size property is currently set to be very small. I'll try bumping this up and see if this addresses the issue. 
Delete topic still doesn't work --- Key: KAFKA-2046 URL: https://issues.apache.org/jira/browse/KAFKA-2046 Project: Kafka Issue Type: Bug Reporter: Clark Haskins Assignee: Onur Karaman I just attempted to delete a 128-partition topic with all inbound producers stopped. The result was as follows: the /admin/delete_topics znode was empty, the topic under /brokers/topics was removed, and the Kafka topics command showed that the topic was removed. However, the data on disk on each broker was not deleted and the topic has not yet been re-created by starting up the inbound mirror maker. Let me know what additional information is needed. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (KAFKA-2043) CompressionType is passed in each RecordAccumulator append
Grant Henke created KAFKA-2043: -- Summary: CompressionType is passed in each RecordAccumulator append Key: KAFKA-2043 URL: https://issues.apache.org/jira/browse/KAFKA-2043 Project: Kafka Issue Type: Bug Components: clients Affects Versions: 0.8.2.0 Reporter: Grant Henke Assignee: Grant Henke Priority: Minor Currently the org.apache.kafka.clients.producer.internals.RecordAccumulator append method accepts the compressionType on a per-record basis. It looks like the code would only work on a per-batch basis because the CompressionType is only used when creating a new RecordBatch. My understanding is that this should be settable per batch at most. public RecordAppendResult append(TopicPartition tp, byte[] key, byte[] value, CompressionType compression, Callback callback) throws InterruptedException; The compression type is a producer-level config. Instead of passing it in for each append, we probably should just pass it in once during the creation of the RecordAccumulator. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
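The proposed refactor — fix the CompressionType once at construction time, since it is a producer-level config — might look like this sketch. Accumulator is a simplified, hypothetical stand-in, not the real RecordAccumulator.

```java
// Sketch of the ticket's proposal: the compression type is supplied once
// in the constructor, and append() no longer takes it per record. New
// batches pick up the accumulator-wide setting.
class Accumulator {
    enum CompressionType { NONE, GZIP, SNAPPY }

    private final CompressionType compression;

    Accumulator(CompressionType compression) {
        this.compression = compression;
    }

    // Stand-in for the point where a new RecordBatch would be created:
    // the only place the compression type was ever consulted.
    CompressionType compressionForNewBatch() {
        return compression;
    }
}
```

This removes the misleading appearance that different records in one accumulator could carry different compression types.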
Review Request 32440: Patch for KAFKA-2043
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/32440/ --- Review request for kafka. Bugs: KAFKA-2043 https://issues.apache.org/jira/browse/KAFKA-2043 Repository: kafka Description --- CompressionType is passed in each RecordAccumulator append Diffs - clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java feda9c922d7dab17e424f8e6f0aa0a3f968e3729 clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java 88b4e4fbf3bf6fb6d2f90551a792b95d4cd51c40 clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java e379ac89c9a2fbfe750d6b0dec693b7eabb76204 clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java 24274a64885fadd0e9318de2beb232218ddd52cd Diff: https://reviews.apache.org/r/32440/diff/ Testing --- Thanks, Grant Henke
[jira] [Commented] (KAFKA-2043) CompressionType is passed in each RecordAccumulator append
[ https://issues.apache.org/jira/browse/KAFKA-2043?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14378046#comment-14378046 ] Grant Henke commented on KAFKA-2043: Created reviewboard https://reviews.apache.org/r/32440/diff/ against branch origin/trunk CompressionType is passed in each RecordAccumulator append -- Key: KAFKA-2043 URL: https://issues.apache.org/jira/browse/KAFKA-2043 Project: Kafka Issue Type: Bug Components: clients Affects Versions: 0.8.2.0 Reporter: Grant Henke Assignee: Grant Henke Priority: Minor Attachments: KAFKA-2043.patch Currently the org.apache.kafka.clients.producer.internals.RecordAccumulator append method accepts the compressionType on a per-record basis. It looks like the code would only work on a per-batch basis because the CompressionType is only used when creating a new RecordBatch. My understanding is that this should be settable per batch at most. public RecordAppendResult append(TopicPartition tp, byte[] key, byte[] value, CompressionType compression, Callback callback) throws InterruptedException; The compression type is a producer-level config. Instead of passing it in for each append, we probably should just pass it in once during the creation of the RecordAccumulator. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-2043) CompressionType is passed in each RecordAccumulator append
[ https://issues.apache.org/jira/browse/KAFKA-2043?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Grant Henke updated KAFKA-2043: --- Status: Patch Available (was: Open) CompressionType is passed in each RecordAccumulator append -- Key: KAFKA-2043 URL: https://issues.apache.org/jira/browse/KAFKA-2043 Project: Kafka Issue Type: Bug Components: clients Affects Versions: 0.8.2.0 Reporter: Grant Henke Assignee: Grant Henke Priority: Minor Attachments: KAFKA-2043.patch Currently the org.apache.kafka.clients.producer.internals.RecordAccumulator append method accepts the compressionType on a per-record basis. It looks like the code would only work on a per-batch basis because the CompressionType is only used when creating a new RecordBatch. My understanding is that this should be settable per batch at most. public RecordAppendResult append(TopicPartition tp, byte[] key, byte[] value, CompressionType compression, Callback callback) throws InterruptedException; The compression type is a producer-level config. Instead of passing it in for each append, we probably should just pass it in once during the creation of the RecordAccumulator. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-2043) CompressionType is passed in each RecordAccumulator append
[ https://issues.apache.org/jira/browse/KAFKA-2043?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Grant Henke updated KAFKA-2043: --- Attachment: KAFKA-2043.patch CompressionType is passed in each RecordAccumulator append -- Key: KAFKA-2043 URL: https://issues.apache.org/jira/browse/KAFKA-2043 Project: Kafka Issue Type: Bug Components: clients Affects Versions: 0.8.2.0 Reporter: Grant Henke Assignee: Grant Henke Priority: Minor Attachments: KAFKA-2043.patch Currently the org.apache.kafka.clients.producer.internals.RecordAccumulator append method accepts the compressionType on a per-record basis. It looks like the code would only work on a per-batch basis because the CompressionType is only used when creating a new RecordBatch. My understanding is that this should be settable per batch at most. public RecordAppendResult append(TopicPartition tp, byte[] key, byte[] value, CompressionType compression, Callback callback) throws InterruptedException; The compression type is a producer-level config. Instead of passing it in for each append, we probably should just pass it in once during the creation of the RecordAccumulator. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: New Producer Questions/Feedback
Here is the jira: https://issues.apache.org/jira/browse/KAFKA-2043 Thanks, Grant On Mon, Mar 23, 2015 at 11:53 PM, Jun Rao j...@confluent.io wrote: RecordAccumulator is actually not part of the public api since it's internal. The public apis are only those in http://kafka.apache.org/082/javadoc/index.html?org/apache/kafka/clients/producer/KafkaProducer.html Thanks, Jun On Mon, Mar 23, 2015 at 9:23 PM, Grant Henke ghe...@cloudera.com wrote: Thanks for validating that. I was thinking of solving it in the same fashion. Though I was unsure if there was/would be a use case to have multiple CompressionTypes in the same RecordAccumulator since the API was originally created this way. I would be happy to file a jira and can take on making the change too. Since RecordAccumulator is part of the public api, should the KIP process be followed here as well? On Mon, Mar 23, 2015 at 10:58 PM, Jun Rao j...@confluent.io wrote: Hi, Grant, The append api seems indeed a bit weird. The compression type is a producer-level config. Instead of passing it in for each append, we probably should just pass it in once during the creation of the RecordAccumulator. Could you file a jira to track this? Thanks, Jun On Mon, Mar 23, 2015 at 7:16 PM, Grant Henke ghe...@cloudera.com wrote: I am reading over the new producer code in an effort to understand the implementation more thoroughly and had some questions/feedback. Currently the org.apache.kafka.clients.producer.internals.RecordAccumulator append method accepts the compressionType on a per-record basis. It looks like the code would only work on a per-batch basis because the CompressionType is only used when creating a new RecordBatch. My understanding is that this should be settable per batch at most. I may have misread this though. Is there a time where setting it per record would make sense? 
public RecordAppendResult append(TopicPartition tp, byte[] key, byte[] value, CompressionType compression, Callback callback) throws InterruptedException; Why does the org.apache.kafka.common.serialization.Serializer interface require a topic? Is there a use case where serialization would change based on topic? public byte[] serialize(String topic, T data); Thank you, Grant -- Grant Henke Solutions Consultant | Cloudera ghe...@cloudera.com | 920-980-8979 twitter.com/ghenke http://twitter.com/gchenke | linkedin.com/in/granthenke
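One answer to Grant's Serializer question is that the topic parameter lets a single serializer instance vary its output per topic, e.g. a schema-registry-style serializer that looks up a schema by topic name. The toy illustration below is hypothetical; only the serialize(String, T) shape follows the o.a.k.common interface quoted above.

```java
import java.nio.charset.StandardCharsets;

// Simplified mirror of the o.a.k.common.serialization.Serializer shape:
// the topic is available to every serialize call.
interface Serializer<T> {
    byte[] serialize(String topic, T data);
}

// Hypothetical serializer that uses the topic parameter: it tags each
// payload with the topic it is bound for. A real-world analogue would be
// looking up a per-topic schema before encoding.
class TopicPrefixingSerializer implements Serializer<String> {
    @Override
    public byte[] serialize(String topic, String data) {
        return (topic + ":" + data).getBytes(StandardCharsets.UTF_8);
    }
}
```

A serializer that ignores the parameter simply does not read `topic`, so topic-agnostic cases pay no cost for the extra argument.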
[jira] [Commented] (KAFKA-1688) Add authorization interface and naive implementation
[ https://issues.apache.org/jira/browse/KAFKA-1688?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14378064#comment-14378064 ] Parth Brahmbhatt commented on KAFKA-1688: - [~prasadm] Thanks for taking the time to review. * Personally, I don't like the super-user concept, primarily because even though it provides convenience it also increases the blast radius of the entire system. If a user's credentials are compromised, in the current design only the topics and actions that user can perform on a cluster are compromised. However, I think it's fair to provide this feature so that users can make that choice on their own. I will update the KIP to reflect this as part of the proposal and let others vote for it. In the absence of the super-user concept, users won't have to grant permissions on non-existent topics. However, they will have to supply permissions during topic creation, and they will be allowed to alter these ACLs via the alter-topic command line tool. * I can add ALL as an item to the Operation enum. For now, like most other permissions, it will only be applicable to topics. Add authorization interface and naive implementation Key: KAFKA-1688 URL: https://issues.apache.org/jira/browse/KAFKA-1688 Project: Kafka Issue Type: Sub-task Components: security Reporter: Jay Kreps Assignee: Parth Brahmbhatt Fix For: 0.8.3 Add a PermissionManager interface as described here: https://cwiki.apache.org/confluence/display/KAFKA/Security (possibly there is a better name?) Implement calls to the PermissionsManager in KafkaApis for the main requests (FetchRequest, ProduceRequest, etc). We will need to add a new error code and exception to the protocol to indicate permission denied. Add a server configuration to give the class you want to instantiate that implements that interface. That class can define its own configuration properties from the main config file. 
Provide a simple implementation of this interface which just takes a user and ip whitelist and permits those in either of the whitelists to do anything, and denies all others. Rather than writing an integration test for this class we can probably just use this class for the TLS and SASL authentication testing. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
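The naive whitelist implementation the ticket describes might be sketched like this. Class and method names here are hypothetical; the real interface name and signature are still under discussion in the KIP.

```java
import java.util.HashSet;
import java.util.Set;

// Sketch of the naive authorizer from the ticket: permit a request if
// either the user or the client IP appears in a whitelist, deny all
// others. No per-topic or per-operation granularity.
class WhitelistPermissionManager {
    private final Set<String> userWhitelist;
    private final Set<String> ipWhitelist;

    WhitelistPermissionManager(Set<String> users, Set<String> ips) {
        this.userWhitelist = new HashSet<>(users);
        this.ipWhitelist = new HashSet<>(ips);
    }

    // Permit when the principal matches either whitelist.
    boolean isPermitted(String user, String ip) {
        return userWhitelist.contains(user) || ipWhitelist.contains(ip);
    }
}
```

As the ticket notes, something this simple is mainly useful as a test fixture for the TLS and SASL authentication work rather than as a production authorizer.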
[jira] [Updated] (KAFKA-2048) java.lang.IllegalMonitorStateException thrown in AbstractFetcherThread when handling error returned from simpleConsumer
[ https://issues.apache.org/jira/browse/KAFKA-2048?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] TAO XIAO updated KAFKA-2048: Attachment: KAFKA-2048.patch java.lang.IllegalMonitorStateException thrown in AbstractFetcherThread when handling error returned from simpleConsumer --- Key: KAFKA-2048 URL: https://issues.apache.org/jira/browse/KAFKA-2048 Project: Kafka Issue Type: Bug Components: consumer Affects Versions: 0.8.2.1 Reporter: TAO XIAO Assignee: Neha Narkhede Attachments: KAFKA-2048.patch AbstractFetcherThread will throw java.lang.IllegalMonitorStateException in the catch block of processFetchRequest method. This is because partitionMapLock is not acquired before calling partitionMapCond.await() -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (KAFKA-2049) Add thread that detects JVM pauses
Gwen Shapira created KAFKA-2049: --- Summary: Add thread that detects JVM pauses Key: KAFKA-2049 URL: https://issues.apache.org/jira/browse/KAFKA-2049 Project: Kafka Issue Type: Improvement Reporter: Gwen Shapira Assignee: Gwen Shapira Long JVM pauses can cause Kafka malfunctions (especially when interacting with ZK) that can be challenging to debug. I propose implementing HADOOP-9618 in Kafka: Add a simple thread which loops on 1-second sleeps, and if the sleep ever takes significantly longer than 1 second, log a WARN. This will make GC pauses (and other pauses) obvious in logs. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
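The HADOOP-9618 approach proposed in the ticket can be sketched as follows. This is a minimal illustration; the warning threshold, the logging call, and the class name are assumptions, not the eventual Kafka implementation.

```java
// Sketch of a JVM pause monitor: a loop sleeps for one second and logs a
// warning whenever the observed sleep overshoots the target by more than
// a threshold, which indicates a GC or OS-level pause.
class JvmPauseMonitor implements Runnable {
    static final long SLEEP_MS = 1000L;
    static final long WARN_THRESHOLD_MS = 1000L;

    // A sleep that took elapsedMs counts as a pause when the overshoot
    // beyond the intended SLEEP_MS exceeds the threshold.
    static boolean isPause(long elapsedMs) {
        return elapsedMs - SLEEP_MS > WARN_THRESHOLD_MS;
    }

    @Override
    public void run() {
        while (!Thread.currentThread().isInterrupted()) {
            long start = System.nanoTime();
            try {
                Thread.sleep(SLEEP_MS);
            } catch (InterruptedException e) {
                return; // shutting down
            }
            long elapsedMs = (System.nanoTime() - start) / 1_000_000L;
            if (isPause(elapsedMs))
                System.err.println("WARN: detected JVM or host pause of approximately "
                        + (elapsedMs - SLEEP_MS) + " ms");
        }
    }
}
```

Run as a daemon thread (`Thread t = new Thread(new JvmPauseMonitor()); t.setDaemon(true); t.start();`) so it never blocks broker shutdown; the warnings then make GC pauses obvious when correlated with ZK session expirations in the logs.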
[jira] [Updated] (KAFKA-2048) java.lang.IllegalMonitorStateException thrown in AbstractFetcherThread when handling error returned from simpleConsumer
[ https://issues.apache.org/jira/browse/KAFKA-2048?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] TAO XIAO updated KAFKA-2048: Status: Patch Available (was: Open) java.lang.IllegalMonitorStateException thrown in AbstractFetcherThread when handling error returned from simpleConsumer --- Key: KAFKA-2048 URL: https://issues.apache.org/jira/browse/KAFKA-2048 Project: Kafka Issue Type: Bug Components: consumer Affects Versions: 0.8.2.1 Reporter: TAO XIAO Assignee: Neha Narkhede Attachments: KAFKA-2048.patch AbstractFetcherThread will throw java.lang.IllegalMonitorStateException in the catch block of processFetchRequest method. This is because partitionMapLock is not acquired before calling partitionMapCond.await() -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1293) Mirror maker housecleaning
[ https://issues.apache.org/jira/browse/KAFKA-1293?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14379033#comment-14379033 ] Jiangjie Qin commented on KAFKA-1293: - [~mwarhaftig] It looks very closely related to the work in KIP-14. Please feel free to own that KIP if you want to. https://cwiki.apache.org/confluence/display/KAFKA/KIP-14+-+Tools+Standardization Since this is a public interface change, we need to go through the KIP process. You can find the KIP process here: https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals Mirror maker housecleaning -- Key: KAFKA-1293 URL: https://issues.apache.org/jira/browse/KAFKA-1293 Project: Kafka Issue Type: Bug Components: tools Affects Versions: 0.8.1 Reporter: Jay Kreps Priority: Minor Labels: usability Attachments: KAFKA-1293.patch Mirror maker uses its own convention for command-line arguments, e.g. --num.producers, whereas everywhere else follows the Unix convention like --num-producers. This is annoying because when running different tools you have to constantly remember the quirks of whoever wrote that tool. Mirror maker should also have a top-level wrapper script in bin/ to make tab completion work and so you don't have to remember the fully qualified class name. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-2029) Improving controlled shutdown for rolling updates
[ https://issues.apache.org/jira/browse/KAFKA-2029?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14379175#comment-14379175 ] Jiangjie Qin commented on KAFKA-2029: - I see. Yes, there is value in putting per-event barriers on the controller to achieve better synchronization among brokers. It looks orthogonal to the efforts on the broker side. Can you maybe upload the patch using kafka-patch-review.py so we have a better view of the changes? Thanks. Improving controlled shutdown for rolling updates - Key: KAFKA-2029 URL: https://issues.apache.org/jira/browse/KAFKA-2029 Project: Kafka Issue Type: Improvement Components: controller Affects Versions: 0.8.1.1 Reporter: Dmitry Bugaychenko Assignee: Neha Narkhede Priority: Critical Controlled shutdown as currently implemented can cause numerous problems: deadlocks, local and global data loss, partitions without a leader, etc. In some cases the only way to restore the cluster is to stop it completely using kill -9 and start again. Note 1: I'm aware of KAFKA-1305, but the proposed workaround to increase queue size makes things much worse (see discussion there). Note 2: The problems described here can occur in any setup, but they are extremely painful in a setup with large brokers (36 disks, 1000+ assigned partitions per broker in our case). Note 3: These improvements are actually workarounds and it is worth considering a global refactoring of the controller (make it single-threaded, or even get rid of it in favour of ZK leader elections for partitions). The problems and improvements are: # Controlled shutdown takes a long time (10+ minutes); the broker sends multiple shutdown requests and finally considers it failed and proceeds to unclean shutdown, while the controller gets stuck for a while (holding a lock waiting for free space in the controller-to-broker queue). 
After the broker comes back up it receives follower requests and erases its highwatermarks (with a message that the replica does not exist - the controller hadn't yet sent a request with the replica assignment); then, when the controller starts replicas on the broker, it deletes all local data (due to the missing highwatermarks). Furthermore, the controller starts processing the pending shutdown request and stops replicas on the broker, leaving it in a non-functional state. A solution might be to increase the time the broker waits for the controller's response to the shutdown request, but this timeout is taken from controller.socket.timeout.ms, which is global for all broker-controller communication, and setting it to 30 minutes is dangerous. *Proposed solution: introduce a dedicated config parameter for this timeout with a high default*. # If a broker goes down during controlled shutdown and does not come back, the controller gets stuck in a deadlock (one thread owns the lock and tries to add a message to the dead broker's queue, the send thread is in an infinite loop retrying the message to the dead broker, and the broker failure handler is waiting for the lock). There are numerous partitions without a leader and the only way out is to kill -9 the controller. *Proposed solution: add a timeout for adding messages to a broker's queue*. ControllerChannelManager.sendRequest: {code} def sendRequest(brokerId: Int, request: RequestOrResponse, callback: (RequestOrResponse) => Unit = null) { brokerLock synchronized { val stateInfoOpt = brokerStateInfo.get(brokerId) stateInfoOpt match { case Some(stateInfo) => // ODKL Patch: prevent infinite hang on trying to send message to a dead broker. 
// TODO: Move timeout to config if (!stateInfo.messageQueue.offer((request, callback), 10, TimeUnit.SECONDS)) { error("Timed out trying to send message to broker " + brokerId.toString) // Do not throw, as it brings controller into completely non-functional state // Controller to broker state change requests batch is not empty while creating a new one //throw new IllegalStateException("Timed out trying to send message to broker " + brokerId.toString) } case None => warn("Not sending request %s to broker %d, since it is offline.".format(request, brokerId)) } } } {code} # When the broker that is the controller starts shutting down while auto leader rebalance is running, it deadlocks in the end (the shutdown thread owns the lock and waits for the rebalance thread to exit, while the rebalance thread waits for the lock). *Proposed solution: use a bounded wait in the rebalance thread*. KafkaController.scala: {code} // ODKL Patch to prevent deadlocks in shutdown. /** * Execute the given function inside the lock */ def inLockIfRunning[T](lock: ReentrantLock)(fun: => T): T =
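The inLockIfRunning helper above is truncated, so its body is not shown. The same bounded-wait idea, acquiring a lock with a timeout instead of blocking indefinitely during shutdown, can be sketched in plain java.util.concurrent terms (names and the timeout value are illustrative, not the ODKL patch):

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;

public class BoundedLock {
    // Run fun under the lock, but give up after the timeout instead of
    // blocking forever; this mirrors the "bounded wait in rebalance thread"
    // proposal: the rebalance thread fails fast rather than deadlocking
    // against the shutdown thread.
    static <T> T inLockWithTimeout(ReentrantLock lock, long timeoutMs, Supplier<T> fun) {
        boolean acquired = false;
        try {
            acquired = lock.tryLock(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        if (!acquired) {
            throw new IllegalStateException("Timed out acquiring lock");
        }
        try {
            return fun.get();
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        ReentrantLock lock = new ReentrantLock();
        System.out.println(inLockWithTimeout(lock, 100, () -> 42));
    }
}
```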
[jira] [Updated] (KAFKA-2048) java.lang.IllegalMonitorStateException thrown in AbstractFetcherThread when handling error returned from simpleConsumer
[ https://issues.apache.org/jira/browse/KAFKA-2048?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] TAO XIAO updated KAFKA-2048: Attachment: (was: KAFKA-2048.patch) java.lang.IllegalMonitorStateException thrown in AbstractFetcherThread when handling error returned from simpleConsumer --- Key: KAFKA-2048 URL: https://issues.apache.org/jira/browse/KAFKA-2048 Project: Kafka Issue Type: Bug Components: consumer Affects Versions: 0.8.2.1 Reporter: TAO XIAO Assignee: Neha Narkhede Attachments: KAFKA-2048.patch AbstractFetcherThread will throw java.lang.IllegalMonitorStateException in the catch block of processFetchRequest method. This is because partitionMapLock is not acquired before calling partitionMapCond.await() -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-2048) java.lang.IllegalMonitorStateException thrown in AbstractFetcherThread when handling error returned from simpleConsumer
[ https://issues.apache.org/jira/browse/KAFKA-2048?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang updated KAFKA-2048: - Resolution: Fixed Status: Resolved (was: Patch Available) java.lang.IllegalMonitorStateException thrown in AbstractFetcherThread when handling error returned from simpleConsumer --- Key: KAFKA-2048 URL: https://issues.apache.org/jira/browse/KAFKA-2048 Project: Kafka Issue Type: Bug Components: consumer Affects Versions: 0.8.2.1 Reporter: TAO XIAO Assignee: Neha Narkhede Attachments: KAFKA-2048.patch AbstractFetcherThread will throw java.lang.IllegalMonitorStateException in the catch block of processFetchRequest method. This is because partitionMapLock is not acquired before calling partitionMapCond.await() -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-2048) java.lang.IllegalMonitorStateException thrown in AbstractFetcherThread when handling error returned from simpleConsumer
[ https://issues.apache.org/jira/browse/KAFKA-2048?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14379250#comment-14379250 ] Guozhang Wang commented on KAFKA-2048: -- Thanks for the patch [~xiaotao183], +1 and committed to trunk. java.lang.IllegalMonitorStateException thrown in AbstractFetcherThread when handling error returned from simpleConsumer --- Key: KAFKA-2048 URL: https://issues.apache.org/jira/browse/KAFKA-2048 Project: Kafka Issue Type: Bug Components: consumer Affects Versions: 0.8.2.1 Reporter: TAO XIAO Assignee: Neha Narkhede Attachments: KAFKA-2048.patch AbstractFetcherThread will throw java.lang.IllegalMonitorStateException in the catch block of processFetchRequest method. This is because partitionMapLock is not acquired before calling partitionMapCond.await() -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-2042) New producer metadata update always get all topics.
[ https://issues.apache.org/jira/browse/KAFKA-2042?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang updated KAFKA-2042: - Resolution: Fixed Status: Resolved (was: Patch Available) New producer metadata update always get all topics. --- Key: KAFKA-2042 URL: https://issues.apache.org/jira/browse/KAFKA-2042 Project: Kafka Issue Type: Bug Reporter: Jiangjie Qin Assignee: Jiangjie Qin Priority: Blocker Fix For: 0.8.3 Attachments: KAFKA-2042.patch, KAFKA-2042_2015-03-24_13:37:49.patch, KAFKA-2042_2015-03-24_13:57:23.patch The new Java producer's metadata.topics is initially empty, so the producer sends a TMR with an empty topic set. The broker interprets an empty requested topic set as all topics, so metadata.cluster contains all topic metadata. Later, when a new topic is produced, it gets added to metadata.topics. The next metadata update will then only contain the metadata for this new topic, so metadata.cluster will only have this topic. Since many messages may still be in the accumulator with no metadata in metadata.cluster, if a caller thread does a flush(), it will block forever, because messages sitting in the accumulator without metadata will never be ready to send. We should add a check for metadata.topics: if it is empty, no TMR should be sent. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
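The proposed check is a one-line guard; a hedged sketch of the idea (hypothetical class and method names, not the actual producer code):

```java
import java.util.Set;

public class MetadataRequestGuard {
    // Sketch of the fix proposed in KAFKA-2042: an empty topic set means
    // "all topics" on the broker side, so the client should not send a
    // TopicMetadataRequest at all while its topic set is still empty.
    static boolean shouldSendMetadataRequest(Set<String> topics) {
        return !topics.isEmpty();
    }

    public static void main(String[] args) {
        System.out.println(shouldSendMetadataRequest(Set.of()));     // skip the TMR
        System.out.println(shouldSendMetadataRequest(Set.of("t1"))); // send it
    }
}
```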
[jira] [Commented] (KAFKA-2042) New producer metadata update always get all topics.
[ https://issues.apache.org/jira/browse/KAFKA-2042?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14378986#comment-14378986 ] Jiangjie Qin commented on KAFKA-2042: - Yes, it depends on whether the topic list is empty or not when we send the first TMR. I might be missing something, but I think the TMR will be sent very soon after the producer is instantiated. In the first NetworkClient.poll(), it checks whether metadata needs an update by taking the max of timeToNextMetadataUpdate, timeToNextReconnectAttempt, and waitForMetadataFetch. All of them will be 0 on startup. That means the TMR will be sent at the first poll(). New producer metadata update always get all topics. --- Key: KAFKA-2042 URL: https://issues.apache.org/jira/browse/KAFKA-2042 Project: Kafka Issue Type: Bug Reporter: Jiangjie Qin Assignee: Jiangjie Qin Priority: Blocker Fix For: 0.8.3 Attachments: KAFKA-2042.patch, KAFKA-2042_2015-03-24_13:37:49.patch, KAFKA-2042_2015-03-24_13:57:23.patch The new Java producer's metadata.topics is initially empty, so the producer sends a TMR with an empty topic set. The broker interprets an empty requested topic set as all topics, so metadata.cluster contains all topic metadata. Later, when a new topic is produced, it gets added to metadata.topics. The next metadata update will then only contain the metadata for this new topic, so metadata.cluster will only have this topic. Since many messages may still be in the accumulator with no metadata in metadata.cluster, if a caller thread does a flush(), it will block forever, because messages sitting in the accumulator without metadata will never be ready to send. We should add a check for metadata.topics: if it is empty, no TMR should be sent. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
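The poll-time decision described in the comment can be sketched as follows (an illustrative reconstruction of the logic, not the actual NetworkClient code; the update is due only when all three timers have expired, hence the max):

```java
public class MetadataUpdateCheck {
    // Time until the next metadata request may be sent. A metadata update is
    // allowed only once all three waits have elapsed, so the remaining time
    // is the maximum of the three.
    static long timeToNextMetadataUpdate(long timeToNextUpdate,
                                         long timeToNextReconnect,
                                         long waitForFetch) {
        return Math.max(timeToNextUpdate, Math.max(timeToNextReconnect, waitForFetch));
    }

    public static void main(String[] args) {
        // If all three are 0 (as argued above for startup), the first poll()
        // sends a metadata request immediately.
        System.out.println(timeToNextMetadataUpdate(0, 0, 0));
    }
}
```

Jun Rao's follow-up below notes that in practice the first timer is not 0, because initializing metadata with the bootstrap broker sets lastRefreshMs.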
[jira] [Commented] (KAFKA-1461) Replica fetcher thread does not implement any back-off behavior
[ https://issues.apache.org/jira/browse/KAFKA-1461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14379162#comment-14379162 ] Sriharsha Chintalapani commented on KAFKA-1461: --- [~guozhang] Thanks for the review. Can you please take a look at my reply to your comment. Replica fetcher thread does not implement any back-off behavior --- Key: KAFKA-1461 URL: https://issues.apache.org/jira/browse/KAFKA-1461 Project: Kafka Issue Type: Improvement Components: replication Affects Versions: 0.8.1.1 Reporter: Sam Meder Assignee: Sriharsha Chintalapani Labels: newbie++ Fix For: 0.8.3 Attachments: KAFKA-1461.patch, KAFKA-1461.patch, KAFKA-1461_2015-03-11_10:41:26.patch, KAFKA-1461_2015-03-11_18:17:51.patch, KAFKA-1461_2015-03-12_13:54:51.patch, KAFKA-1461_2015-03-17_16:03:33.patch The current replica fetcher thread will retry in a tight loop if any error occurs during the fetch call. For example, we've seen cases where the fetch continuously throws a connection refused exception leading to several replica fetcher threads that spin in a pretty tight loop. To a much lesser degree this is also an issue in the consumer fetcher thread, although the fact that erroring partitions are removed so a leader can be re-discovered helps some. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[DISCUSS] KIP-17 - Add HighwaterMarkOffset to OffsetFetchResponse
Here is an initial proposal to add HighwaterMarkOffset to the OffsetFetchResponse: https://cwiki.apache.org/confluence/display/KAFKA/KIP-17+-+Add+HighwaterMarkOffset+to+OffsetFetchResponse I can add a jira and more implementation details if there is interest in the initial proposal. Thanks, Grant -- Grant Henke Solutions Consultant | Cloudera ghe...@cloudera.com | 920-980-8979 twitter.com/ghenke http://twitter.com/gchenke | linkedin.com/in/granthenke
[jira] [Commented] (KAFKA-2042) New producer metadata update always get all topics.
[ https://issues.apache.org/jira/browse/KAFKA-2042?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14379015#comment-14379015 ] Jun Rao commented on KAFKA-2042: If I start a console-producer w/o typing in any message, the producer actually doesn't send any metadata request immediately. On initializing the producer, we update metadata with the bootstrap broker. This sets lastRefreshMs to the current time. So, in NetworkClient.poll(), timeToNextMetadataUpdate will actually be the metadata age, which defaults to 300 secs. In what situation did you discover this problem? New producer metadata update always get all topics. --- Key: KAFKA-2042 URL: https://issues.apache.org/jira/browse/KAFKA-2042 Project: Kafka Issue Type: Bug Reporter: Jiangjie Qin Assignee: Jiangjie Qin Priority: Blocker Fix For: 0.8.3 Attachments: KAFKA-2042.patch, KAFKA-2042_2015-03-24_13:37:49.patch, KAFKA-2042_2015-03-24_13:57:23.patch The new Java producer's metadata.topics is initially empty, so the producer sends a TMR with an empty topic set. The broker interprets an empty requested topic set as all topics, so metadata.cluster contains all topic metadata. Later, when a new topic is produced, it gets added to metadata.topics. The next metadata update will then only contain the metadata for this new topic, so metadata.cluster will only have this topic. Since many messages may still be in the accumulator with no metadata in metadata.cluster, if a caller thread does a flush(), it will block forever, because messages sitting in the accumulator without metadata will never be ready to send. We should add a check for metadata.topics: if it is empty, no TMR should be sent. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-2042) New producer metadata update always get all topics.
[ https://issues.apache.org/jira/browse/KAFKA-2042?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14379120#comment-14379120 ] Jiangjie Qin commented on KAFKA-2042: - Ah, yes, that's right. I found this issue when starting mirror maker: because a large mirror maker cluster might take some time to finish consumer rebalance, no producer.send() was called before the first TMR was sent. So this issue probably will not cause problems in normal cases under default settings. New producer metadata update always get all topics. --- Key: KAFKA-2042 URL: https://issues.apache.org/jira/browse/KAFKA-2042 Project: Kafka Issue Type: Bug Reporter: Jiangjie Qin Assignee: Jiangjie Qin Priority: Blocker Fix For: 0.8.3 Attachments: KAFKA-2042.patch, KAFKA-2042_2015-03-24_13:37:49.patch, KAFKA-2042_2015-03-24_13:57:23.patch The new Java producer's metadata.topics is initially empty, so the producer sends a TMR with an empty topic set. The broker interprets an empty requested topic set as all topics, so metadata.cluster contains all topic metadata. Later, when a new topic is produced, it gets added to metadata.topics. The next metadata update will then only contain the metadata for this new topic, so metadata.cluster will only have this topic. Since many messages may still be in the accumulator with no metadata in metadata.cluster, if a caller thread does a flush(), it will block forever, because messages sitting in the accumulator without metadata will never be ready to send. We should add a check for metadata.topics: if it is empty, no TMR should be sent. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-2048) java.lang.IllegalMonitorStateException thrown in AbstractFetcherThread when handling error returned from simpleConsumer
[ https://issues.apache.org/jira/browse/KAFKA-2048?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] TAO XIAO updated KAFKA-2048: Status: Open (was: Patch Available) java.lang.IllegalMonitorStateException thrown in AbstractFetcherThread when handling error returned from simpleConsumer --- Key: KAFKA-2048 URL: https://issues.apache.org/jira/browse/KAFKA-2048 Project: Kafka Issue Type: Bug Components: consumer Affects Versions: 0.8.2.1 Reporter: TAO XIAO Assignee: Neha Narkhede AbstractFetcherThread will throw java.lang.IllegalMonitorStateException in the catch block of processFetchRequest method. This is because partitionMapLock is not acquired before calling partitionMapCond.await() -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-2048) java.lang.IllegalMonitorStateException thrown in AbstractFetcherThread when handling error returned from simpleConsumer
[ https://issues.apache.org/jira/browse/KAFKA-2048?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] TAO XIAO updated KAFKA-2048: Attachment: (was: KAFKA-2048.patch) java.lang.IllegalMonitorStateException thrown in AbstractFetcherThread when handling error returned from simpleConsumer --- Key: KAFKA-2048 URL: https://issues.apache.org/jira/browse/KAFKA-2048 Project: Kafka Issue Type: Bug Components: consumer Affects Versions: 0.8.2.1 Reporter: TAO XIAO Assignee: Neha Narkhede AbstractFetcherThread will throw java.lang.IllegalMonitorStateException in the catch block of processFetchRequest method. This is because partitionMapLock is not acquired before calling partitionMapCond.await() -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Build failed in Jenkins: KafkaPreCommit #43
See https://builds.apache.org/job/KafkaPreCommit/43/changes Changes: [wangguoz] KAFKA-2048; Change lock synchronized to inLock() for partitionMapCond; reviewed by Guozhang Wang -- [...truncated 1194 lines...] kafka.admin.DeleteTopicTest testDeleteNonExistingTopic PASSED kafka.admin.DeleteTopicTest testDeleteTopicWithCleaner PASSED kafka.api.ProducerSendTest testSendOffset PASSED kafka.api.ProducerSendTest testSerializer PASSED kafka.api.ProducerSendTest testClose PASSED kafka.api.ProducerSendTest testSendToPartition PASSED kafka.api.ProducerSendTest testAutoCreateTopic PASSED kafka.api.ProducerSendTest testFlush PASSED kafka.api.ProducerFailureHandlingTest testNotEnoughReplicas PASSED kafka.api.ProducerFailureHandlingTest testInvalidPartition PASSED kafka.api.ProducerFailureHandlingTest testTooLargeRecordWithAckZero PASSED kafka.api.ProducerFailureHandlingTest testTooLargeRecordWithAckOne PASSED kafka.api.ProducerFailureHandlingTest testNonExistentTopic PASSED kafka.api.ProducerFailureHandlingTest testWrongBrokerList PASSED kafka.api.ProducerFailureHandlingTest testNoResponse PASSED kafka.api.ProducerFailureHandlingTest testSendAfterClosed PASSED kafka.api.ProducerFailureHandlingTest testBrokerFailure PASSED kafka.api.ProducerFailureHandlingTest testCannotSendToInternalTopic PASSED kafka.api.ProducerFailureHandlingTest testNotEnoughReplicasAfterBrokerShutdown PASSED kafka.api.ConsumerTest testSimpleConsumption PASSED kafka.api.ConsumerTest testAutoOffsetReset PASSED kafka.api.ConsumerTest testSeek PASSED kafka.api.ConsumerTest testGroupConsumption PASSED kafka.api.ConsumerTest testPositionAndCommit PASSED kafka.api.ConsumerTest testPartitionsFor PASSED kafka.api.ConsumerTest testConsumptionWithBrokerFailures PASSED kafka.api.ConsumerTest testSeekAndCommitWithBrokerFailures FAILED java.lang.AssertionError: expected:1000 but was:492 at org.junit.Assert.fail(Assert.java:92) at org.junit.Assert.failNotEquals(Assert.java:689) at 
org.junit.Assert.assertEquals(Assert.java:127) at org.junit.Assert.assertEquals(Assert.java:514) at org.junit.Assert.assertEquals(Assert.java:498) at kafka.api.ConsumerTest.seekAndCommitWithBrokerFailures(ConsumerTest.scala:201) at kafka.api.ConsumerTest.testSeekAndCommitWithBrokerFailures(ConsumerTest.scala:182) kafka.api.ConsumerTest testPartitionReassignmentCallback PASSED kafka.api.RequestResponseSerializationTest testSerializationAndDeserialization PASSED kafka.api.ApiUtilsTest testShortStringNonASCII PASSED kafka.api.ApiUtilsTest testShortStringASCII PASSED kafka.api.test.ProducerCompressionTest testCompression[0] PASSED kafka.api.test.ProducerCompressionTest testCompression[1] PASSED kafka.api.test.ProducerCompressionTest testCompression[2] PASSED kafka.api.test.ProducerCompressionTest testCompression[3] PASSED kafka.javaapi.consumer.ZookeeperConsumerConnectorTest testBasic PASSED kafka.javaapi.message.ByteBufferMessageSetTest testWrittenEqualsRead PASSED kafka.javaapi.message.ByteBufferMessageSetTest testIteratorIsConsistent PASSED kafka.javaapi.message.ByteBufferMessageSetTest testSizeInBytes PASSED kafka.javaapi.message.ByteBufferMessageSetTest testIteratorIsConsistentWithCompression PASSED kafka.javaapi.message.ByteBufferMessageSetTest testSizeInBytesWithCompression PASSED kafka.javaapi.message.ByteBufferMessageSetTest testEquals PASSED kafka.javaapi.message.ByteBufferMessageSetTest testEqualsWithCompression PASSED kafka.integration.UncleanLeaderElectionTest testUncleanLeaderElectionEnabled PASSED kafka.integration.UncleanLeaderElectionTest testUncleanLeaderElectionDisabled PASSED kafka.integration.UncleanLeaderElectionTest testUncleanLeaderElectionEnabledByTopicOverride PASSED kafka.integration.UncleanLeaderElectionTest testCleanLeaderElectionDisabledByTopicOverride PASSED kafka.integration.UncleanLeaderElectionTest testUncleanLeaderElectionInvalidTopicOverride PASSED kafka.integration.FetcherTest testFetcher PASSED kafka.integration.RollingBounceTest 
testRollingBounce PASSED kafka.integration.PrimitiveApiTest testFetchRequestCanProperlySerialize PASSED kafka.integration.PrimitiveApiTest testEmptyFetchRequest PASSED kafka.integration.PrimitiveApiTest testDefaultEncoderProducerAndFetch PASSED kafka.integration.PrimitiveApiTest testDefaultEncoderProducerAndFetchWithCompression PASSED kafka.integration.PrimitiveApiTest testProduceAndMultiFetch PASSED kafka.integration.PrimitiveApiTest testMultiProduce PASSED kafka.integration.PrimitiveApiTest testConsumerEmptyTopic PASSED kafka.integration.PrimitiveApiTest testPipelinedProduceRequests PASSED kafka.integration.AutoOffsetResetTest testResetToEarliestWhenOffsetTooHigh PASSED kafka.integration.AutoOffsetResetTest testResetToEarliestWhenOffsetTooLow PASSED kafka.integration.AutoOffsetResetTest
Re: MirrorMaker improvements
Thanks for sharing this Vlad, this is a great read! I am particularly interested in the last bullet point, one-to-one mapping in MM, since you also mentioned that you use Kafka MM as the async replication layer for your geo-replicated k-v store. One approach that we are pursuing here to support active-active is to use an aggregate cluster that mirrors from multiple local clusters in different data centers. But this approach rules out the one-to-one mapping, since it requires multiple sources to pipe to a single destination. How did you tackle this problem at Turn if you are also using active-active? Guozhang On Tue, Mar 24, 2015 at 12:18 PM, vlad...@gmail.com vlad...@gmail.com wrote: Dear all, I had a short discussion with Jay yesterday at the ACM meetup and he suggested writing an email regarding a few possible MirrorMaker improvements. At Turn, we have been using MirrorMaker for a few months now to asynchronously replicate our key/value store data between our datacenters. In a way, our system is similar to LinkedIn's Databus, but it uses Kafka clusters and MirrorMaker as its building blocks. Our overall message rate peaks at about 650K/sec and, when pushing data over high bandwidth-delay product links, we have found some minor bottlenecks. The MirrorMaker process uses a standard consumer to pull data from a remote datacenter. This implies that it opens a single TCP connection to each of the remote brokers and muxes requests for different topics and partitions over this connection. While this is a good thing in terms of keeping the congestion window open, over long-RTT lines with a rather high loss rate the congestion window will cap, in our case at just a few Mbps. Since the overall line bandwidth is much higher, this means that we have to start multiple MirrorMaker processes (somewhere in the hundreds) in order to fully use the line capacity. Being able to pool multiple TCP connections from a single consumer to a broker would solve this complication. 
The standard consumer also uses the remote ZooKeeper in order to manage the consumer group. While consumer group management is moving closer to the brokers, it might make sense to move the group management to the local datacenter, since that would avoid using the long-distance connection for this purpose. Another possible improvement assumes a further constraint, namely that the number of partitions for a topic is the same in both datacenters. In my opinion, this is a sane constraint, since it preserves the Kafka ordering guarantees (per partition) instead of a simpler guarantee per key. This kind of guarantee can, for example, be useful in a system that compares partition contents to reach eventual consistency using Merkle trees. If the number of partitions is equal, then offsets have the same meaning for the same partition in both clusters, since the data in both partitions is identical before the offset. This allows a simple consumer to just query the local broker and the remote broker for their current offsets and, in case the remote broker is ahead, copy the extra data to the local cluster. Since the consumer offsets are no longer bound to the specific partitioning of a single remote cluster, the consumer could pull from any number of remote clusters, BitTorrent-style, whenever their offsets are ahead of the local offset. The group management problem would then reduce to assigning local partitions to different MirrorMaker processes, so group management could also be done locally in this situation. Regards, Vlad PS: Sorry if this is a double posting! The original posting did not appear in the archives for a while. -- -- Guozhang
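The offset-comparison replication idea described above (offsets are directly comparable when partition counts match, so the missing data for each partition is exactly the range between the local and remote end offsets) can be sketched as follows, with hypothetical names, not Kafka or Turn code:

```java
import java.util.HashMap;
import java.util.Map;

public class OffsetCatchUp {
    // For each partition, if the remote log is ahead, the records between the
    // local and remote end offsets are exactly the missing data, because
    // identical partitioning makes offsets mean the same thing in both
    // clusters. Returns partition -> [fromOffset, toOffset) ranges to copy.
    static Map<Integer, long[]> missingRanges(Map<Integer, Long> localEndOffsets,
                                              Map<Integer, Long> remoteEndOffsets) {
        Map<Integer, long[]> ranges = new HashMap<>();
        for (Map.Entry<Integer, Long> e : remoteEndOffsets.entrySet()) {
            long localEnd = localEndOffsets.getOrDefault(e.getKey(), 0L);
            if (e.getValue() > localEnd) {
                ranges.put(e.getKey(), new long[]{localEnd, e.getValue()});
            }
        }
        return ranges;
    }

    public static void main(String[] args) {
        Map<Integer, Long> local = Map.of(0, 100L, 1, 50L);
        Map<Integer, Long> remote = Map.of(0, 100L, 1, 80L);
        long[] r = missingRanges(local, remote).get(1);
        // Partition 0 is caught up; partition 1 needs offsets [50, 80).
        System.out.println(r[0] + "-" + r[1]);
    }
}
```

Because the range computation needs only end offsets, any remote cluster that is ahead can serve as the source, which is what makes the BitTorrent-style pulling possible.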
[jira] [Commented] (KAFKA-1856) Add PreCommit Patch Testing
[ https://issues.apache.org/jira/browse/KAFKA-1856?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14379331#comment-14379331 ] Joe Stein commented on KAFKA-1856: -- Testing file [KAFKA-1856_2015-02-04_15%3A44%3A47.patch|https://issues.apache.org/jira/secure/attachment/12696611/KAFKA-1856_2015-02-04_15%3A44%3A47.patch] against branch trunk took 0:25:20.725178. {color:red}Overall:{color} -1 due to 2 errors {color:red}ERROR:{color} Some unit tests failed (report) {color:red}ERROR:{color} Failed unit test: {{unit.kafka.consumer.PartitionAssignorTest testRangePartitionAssignor FAILED }} {color:green}SUCCESS:{color} Gradle bootstrap was successful {color:green}SUCCESS:{color} Clean was successful {color:green}SUCCESS:{color} Patch applied correctly {color:green}SUCCESS:{color} Patch add/modify test case {color:green}SUCCESS:{color} Gradle bootstrap was successful {color:green}SUCCESS:{color} Patch compiled {color:green}SUCCESS:{color} Checked style for Main {color:green}SUCCESS:{color} Checked style for Test This message is automatically generated. Add PreCommit Patch Testing --- Key: KAFKA-1856 URL: https://issues.apache.org/jira/browse/KAFKA-1856 Project: Kafka Issue Type: Task Reporter: Ashish K Singh Assignee: Ashish K Singh Attachments: KAFKA-1845.result.txt, KAFKA-1856.patch, KAFKA-1856_2015-01-18_21:43:56.patch, KAFKA-1856_2015-02-04_14:57:05.patch, KAFKA-1856_2015-02-04_15:44:47.patch h1. Kafka PreCommit Patch Testing - *Don't wait for it to break* h2. Motivation *With great power comes great responsibility* - Uncle Ben. As Kafka user list is growing, mechanism to ensure quality of the product is required. Quality becomes hard to measure and maintain in an open source project, because of a wide community of contributors. Luckily, Kafka is not the first open source project and can benefit from learnings of prior projects. PreCommit tests are the tests that are run for each patch that gets attached to an open JIRA. 
Based on test results, the test execution framework (a test bot) +1s or -1s the patch. Having PreCommit tests takes the load off committers, who would otherwise have to look at or test each patch. h2. Tests in Kafka h3. Unit and Integration Tests [Unit and Integration tests|https://cwiki.apache.org/confluence/display/KAFKA/Kafka+0.9+Unit+and+Integration+Tests] are cardinal to help contributors avoid breaking existing functionality while adding new functionality or fixing older issues. These tests, at least the ones relevant to the changes, must be run by contributors before attaching a patch to a JIRA. h3. System Tests [System tests|https://cwiki.apache.org/confluence/display/KAFKA/Kafka+System+Tests] are much broader tests that, unlike unit tests, focus on end-to-end scenarios rather than a specific method or class. h2. Apache PreCommit tests Apache provides a mechanism to automatically build a project and run a series of tests whenever a patch is uploaded to a JIRA. Based on test execution, the test framework will comment with a +1 or -1 on the JIRA. You can read more about the framework here: http://wiki.apache.org/general/PreCommitBuilds h2. Plan # Create a test-patch.py script (similar to the one used in Flume, Sqoop and other projects) that will take a JIRA as a parameter, apply the patch on the appropriate branch, build the project, run tests and report results. This script should be committed into the Kafka code-base. To begin with, this will only run unit tests. We can add code sanity checks, system_tests, etc. in the future. # Create a Jenkins job for running the test (as described in http://wiki.apache.org/general/PreCommitBuilds) and validate that it works manually. This must be done by a committer with Jenkins access. # Ask someone with access to https://builds.apache.org/job/PreCommit-Admin/ to add Kafka to the list of projects PreCommit-Admin triggers. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Assigned] (KAFKA-1973) Remove the accidentally created LogCleanerManager.scala.orig
[ https://issues.apache.org/jira/browse/KAFKA-1973?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Grant Henke reassigned KAFKA-1973: -- Assignee: Grant Henke Remove the accidentally created LogCleanerManager.scala.orig Key: KAFKA-1973 URL: https://issues.apache.org/jira/browse/KAFKA-1973 Project: Kafka Issue Type: Bug Reporter: Jiangjie Qin Assignee: Grant Henke Attachments: KAFKA-1973.patch It seems there is a LogCleanerManager.scala.orig in the trunk now. Need to remove it. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (KAFKA-2044) Support requests and responses from o.a.k.common in KafkaApis
Gwen Shapira created KAFKA-2044: --- Summary: Support requests and responses from o.a.k.common in KafkaApis Key: KAFKA-2044 URL: https://issues.apache.org/jira/browse/KAFKA-2044 Project: Kafka Issue Type: Improvement Reporter: Gwen Shapira Assignee: Gwen Shapira As groundwork for KIP-4 and for KAFKA-1927, we'll add some plumbing to support handling of requests and responses from o.a.k.common in KafkaApis. This will allow us to add new Api calls just in o.a.k.common and to gradually migrate existing requests and responses. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-1927) Replace requests in kafka.api with requests in org.apache.kafka.common.requests
[ https://issues.apache.org/jira/browse/KAFKA-1927?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gwen Shapira updated KAFKA-1927: Attachment: KAFKA-1927.patch Replace requests in kafka.api with requests in org.apache.kafka.common.requests --- Key: KAFKA-1927 URL: https://issues.apache.org/jira/browse/KAFKA-1927 Project: Kafka Issue Type: Improvement Reporter: Jay Kreps Assignee: Gwen Shapira Fix For: 0.8.3 Attachments: KAFKA-1927.patch The common package introduced a better way of defining requests using a new protocol definition DSL and also includes wrapper objects for these. We should switch KafkaApis over to use these request definitions and consider the scala classes deprecated (we probably need to retain some of them for a while for the scala clients). This will be a big improvement because 1. We will have each request now defined in only one place (Protocol.java) 2. We will have built-in support for multi-version requests 3. We will have much better error messages (no more cryptic underflow errors) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Review Request 32448: Patch for KAFKA-1927
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/32448/ --- Review request for kafka. Bugs: KAFKA-1927 https://issues.apache.org/jira/browse/KAFKA-1927 Repository: kafka Description --- support requests and responses using Common api in core modules (missing files) Diffs - core/src/main/scala/kafka/api/HeartbeatRequestAndHeader.scala f168d9fc99ce51b8b41b7f7db2a06f371b1a44e5 core/src/main/scala/kafka/api/HeartbeatResponseAndHeader.scala 9a71faae3138af1b4fb48125db619ddc3ad13c5a core/src/main/scala/kafka/api/JoinGroupRequestAndHeader.scala 3651e8603dd0ed0d2ea059786c68cf0722aa094b core/src/main/scala/kafka/api/JoinGroupResponseAndHeader.scala d0f07e0cbbdacf9ff8287e901ecabde3921bbab3 core/src/main/scala/kafka/api/RequestKeys.scala c24c0345feedc7b9e2e9f40af11bfa1b8d328c43 core/src/main/scala/kafka/network/BoundedByteBufferSend.scala 55ecac285e00abf38d7131368bb46b4c4010dc87 core/src/main/scala/kafka/network/RequestChannel.scala 7b1db3dbbb2c0676f166890f566c14aa248467ab core/src/main/scala/kafka/server/KafkaApis.scala 35af98f0bc1b6a50bd1d97a30147593f8c6a422d core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala fba852afa1b2f46b61e2fd12c38c821ba04e9cc6 Diff: https://reviews.apache.org/r/32448/diff/ Testing --- Thanks, Gwen Shapira
[jira] [Commented] (KAFKA-1927) Replace requests in kafka.api with requests in org.apache.kafka.common.requests
[ https://issues.apache.org/jira/browse/KAFKA-1927?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14378231#comment-14378231 ] Gwen Shapira commented on KAFKA-1927: - Created reviewboard https://reviews.apache.org/r/32448/diff/ against branch trunk Replace requests in kafka.api with requests in org.apache.kafka.common.requests --- Key: KAFKA-1927 URL: https://issues.apache.org/jira/browse/KAFKA-1927 Project: Kafka Issue Type: Improvement Reporter: Jay Kreps Assignee: Gwen Shapira Fix For: 0.8.3 Attachments: KAFKA-1927.patch The common package introduced a better way of defining requests using a new protocol definition DSL and also includes wrapper objects for these. We should switch KafkaApis over to use these request definitions and consider the scala classes deprecated (we probably need to retain some of them for a while for the scala clients). This will be a big improvement because 1. We will have each request now defined in only one place (Protocol.java) 2. We will have built-in support for multi-version requests 3. We will have much better error messages (no more cryptic underflow errors) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1684) Implement TLS/SSL authentication
[ https://issues.apache.org/jira/browse/KAFKA-1684?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14378335#comment-14378335 ] Sriharsha Chintalapani commented on KAFKA-1684: --- [~gwenshap] if you have patch available for KAFKA-1928 can you please upload it. I can modify my ssl and kerberos patches according to the new code. Implement TLS/SSL authentication Key: KAFKA-1684 URL: https://issues.apache.org/jira/browse/KAFKA-1684 Project: Kafka Issue Type: Sub-task Components: security Affects Versions: 0.9.0 Reporter: Jay Kreps Assignee: Sriharsha Chintalapani Attachments: KAFKA-1684.patch, KAFKA-1684.patch Add an SSL port to the configuration and advertise this as part of the metadata request. If the SSL port is configured the socket server will need to add a second Acceptor thread to listen on it. Connections accepted on this port will need to go through the SSL handshake prior to being registered with a Processor for request processing. SSL requests and responses may need to be wrapped or unwrapped using the SSLEngine that was initialized by the acceptor. This wrapping and unwrapping is very similar to what will need to be done for SASL-based authentication schemes. We should have a uniform interface that covers both of these and we will need to store the instance in the session with the request. The socket server will have to use this object when reading and writing requests. We will need to take care with the FetchRequests as the current FileChannel.transferTo mechanism will be incompatible with wrap/unwrap so we can only use this optimization for unencrypted sockets that don't require userspace translation (wrapping). -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] kafka pull request: 0.8.1 migrator
GitHub user syyang opened a pull request: https://github.com/apache/kafka/pull/52 0.8.1 migrator You can merge this pull request into a Git repository by running: $ git pull https://github.com/uber/kafka 0.8.1-migrator Alternatively you can review and apply these changes as the patch at: https://github.com/apache/kafka/pull/52.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #52 commit 405a04d958b1e7a8c997610d761e94c52b782c66 Author: Norbert Hu norb...@uber.com Date: 2015-02-11T05:32:45Z Create debian package for 0.8.1 commit 647fac5824d6e1e5b644770505d6e6f8bc9e5d61 Author: Norbert Hu norb...@uber.com Date: 2015-02-11T05:38:19Z Use scala 2.8.0 commit a2767a3e12350351bdd45c88e420936c1d2e4ed9 Author: Norbert Hu norb...@uber.com Date: 2015-02-11T19:03:56Z Install 0.8.1 debian package as leaf commit 0c66bb6d60d7f6a9db8f7866149348766a9e9f7c Author: Norbert Hu norb...@uber.com Date: 2015-02-15T03:40:51Z Add jmxtrans-agent to debian package This makes exporting jmx metrics via the deployed debian package much easier. See https://github.com/jmxtrans/jmxtrans-agent NOTE: the inclusion of the jmxtrans-agent jar is mostly for the migration tool as well as the mirror maker, which doesn't expose an easy way to export metrics to graphite commit 505e2d01d96513217c914f34fa0021c64d09d258 Author: Norbert Hu norb...@uber.com Date: 2015-03-12T00:15:17Z Kafka migrator tool is buggy with JDK7 See {T63657} commit b45239268609b761e07d994764165307466cf470 Author: Norbert Hu norb...@uber.com Date: 2015-03-14T06:42:12Z Revert Kafka migrator tool is buggy with JDK7 This reverts commit 505e2d01d96513217c914f34fa0021c64d09d258. 
commit b96e540a5602adf0b85e83b843d8f850c046bbe5 Author: Norbert Hu norb...@uber.com Date: 2015-03-18T20:07:21Z Add arcanist config commit d2d137d320e4776b3a9dbde4b630a4ebc0d5b331 Author: Norbert Hu norb...@uber.com Date: 2015-03-18T21:05:04Z Add arc land branch commit 998bc66fa3a368b37aa2e31f5e167b4cfe9c62b1 Author: Norbert Hu norb...@uber.com Date: 2015-03-18T21:12:01Z Update debian comment Reviewers: grk Reviewed By: grk Differential Revision: https://code.uberinternal.com/D83314 commit b65536b94705b0b890eb510df5610ab0e05df7e9 Author: Seung-Yeoul Yang syy...@uber.com Date: 2015-03-19T00:57:10Z [kafka] better error recovery in migrator tool (part 1) Summary: * Add a new gradle project for the migrator tool * Update .gitignore * KafkaMigrationTool is just copy-pasted. Test Plan: ./gradlew test Reviewers: grk, praveen, vinoth, csoman, norbert Reviewed By: norbert Subscribers: bigpunk, nharkins Maniphest Tasks: T67641 Differential Revision: https://code.uberinternal.com/D83463 commit ce43cf232fa02769ce774c46aa4b391c5224f471 Author: Seung-Yeoul Yang syy...@uber.com Date: 2015-03-19T01:12:08Z [kafka] better error recovery in migration tool (part 2) Summary: * check topic whiltelist/blacklist before running the migration tool * fix corrupted offsets upon detection * fail fast on erorrs and sigterm Test plan: * Added unit tests * Manually tested that corrupted offsets are fixed Reviewers: norbert, grk, praveen, csoman, vinoth Subscribers: nharkins, bigpunk Maniphest Tasks: T67641 Differential Revision: https://code.uberinternal.com/D84682 commit d6a25ef7e74f5cf74346700e80e05e6a9f6370c4 Author: Seung-Yeoul Yang syy...@uber.com Date: 2015-03-19T01:12:08Z [kafka] better error recovery in migration tool (part 2) Summary: * check topic whiltelist/blacklist before running the migration tool * fix corrupted offsets upon detection * fail fast on erorrs and sigterm Test plan: * Added unit tests * Manually tested that corrupted offsets are fixed Reviewers: norbert, grk, praveen, 
csoman, vinoth Subscribers: nharkins, bigpunk Maniphest Tasks: T67641 Differential Revision: https://code.uberinternal.com/D84682 commit d257ad6bef663693c3de96562189fdf83a444ed8 Author: Seung-Yeoul Yang syy...@uber.com Date: 2015-03-24T01:13:32Z foobar commit ccb63bd8940f9c02416e4a66925d7d048f1fc3a8 Author: Seung-Yeoul Yang syy...@uber.com Date: 2015-03-24T01:13:32Z foobar --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] kafka pull request: 0.8.1 migrator
Github user syyang closed the pull request at: https://github.com/apache/kafka/pull/52 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (KAFKA-1688) Add authorization interface and naive implementation
[ https://issues.apache.org/jira/browse/KAFKA-1688?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14378100#comment-14378100 ] Don Bosco Durai commented on KAFKA-1688: [~prasadm], I agree with you, we should support ALL or an equivalent keyword like (*) in the default implementation. I remember the hierarchical subject was brought up during one of the Google hangout discussions around authorization. I don't think there were any resolutions around it. Does it make sense to make this a custom implementation feature? So for OOTB, it would be just the topic name, but anyone who wants to implement hierarchical privileges can parse the topic name and use . or any other supported character as a delimiter and provide namespace/database-like permissions. FYI, it seems hierarchical topics were discussed back in 2012 https://cwiki.apache.org/confluence/display/KAFKA/Hierarchical+Topics Add authorization interface and naive implementation Key: KAFKA-1688 URL: https://issues.apache.org/jira/browse/KAFKA-1688 Project: Kafka Issue Type: Sub-task Components: security Reporter: Jay Kreps Assignee: Parth Brahmbhatt Fix For: 0.8.3 Add a PermissionManager interface as described here: https://cwiki.apache.org/confluence/display/KAFKA/Security (possibly there is a better name?) Implement calls to the PermissionsManager in KafkaApis for the main requests (FetchRequest, ProduceRequest, etc). We will need to add a new error code and exception to the protocol to indicate permission denied. Add a server configuration to give the class you want to instantiate that implements that interface. That class can define its own configuration properties from the main config file. Provide a simple implementation of this interface which just takes a user and ip whitelist and permits those in either of the whitelists to do anything, and denies all others. 
Rather than writing an integration test for this class we can probably just use this class for the TLS and SASL authentication testing. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: Review Request 32440: Patch for KAFKA-2043
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/32440/#review77593 --- clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java https://reviews.apache.org/r/32440/#comment125718 Since it's a producer-level config, is this change needed? We can keep it as an instance variable. Also, since the compression type does not change, the private final makes it clearer. What do you think? - Mayuresh Gharat On March 24, 2015, 3:51 p.m., Grant Henke wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/32440/ --- (Updated March 24, 2015, 3:51 p.m.) Review request for kafka. Bugs: KAFKA-2043 https://issues.apache.org/jira/browse/KAFKA-2043 Repository: kafka Description --- CompressionType is passed in each RecordAccumulator append Diffs - clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java feda9c922d7dab17e424f8e6f0aa0a3f968e3729 clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java 88b4e4fbf3bf6fb6d2f90551a792b95d4cd51c40 clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java e379ac89c9a2fbfe750d6b0dec693b7eabb76204 clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java 24274a64885fadd0e9318de2beb232218ddd52cd Diff: https://reviews.apache.org/r/32440/diff/ Testing --- Thanks, Grant Henke
[jira] [Updated] (KAFKA-2042) New producer metadata update always get all topics.
[ https://issues.apache.org/jira/browse/KAFKA-2042?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jiangjie Qin updated KAFKA-2042: Priority: Blocker (was: Major) New producer metadata update always get all topics. --- Key: KAFKA-2042 URL: https://issues.apache.org/jira/browse/KAFKA-2042 Project: Kafka Issue Type: Bug Reporter: Jiangjie Qin Assignee: Jiangjie Qin Priority: Blocker Attachments: KAFKA-2042.patch The new java producer's metadata.topics is initially empty, so the producer sends a TMR with an empty topic set. The broker takes the empty requested topic set as all topics, so metadata.cluster contains all topic metadata. Later on, when a new topic is produced, it gets added into metadata.topics. The next metadata update will only contain the metadata for this new topic, so metadata.cluster will only have this topic. Since a lot of messages are still in the accumulator but have no metadata in metadata.cluster, if a caller thread does a flush(), it will block forever because the messages sitting in the accumulator without metadata will never be ready to send. We should add a check for metadata.topics: if it is empty, no TMR should be sent. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
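The proposed check ("if metadata.topics is empty, no TMR should be sent") can be sketched with a small illustrative class; the names here are hypothetical and not the actual producer internals:

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Illustrative model of the proposed KAFKA-2042 fix: an empty topic set must
// never trigger a TopicMetadataRequest, because the broker interprets an
// empty requested topic set as "all topics".
public class MetadataGuard {
    private final Set<String> topics = new HashSet<>();

    public void add(String topic) {
        topics.add(topic);
    }

    // Only request metadata when there is at least one topic to ask about.
    public boolean shouldSendMetadataRequest() {
        return !topics.isEmpty();
    }

    public Set<String> topicsToRequest() {
        return shouldSendMetadataRequest()
                ? Collections.unmodifiableSet(topics)
                : Collections.emptySet();
    }

    public static void main(String[] args) {
        MetadataGuard guard = new MetadataGuard();
        System.out.println(guard.shouldSendMetadataRequest()); // false: empty set would fetch ALL topics
        guard.add("my-topic");
        System.out.println(guard.shouldSendMetadataRequest()); // true
    }
}
```

The real fix would live in the producer's metadata update path; this only models the guard condition.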
[jira] [Assigned] (KAFKA-1991) Clean ControllerStats initialization
[ https://issues.apache.org/jira/browse/KAFKA-1991?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Grant Henke reassigned KAFKA-1991: -- Assignee: Grant Henke Clean ControllerStats initialization Key: KAFKA-1991 URL: https://issues.apache.org/jira/browse/KAFKA-1991 Project: Kafka Issue Type: Improvement Components: core Affects Versions: 0.8.2.0 Reporter: Grant Henke Assignee: Grant Henke Priority: Trivial Attachments: KAFKA-1991.patch This is just a trivial clean-up. Values defined by an object are instantiated lazily and are all initialized the first time the object is used. This could cause confusion and, down the road, issues about when/which metrics are initialized in the ControllerStats object. KafkaServer.scala makes a call to each value to initialize it, but Scala is not actually behaving that way. The change matches the BrokerTopicStats implementation. Example:

{code}
scala> object ControllerStats {
     |   val uncleanLeaderElectionRate = {
     |     println("initializing uncleanLeaderElectionRate")
     |     "uncleanLeaderElectionRate"
     |   }
     |   val leaderElectionTimer = {
     |     println("initializing leaderElectionTimer")
     |     "leaderElectionTimer"
     |   }
     | }
defined object ControllerStats

scala> ControllerStats.uncleanLeaderElectionRate
initializing uncleanLeaderElectionRate
initializing leaderElectionTimer
res7: String = uncleanLeaderElectionRate
{code}

-- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: Review Request 32440: Patch for KAFKA-2043
On March 24, 2015, 5 p.m., Mayuresh Gharat wrote: clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java, line 134 https://reviews.apache.org/r/32440/diff/1/?file=904417#file904417line134 Since it's a producer-level config, is this change needed? We can keep it as an instance variable. Also, since the compression type does not change, the private final makes it clearer. What do you think? Grant Henke wrote: I don't mind leaving the instance-level config. However, since it is not used anywhere but the constructor I don't see the value in it. If we want to mark it as final we can in the constructor and have the same clarity. The only reason I didn't initially is because the other code did not seem to follow the style of putting final on everything. (Note: I would prefer to put final on everything) Completely agree that it's not used much. Not having it as an instance-level config is not going to affect the functionality, just that someone would prefer the configs to be together. - Mayuresh --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/32440/#review77593 --- On March 24, 2015, 3:51 p.m., Grant Henke wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/32440/ --- (Updated March 24, 2015, 3:51 p.m.) Review request for kafka. 
Bugs: KAFKA-2043 https://issues.apache.org/jira/browse/KAFKA-2043 Repository: kafka Description --- CompressionType is passed in each RecordAccumulator append Diffs - clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java feda9c922d7dab17e424f8e6f0aa0a3f968e3729 clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java 88b4e4fbf3bf6fb6d2f90551a792b95d4cd51c40 clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java e379ac89c9a2fbfe750d6b0dec693b7eabb76204 clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java 24274a64885fadd0e9318de2beb232218ddd52cd Diff: https://reviews.apache.org/r/32440/diff/ Testing --- Thanks, Grant Henke
[jira] [Commented] (KAFKA-1461) Replica fetcher thread does not implement any back-off behavior
[ https://issues.apache.org/jira/browse/KAFKA-1461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14378397#comment-14378397 ] Guozhang Wang commented on KAFKA-1461: -- Sorry for the delay, I will take a look at 31366 today. Replica fetcher thread does not implement any back-off behavior --- Key: KAFKA-1461 URL: https://issues.apache.org/jira/browse/KAFKA-1461 Project: Kafka Issue Type: Improvement Components: replication Affects Versions: 0.8.1.1 Reporter: Sam Meder Assignee: Sriharsha Chintalapani Labels: newbie++ Fix For: 0.8.3 Attachments: KAFKA-1461.patch, KAFKA-1461.patch, KAFKA-1461_2015-03-11_10:41:26.patch, KAFKA-1461_2015-03-11_18:17:51.patch, KAFKA-1461_2015-03-12_13:54:51.patch, KAFKA-1461_2015-03-17_16:03:33.patch The current replica fetcher thread will retry in a tight loop if any error occurs during the fetch call. For example, we've seen cases where the fetch continuously throws a connection refused exception leading to several replica fetcher threads that spin in a pretty tight loop. To a much lesser degree this is also an issue in the consumer fetcher thread, although the fact that erroring partitions are removed so a leader can be re-discovered helps some. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Comment Edited] (KAFKA-2029) Improving controlled shutdown for rolling updates
[ https://issues.apache.org/jira/browse/KAFKA-2029?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14378433#comment-14378433 ] Dmitry Bugaychenko edited comment on KAFKA-2029 at 3/24/15 7:34 PM: We tried prioritization of controller messages, but it didn't help. Communication with a single broker is synchronous, but different brokers might handle requests at different speeds - as a result, with a large queue one broker can get way behind another while the controller thinks it is doing fine. By adding tracking we ensure that *all* brokers have completed the leadership movement, so no one can get behind the others by more than one partition. The fix for controller message prioritization was in RequestChannel.scala:

{code}
...
class RequestChannel(val numProcessors: Int, val queueSize: Int) extends KafkaMetricsGroup {
  ...
  private val requestQueue = new LinkedBlockingDeque[RequestChannel.Request](queueSize)
  ...
  /** Send a request to be handled, potentially blocking until there is room in the queue for the request */
  def sendRequest(request: RequestChannel.Request) {
    if (request.requestId == RequestKeys.LeaderAndIsrKey || request.requestId == RequestKeys.StopReplicaKey) {
      // ODKL Patch: prioritize controller requests over data requests.
      requestQueue.putFirst(request)
      info("Escalated controller request: " + request.requestObj.describe(details = true))
    } else {
      requestQueue.putLast(request)
    }
  }
  ...
  /** Get the next request or block until there is one */
  def receiveRequest(): RequestChannel.Request = requestQueue.takeFirst()
  ...
{code}

It increased GC overhead but hasn't improved the speed of partition movement - it looks like network request processing is not the bottleneck. Improving controlled shutdown for rolling updates - Key: KAFKA-2029 URL: https://issues.apache.org/jira/browse/KAFKA-2029 Project: Kafka Issue Type: Improvement Components: controller Affects Versions: 0.8.1.1 Reporter: Dmitry Bugaychenko Assignee: Neha Narkhede Priority: Critical Controlled shutdown as implemented currently can cause numerous problems: deadlocks, local and global data loss, partitions without a leader, etc. In some cases the only way to restore the cluster is to stop it completely using kill -9 and start again. 
Note 1: I'm aware of KAFKA-1305, but the proposed workaround to increase queue size makes things much worse (see discussion there). Note 2: The problems described here can occur in any setup, but they are extremely painful in setups with large brokers (36 disks, 1000+ assigned partitions per broker in our case). Note 3: These improvements are actually workarounds, and it is worth considering a global refactoring of the controller (make it single-threaded, or even get rid of it in favour of ZK leader elections for partitions). The problems and improvements are: # Controlled shutdown takes a long time (10+ minutes), the broker sends multiple shutdown requests and finally considers it failed and proceeds to unclean shutdown; the controller gets stuck for a while (holding a lock waiting for free space in
[jira] [Commented] (KAFKA-2029) Improving controlled shutdown for rolling updates
[ https://issues.apache.org/jira/browse/KAFKA-2029?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14378433#comment-14378433 ] Dmitry Bugaychenko commented on KAFKA-2029: --- We tried prioritization of controller messages, but it didn't help. Communication with a single broker is synchronous, but different brokers might handle requests at different speeds - as a result, with a large queue one broker can get way behind another while the controller thinks it is doing fine. By adding tracking we ensure that *all* brokers have completed the leadership movement, so no one can get behind the others by more than one partition. The fix for controller message prioritization was in RequestChannel.scala:

{code}
...
class RequestChannel(val numProcessors: Int, val queueSize: Int) extends KafkaMetricsGroup {
  ...
  private val requestQueue = new LinkedBlockingDeque[RequestChannel.Request](queueSize)
  ...
  /** Send a request to be handled, potentially blocking until there is room in the queue for the request */
  def sendRequest(request: RequestChannel.Request) {
    if (request.requestId == RequestKeys.LeaderAndIsrKey || request.requestId == RequestKeys.StopReplicaKey) {
      // ODKL Patch: prioritize controller requests over data requests.
      requestQueue.putFirst(request)
      info("Escalated controller request: " + request.requestObj.describe(details = true))
    } else {
      requestQueue.putLast(request)
    }
  }
  ...
  /** Get the next request or block until there is one */
  def receiveRequest(): RequestChannel.Request = requestQueue.takeFirst()
  ...
{code}

It increased GC overhead but didn't improve the speed of partition movement - it looks like network request processing is not the bottleneck. 
Improving controlled shutdown for rolling updates - Key: KAFKA-2029 URL: https://issues.apache.org/jira/browse/KAFKA-2029 Project: Kafka Issue Type: Improvement Components: controller Affects Versions: 0.8.1.1 Reporter: Dmitry Bugaychenko Assignee: Neha Narkhede Priority: Critical Controlled shutdown as implemented currently can cause numerous problems: deadlocks, local and global data loss, partitions without a leader, etc. In some cases the only way to restore the cluster is to stop it completely using kill -9 and start again. Note 1: I'm aware of KAFKA-1305, but the proposed workaround to increase queue size makes things much worse (see discussion there). Note 2: The problems described here can occur in any setup, but they are extremely painful in setups with large brokers (36 disks, 1000+ assigned partitions per broker in our case). Note 3: These improvements are actually workarounds, and it is worth considering a global refactoring of the controller (make it single-threaded, or even get rid of it in favour of ZK leader elections for partitions). The problems and improvements are: # Controlled shutdown takes a long time (10+ minutes), the broker sends multiple shutdown requests and finally considers it failed and proceeds to unclean shutdown; the controller gets stuck for a while (holding a lock waiting for free space in the controller-to-broker queue). After the broker comes back up it receives follower requests and erases highwatermarks (with a message that the replica does not exist - the controller hadn't yet sent a request with the replica assignment); then, when the controller starts replicas on the broker, it deletes all local data (due to the missing highwatermarks). Furthermore, the controller starts processing the pending shutdown request and stops replicas on the broker, leaving it in a non-functional state. 
Solution to the problem might be to increase the time the broker waits for the controller's response to the shutdown request, but this timeout is taken from controller.socket.timeout.ms, which is global for all broker-controller communication, and setting it to 30 minutes is dangerous. *Proposed solution: introduce a dedicated config parameter for this timeout with a high default*. # If a broker goes down during controlled shutdown and does not come back, the controller gets stuck in a deadlock (one thread owns the lock and tries to add a message to the dead broker's queue, the send thread is in an infinite loop trying to retry the message to the dead broker, and the broker failure handler is waiting for the lock). There are numerous partitions without a leader and the only way out is to kill -9 the controller. *Proposed solution: add a timeout for adding a message to the broker's queue*. ControllerChannelManager.sendRequest:

{code}
def sendRequest(brokerId: Int, request: RequestOrResponse, callback: (RequestOrResponse) => Unit = null) {
  brokerLock synchronized {
    val stateInfoOpt = brokerStateInfo.get(brokerId)
    stateInfoOpt match {
      case Some(stateInfo) =>
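The second proposed solution above (a timeout when enqueuing to a broker's queue) could be sketched like this, assuming a plain java.util.concurrent queue rather than the actual ControllerChannelManager types:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Sketch of the proposed fix (hypothetical names, not the actual controller
// code): replace the unbounded put() with offer()-with-timeout, so a full
// queue for a dead broker surfaces as a failure instead of a deadlock.
public class BoundedSender {
    private final BlockingQueue<String> queue;

    public BoundedSender(int capacity) {
        this.queue = new LinkedBlockingQueue<>(capacity);
    }

    // Returns false instead of blocking forever when the queue stays full.
    public boolean sendRequest(String request, long timeoutMs) throws InterruptedException {
        return queue.offer(request, timeoutMs, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        BoundedSender sender = new BoundedSender(1);
        System.out.println(sender.sendRequest("leaderAndIsr", 10)); // true: queue has room
        System.out.println(sender.sendRequest("stopReplica", 10));  // false: times out, no deadlock
    }
}
```

The caller would then release the broker lock and mark the send as failed instead of holding the lock indefinitely.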
Re: [VOTE] KIP-15 add a close method with timeout to KafkaProducer
Push up the thread for voting after discussion on the KIP hangout. On 3/19/15, 9:03 PM, Jiangjie Qin j...@linkedin.com wrote: We had some additional discussions on the discussion thread. Pushing up this thread to resume voting. On 3/11/15, 8:47 PM, Jay Kreps jay.kr...@gmail.com wrote: Yeah guys, I'd like to second that. I'd really really love to get the quality of these to the point where we could broadly solicit user input and use them as a permanent document of the alternatives and rationale. I know it is a little painful to have process, but I think we all saw what happened to the previous clients as public interfaces so I really really really want us to just be incredibly thoughtful and disciplined as we make changes. I think we all want to avoid another client rewrite. To second Joe's question in a more specific way, I think an alternative I don't see considered to give close() a bounded time is just to enforce the request time on the client side, which will cause all requests to be failed after the request timeout expires. This was the same behavior as for flush. In the case where the user just wants to ensure close doesn't block forever I think that may be sufficient? So one alternative might be to just do that request timeout feature and add a new producer config that is something like abort.on.failure=false which causes the producer to hard exit if it can't send a request. Which I think is closer to what you want, with this just being a way to implement that behavior. I'm not sure if this is better or worse, but we should be sure before we make the change. I also have a concern about producer.close(0, TimeUnit.MILLISECONDS) not meaning close with a timeout of 0 ms. I realize this exists in other java apis, but it is so confusing it even confused us into having that recent producer bug because of course all the other numbers mean wait that long. 
I'd propose:

close() -- block until all completed
close(0, TimeUnit.MILLISECONDS) -- block for 0 ms
close(5, TimeUnit.MILLISECONDS) -- block for 5 ms
close(-1, TimeUnit.MILLISECONDS) -- error because blocking for negative ms would mean completing in the past :-)

-Jay On Wed, Mar 11, 2015 at 8:31 PM, Joe Stein joe.st...@stealth.ly wrote: Could the KIP confluence please have updated the discussion thread link, thanks... could you also remove the template boilerplate at the top *This page is meant as a template ..* so we can capture it for the release cleanly. Also I don't really/fully understand how this is different than flush(time); close() and why close has its own timeout also? Lastly, what is the forceClose flag? This isn't documented in the public interface so it isn't clear how to completely use the feature just by reading the KIP. ~ Joe Stein - - - - - - - - - - - - - - - - - http://www.stealth.ly - - - - - - - - - - - - - - - - - On Wed, Mar 11, 2015 at 11:24 PM, Guozhang Wang wangg...@gmail.com wrote: +1 (binding) On Wed, Mar 11, 2015 at 8:10 PM, Jiangjie Qin j...@linkedin.com.invalid wrote: https://cwiki.apache.org/confluence/display/KAFKA/KIP-15+-+Add+a+close+method+with+a+timeout+in+the+producer -- -- Guozhang
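Jay's proposed semantics could be modeled by a small hypothetical helper (not the actual KafkaProducer API) that takes the timeout literally and rejects negative values:

```java
import java.util.concurrent.TimeUnit;

// Illustrative model of the close() semantics proposed above (hypothetical
// helper, not the actual producer code): the timeout is taken at face value,
// and a negative value is an error rather than meaning "wait forever".
public class CloseSemantics {
    // Normalize a close timeout to milliseconds; Long.MAX_VALUE stands in
    // for the no-argument close(), i.e. "block until all completed".
    public static long closeTimeoutMs(long timeout, TimeUnit unit) {
        if (timeout < 0)
            throw new IllegalArgumentException("blocking for negative ms would mean completing in the past");
        return unit.toMillis(timeout);
    }

    public static void main(String[] args) {
        System.out.println(closeTimeoutMs(0, TimeUnit.MILLISECONDS)); // 0: return immediately
        System.out.println(closeTimeoutMs(5, TimeUnit.MILLISECONDS)); // 5: block at most 5 ms
    }
}
```

This avoids the "0 means forever" convention that the thread identifies as the source of confusion.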
Fwd: MirrorMaker improvements
Dear all, I had a short discussion with Jay yesterday at the ACM meetup and he suggested writing an email regarding a few possible MirrorMaker improvements. At Turn, we have been using MirrorMaker for a few months now to asynchronously replicate our key/value store data between our datacenters. In a way, our system is similar to Linkedin's Databus, but it uses Kafka clusters and MirrorMaker as its building blocks. Our overall message rate peaks at about 650K/sec and, when pushing data over high bandwidth delay product links, we have found some minor bottlenecks. The MirrorMaker process uses a standard consumer to pull data from a remote datacenter. This implies that it opens a single TCP connection to each of the remote brokers and muxes requests for different topics and partitions over this connection. While this is a good thing in terms of maintaining the congestion window open, over long RTT lines with rather high loss rate the congestion window will cap, in our case at just a few Mbps. While the overall line bandwidth is much higher, this means that we have to start multiple MirrorMaker processes (somewhere in the hundreds), in order to completely use the line capacity. Being able to pool multiple TCP connections from a single consumer to a broker would solve this complication. The standard consumer also uses the remote ZooKeeper in order to manage the consumer group. While consumer group management is moving closer to the brokers, it might make sense to move the group management to the local datacenter, since that would avoid using the long-distance connection for this purpose. Another possible improvement assumes a further constraint, namely that the number of partitions for a topic in both datacenters is the same. In my opinion, this is a sane constraint, since it preserves the Kafka ordering guarantees (per partition), instead of a simple guarantee per key. 
This kind of guarantee can, for example, be useful in a system that compares partition contents to reach eventual consistency using Merkle trees. If the number of partitions is equal, then offsets have the same meaning for the same partition in both clusters, since the data for both partitions is identical before the offset. This allows a simple consumer to just query the local broker and the remote broker for their current offsets and, in case the remote broker is ahead, copy the extra data to the local cluster. Since the consumer offsets are no longer bound to the specific partitioning of a single remote cluster, the consumer could pull from any number of remote clusters, BitTorrent-style, if their offsets are ahead of the local offset. The group management problem would reduce to assigning local partitions to different MirrorMaker processes, so the group management could be done locally in this situation as well. Regards, Vlad PS: Sorry if this is a double posting! The original posting did not appear in the archives for a while.
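The offset-alignment argument above can be sketched as a small helper: with equal partition counts, catching the local cluster up reduces to computing, per partition, how far ahead the remote offset is, then copying that range. Everything below (the class and method names, modeling per-partition offsets as a `Map<Integer, Long>`) is a hypothetical illustration of the control flow, not MirrorMaker code:

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of partition-aligned mirroring: with equal partition counts, an
// offset means the same thing in both clusters, so a mirror process only
// needs to copy the range [localOffset, remoteOffset) for each partition.
public class OffsetAlignedMirror {

    // Given each cluster's current offset per partition, return how many
    // messages must be copied to catch the local cluster up.
    static Map<Integer, Long> lagPerPartition(Map<Integer, Long> localOffsets,
                                              Map<Integer, Long> remoteOffsets) {
        Map<Integer, Long> lag = new HashMap<>();
        for (Map.Entry<Integer, Long> e : remoteOffsets.entrySet()) {
            long local = localOffsets.getOrDefault(e.getKey(), 0L);
            // Only copy when the remote is ahead; otherwise nothing to do.
            if (e.getValue() > local)
                lag.put(e.getKey(), e.getValue() - local);
        }
        return lag;
    }

    public static void main(String[] args) {
        Map<Integer, Long> local = Map.of(0, 100L, 1, 250L);
        Map<Integer, Long> remote = Map.of(0, 180L, 1, 250L);
        // Partition 0 is 80 messages behind; partition 1 is in sync.
        System.out.println(lagPerPartition(local, remote)); // {0=80}
    }
}
```

Since the lag map no longer mentions which remote cluster it came from, the same computation works against any remote cluster whose offsets are ahead, which is the "BitTorrent-style" pull Vlad describes.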
Re: KIP discussion Mar 24 at 11am PST
Just to keep everyone posted. The following is a summary of what's being discussed in the KIP hangout today. KIP-4 (admin commands): * Gwen is uploading a patch in KAFKA-1927 (refactoring requests) so that we can get unblocked on adding new requests. * We will combine DescribeTopic and TopicMetadata in the future. * We will leave the admin requests async for now. * We will not add a VerifyReassignPartitionRequest for now. We can do that later when we improve the verification process. * We need to discuss a bit more on how to expose the controller info to the client. * Andrii will send out more details on the KIP thread. KIP-15 (close): * If close() or close with a non-zero timeout is called from the send thread, we will log it as an error. * Jiangjie will follow up on the KIP thread. KIP-13 (quota): * Need a separate discussion on the mailing list about whether to use the new metrics package on the broker. * There are a few other details being discussed and Aditya will follow up on the KIP thread. Thanks, Jun On Fri, Mar 20, 2015 at 2:44 PM, Jun Rao j...@confluent.io wrote: Hi, Everyone, We plan to have a KIP discussion on Google hangout on Mar 24 at 11am PST. If you are interested in participating and have not already received a calendar invitation, please let me know. The following is the agenda. KIP-4 (admin commands): 10 mins * status of KAFKA-1927 (refactoring requests), which blocks this KIP * status of KAFKA-1634 (improve OffsetCommitRequest), which blocks KAFKA-1927 * any remaining issues for discussion KIP-15 (close): 10 mins * semantics of close() and close(timeout) KIP-13 (quota): * protocol change to reflect the state of throttling * dependency on using the new metrics package * dependency on KIP-5 (broker configuration) Thanks, Jun
[jira] [Comment Edited] (KAFKA-2029) Improving controlled shutdown for rolling updates
[ https://issues.apache.org/jira/browse/KAFKA-2029?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14378433#comment-14378433 ] Dmitry Bugaychenko edited comment on KAFKA-2029 at 3/24/15 7:35 PM: We tried prioritization of controller messages, but it didn't help. Communication with a single broker is synchronous, but different brokers might handle requests at different speeds - as a result, with a large queue one broker can get way behind another while the controller thinks it is doing fine. By adding tracking we ensure that *all* brokers have completed the leadership movement, so no one can get behind the others by more than one partition. The fix for controller message prioritization was in RequestChannel.scala:
{code}
class RequestChannel(val numProcessors: Int, val queueSize: Int) extends KafkaMetricsGroup {
  ...
  private val requestQueue = new LinkedBlockingDeque[RequestChannel.Request](queueSize)
  ...
  /** Send a request to be handled, potentially blocking until there is room in the queue for the request */
  def sendRequest(request: RequestChannel.Request) {
    if (request.requestId == RequestKeys.LeaderAndIsrKey || request.requestId == RequestKeys.StopReplicaKey) {
      // ODKL Patch: prioritize controller requests over data requests.
      requestQueue.putFirst(request)
      info("Escalated controller request: " + request.requestObj.describe(details = true))
    } else {
      requestQueue.putLast(request)
    }
  }
  ...
  /** Get the next request or block until there is one */
  def receiveRequest(): RequestChannel.Request = requestQueue.takeFirst()
  ...
}
{code}
It increased GC overhead but hasn't improved the speed of partition movement - it looks like network request processing is not the bottleneck. Improving controlled shutdown for rolling updates - Key: KAFKA-2029 URL: https://issues.apache.org/jira/browse/KAFKA-2029 Project: Kafka Issue Type: Improvement Components: controller Affects Versions: 0.8.1.1 Reporter: Dmitry Bugaychenko Assignee: Neha Narkhede Priority: Critical Controlled shutdown as implemented currently can cause numerous problems: deadlocks, local and global data loss, partitions without a leader, etc. In some cases the only way to restore the cluster is to stop it completely using kill -9 and start again. 
Note 1: I'm aware of KAFKA-1305, but the proposed workaround to increase the queue size makes things much worse (see discussion there). Note 2: The problems described here can occur in any setup, but they are extremely painful in setups with large brokers (36 disks, 1000+ assigned partitions per broker in our case). Note 3: These improvements are actually workarounds and it is worth considering a global refactoring of the controller (make it single-threaded, or even get rid of it in favour of ZK leader elections for partitions). The problems and improvements are: # Controlled shutdown takes a long time (10+ minutes), the broker sends multiple shutdown requests and finally considers it failed and proceeds to unclean shutdown, the controller gets stuck for a while (holding a lock waiting for free space in
[jira] [Created] (KAFKA-2045) Memory Management on the consumer
Guozhang Wang created KAFKA-2045: Summary: Memory Management on the consumer Key: KAFKA-2045 URL: https://issues.apache.org/jira/browse/KAFKA-2045 Project: Kafka Issue Type: Bug Reporter: Guozhang Wang We need to add memory management on the new consumer like we did in the new producer. This would probably include: 1. byte buffer re-use for fetch response partition data. 2. byte buffer re-use for on-the-fly de-compression. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: [VOTE] KIP-15 add a close method with timeout to KafkaProducer
Actually, since there are already votes on this and the KIP has changed a bit we should cancel this and start a new thread. On Tue, Mar 24, 2015 at 12:19 PM, Jiangjie Qin j...@linkedin.com.invalid wrote: Push up the thread for voting after discussion on the KIP hangout. On 3/19/15, 9:03 PM, Jiangjie Qin j...@linkedin.com wrote: We had some additional discussions on the discussion thread. Pushing up this thread to resume voting. On 3/11/15, 8:47 PM, Jay Kreps jay.kr...@gmail.com wrote: Yeah guys, I'd like to second that. I'd really really love to get the quality of these to the point where we could broadly solicit user input and use them as a permanent document of the alternatives and rationale. I know it is a little painful to have process, but I think we all saw what happened to the previous clients as public interfaces so I really really really want us to just be incredibly thoughtful and disciplined as we make changes. I think we all want to avoid another client rewrite. To second Joe's question in a more specific way, I think an alternative I don't see considered to give close() a bounded time is just to enforce the request time on the client side, which will cause all requests to be failed after the request timeout expires. This was the same behavior as for flush. In the case where the user just wants to ensure close doesn't block forever I think that may be sufficient? So one alternative might be to just do that request timeout feature and add a new producer config that is something like abort.on.failure=false which causes the producer to hard exit if it can't send a request. Which I think is closer to what you want, with this just being a way to implement that behavior. I'm not sure if this is better or worse, but we should be sure before we make the change. I also have a concern about producer.close(0, TimeUnit.MILLISECONDS) not meaning close with a timeout of 0 ms. 
I realize this exists in other java apis, but it is so confusing it even confused us into having that recent producer bug because of course all the other numbers mean wait that long. I'd propose close()--block until all completed close(0, TimeUnit.MILLISECONDS)--block for 0 ms close(5, TimeUnit.MILLISECONDS)--block for 5 ms close(-1, TimeUnit.MILLISECONDS)--error because blocking for negative ms would mean completing in the past :-) -Jay On Wed, Mar 11, 2015 at 8:31 PM, Joe Stein joe.st...@stealth.ly wrote: Could the KIP confluence please have updated the discussion thread link, thanks... could you also remove the template boilerplate at the top *This page is meant as a template ..* so we can capture it for the release cleanly. Also I don't really/fully understand how this is different than flush(time); close() and why close has its own timeout also? Lastly, what is the forceClose flag? This isn't documented in the public interface so it isn't clear how to completely use the feature just by reading the KIP. ~ Joe Stein - - - - - - - - - - - - - - - - - http://www.stealth.ly - - - - - - - - - - - - - - - - - On Wed, Mar 11, 2015 at 11:24 PM, Guozhang Wang wangg...@gmail.com wrote: +1 (binding) On Wed, Mar 11, 2015 at 8:10 PM, Jiangjie Qin j...@linkedin.com.invalid wrote: https://cwiki.apache.org/confluence/display/KAFKA/KIP-15+-+Add+a+close+m e thod+with+a+timeout+in+the+producer -- -- Guozhang
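Jay's proposed semantics can be captured in a small sketch: the argument is always an upper bound on how long close may block, 0 means "do not block at all", the no-argument form blocks until everything completes, and a negative timeout is rejected. This is only an illustration of the proposed contract (`maxBlockMillis` is a hypothetical name), not the KafkaProducer implementation:

```java
import java.util.concurrent.TimeUnit;

// Sketch of the proposed close() contract: every numeric timeout is an
// upper bound on blocking, so 0 means "don't wait" and a negative value
// is an error ("completing in the past"). Hypothetical helper names.
public class CloseSemantics {

    // close(timeout, unit): block for at most the given time.
    static long maxBlockMillis(long timeout, TimeUnit unit) {
        if (timeout < 0)
            throw new IllegalArgumentException(
                    "negative timeout would mean completing in the past");
        return unit.toMillis(timeout);
    }

    // close(): block until all in-flight sends complete (no bound).
    static long maxBlockMillis() {
        return Long.MAX_VALUE;
    }
}
```

The point of the sketch is that the mapping from argument to behavior is monotone: larger timeouts never block less, which is exactly the property that `close(0)` meaning "wait forever" would violate.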
Re: C++ Client Library -- libkafka-asio
@Ewen added license.txt (boost v1.0) thanks svante 2015-03-24 2:15 GMT+01:00 Ewen Cheslack-Postava e...@confluent.io: You don't get edit permission by default, you need to get one of the admins to give you permission. @Daniel, I've added libkafka-asio. @svante I started to add csi-kafka, but couldn't find a license? On Sun, Mar 22, 2015 at 8:29 AM, svante karlsson s...@csi.se wrote: Cool, Looks nice. I was looking for something similar a year ago. We also ended up rolling our own. https://github.com/bitbouncer/csi-kafka Have you got any performance figures? /svante 2015-03-22 14:29 GMT+01:00 Daniel Joos dan...@joosweb.de: Hello there, I'm currently working on a C++ client library, implementing the Kafka protocol using Boost ASIO. You can find the source code and some examples on github: https://github.com/danieljoos/libkafka-asio I tried to add it to the Clients section of the Kafka wiki, but either I'm too blind to see the Edit button, or I just don't have enough permissions to edit the page ;-) In case you like the library, it would be very nice if someone with sufficient permissions for the wiki could add it there. Thanks. Best regards, Daniel -- Thanks, Ewen
Re: Review Request 32434: Patch for KAFKA-2042
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/32434/#review77634 --- As discussed offline, I think it is better to make the fix at the app level (KafkaProducer, Metadata) rather than forbidding sending an empty MetadataRequest in NetworkClient. - Guozhang Wang On March 24, 2015, 8:17 a.m., Jiangjie Qin wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/32434/ --- (Updated March 24, 2015, 8:17 a.m.) Review request for kafka. Bugs: KAFKA-2042 https://issues.apache.org/jira/browse/KAFKA-2042 Repository: kafka Description --- Patch for KAFKA-2042. Do not update metadata for empty topic set in new producer Diffs - clients/src/main/java/org/apache/kafka/clients/NetworkClient.java f4295025c28e2842244dc775052b7a3d30fb9d11 Diff: https://reviews.apache.org/r/32434/diff/ Testing --- Thanks, Jiangjie Qin
Re: [VOTE] KIP-15 add a close method with timeout to KafkaProducer
+1, let's just start a new thread for this. On Tue, Mar 24, 2015 at 12:23 PM, Joel Koshy jjkosh...@gmail.com wrote: Actually, since there are already votes on this and the KIP has changed a bit we should cancel this and start a new thread. On Tue, Mar 24, 2015 at 12:19 PM, Jiangjie Qin j...@linkedin.com.invalid wrote: Push up the thread for voting after discussion on the KIP hangout. On 3/19/15, 9:03 PM, Jiangjie Qin j...@linkedin.com wrote: We had some additional discussions on the discussion thread. Pushing up this thread to resume voting. On 3/11/15, 8:47 PM, Jay Kreps jay.kr...@gmail.com wrote: Yeah guys, I'd like to second that. I'd really really love to get the quality of these to the point where we could broadly solicit user input and use them as a permanent document of the alternatives and rationale. I know it is a little painful to have process, but I think we all saw what happened to the previous clients as public interfaces so I really really really want us to just be incredibly thoughtful and disciplined as we make changes. I think we all want to avoid another client rewrite. To second Joe's question in a more specific way, I think an alternative I don't see considered to give close() a bounded time is just to enforce the request time on the client side, which will cause all requests to be failed after the request timeout expires. This was the same behavior as for flush. In the case where the user just wants to ensure close doesn't block forever I think that may be sufficient? So one alternative might be to just do that request timeout feature and add a new producer config that is something like abort.on.failure=false which causes the producer to hard exit if it can't send a request. Which I think is closer to what you want, with this just being a way to implement that behavior. I'm not sure if this is better or worse, but we should be sure before we make the change. 
I also have a concern about producer.close(0, TimeUnit.MILLISECONDS) not meaning close with a timeout of 0 ms. I realize this exists in other java apis, but it is so confusing it even confused us into having that recent producer bug because of course all the other numbers mean wait that long. I'd propose close()--block until all completed close(0, TimeUnit.MILLISECONDS)--block for 0 ms close(5, TimeUnit.MILLISECONDS)--block for 5 ms close(-1, TimeUnit.MILLISECONDS)--error because blocking for negative ms would mean completing in the past :-) -Jay On Wed, Mar 11, 2015 at 8:31 PM, Joe Stein joe.st...@stealth.ly wrote: Could the KIP confluence please have updated the discussion thread link, thanks... could you also remove the template boilerplate at the top *This page is meant as a template ..* so we can capture it for the release cleanly. Also I don't really/fully understand how this is different than flush(time); close() and why close has its own timeout also? Lastly, what is the forceClose flag? This isn't documented in the public interface so it isn't clear how to completely use the feature just by reading the KIP. ~ Joe Stein - - - - - - - - - - - - - - - - - http://www.stealth.ly - - - - - - - - - - - - - - - - - On Wed, Mar 11, 2015 at 11:24 PM, Guozhang Wang wangg...@gmail.com wrote: +1 (binding) On Wed, Mar 11, 2015 at 8:10 PM, Jiangjie Qin j...@linkedin.com.invalid wrote: https://cwiki.apache.org/confluence/display/KAFKA/KIP-15+-+Add+a+close+m e thod+with+a+timeout+in+the+producer -- -- Guozhang -- -- Guozhang
[jira] [Updated] (KAFKA-2042) New producer metadata update always get all topics.
[ https://issues.apache.org/jira/browse/KAFKA-2042?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jiangjie Qin updated KAFKA-2042: Attachment: KAFKA-2042_2015-03-24_13:37:49.patch New producer metadata update always get all topics. --- Key: KAFKA-2042 URL: https://issues.apache.org/jira/browse/KAFKA-2042 Project: Kafka Issue Type: Bug Reporter: Jiangjie Qin Assignee: Jiangjie Qin Priority: Blocker Attachments: KAFKA-2042.patch, KAFKA-2042_2015-03-24_13:37:49.patch The new java producer metadata.topics is initially empty, so the producer sends a TMR with an empty topic set. The broker takes the empty requested topic set as all topics, so metadata.cluster contains all topic metadata. Later on, when a new topic is produced, it gets added into metadata.topics. The next metadata update will only contain the metadata for this new topic, so metadata.cluster will only have this topic. Since a lot of messages are still in the accumulator but have no metadata in metadata.cluster, if a caller thread does a flush(), it will block forever because the messages sitting in the accumulator without metadata will never be ready to send. We should add a check for metadata.topics: if it is empty, no TMR should be sent. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
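The check proposed at the end of the description can be sketched as a single guard. The method and parameter names below are hypothetical illustrations of the condition, not the actual NetworkClient/KafkaProducer change in the patch:

```java
import java.util.Set;

// Sketch of the KAFKA-2042 guard: the broker interprets an empty
// requested-topic set as "all topics", so the client must not send a
// TopicMetadataRequest until it is actually tracking at least one topic.
// Otherwise a later single-topic refresh would wipe the cluster view.
public class MetadataGuard {
    static boolean shouldSendMetadataRequest(Set<String> trackedTopics, boolean updateNeeded) {
        // Suppress the request entirely while no topic is tracked.
        return updateNeeded && !trackedTopics.isEmpty();
    }
}
```

With this guard, the producer's very first metadata fetch is deferred until the first send() registers a topic, so metadata.cluster only ever holds metadata for topics the producer actually tracks.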
Re: Review Request 32434: Patch for KAFKA-2042
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/32434/ --- (Updated March 24, 2015, 8:37 p.m.) Review request for kafka. Bugs: KAFKA-2042 https://issues.apache.org/jira/browse/KAFKA-2042 Repository: kafka Description (updated) --- Move the change to KafkaProducer after talking to Guozhang offline. Diffs (updated) - clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java feda9c922d7dab17e424f8e6f0aa0a3f968e3729 Diff: https://reviews.apache.org/r/32434/diff/ Testing --- Thanks, Jiangjie Qin
[jira] [Commented] (KAFKA-2042) New producer metadata update always get all topics.
[ https://issues.apache.org/jira/browse/KAFKA-2042?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14378551#comment-14378551 ] Jiangjie Qin commented on KAFKA-2042: - Updated reviewboard https://reviews.apache.org/r/32434/diff/ against branch origin/trunk New producer metadata update always get all topics. --- Key: KAFKA-2042 URL: https://issues.apache.org/jira/browse/KAFKA-2042 Project: Kafka Issue Type: Bug Reporter: Jiangjie Qin Assignee: Jiangjie Qin Priority: Blocker Attachments: KAFKA-2042.patch, KAFKA-2042_2015-03-24_13:37:49.patch The new java producer metadata.topics is initially empty, so the producer sends a TMR with an empty topic set. The broker takes the empty requested topic set as all topics, so metadata.cluster contains all topic metadata. Later on, when a new topic is produced, it gets added into metadata.topics. The next metadata update will only contain the metadata for this new topic, so metadata.cluster will only have this topic. Since a lot of messages are still in the accumulator but have no metadata in metadata.cluster, if a caller thread does a flush(), it will block forever because the messages sitting in the accumulator without metadata will never be ready to send. We should add a check for metadata.topics: if it is empty, no TMR should be sent. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-2042) New producer metadata update always get all topics.
[ https://issues.apache.org/jira/browse/KAFKA-2042?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14378586#comment-14378586 ] Jiangjie Qin commented on KAFKA-2042: - Updated reviewboard https://reviews.apache.org/r/32434/diff/ against branch origin/trunk New producer metadata update always get all topics. --- Key: KAFKA-2042 URL: https://issues.apache.org/jira/browse/KAFKA-2042 Project: Kafka Issue Type: Bug Reporter: Jiangjie Qin Assignee: Jiangjie Qin Priority: Blocker Attachments: KAFKA-2042.patch, KAFKA-2042_2015-03-24_13:37:49.patch, KAFKA-2042_2015-03-24_13:57:23.patch The new java producer metadata.topics is initially empty, so the producer sends a TMR with an empty topic set. The broker takes the empty requested topic set as all topics, so metadata.cluster contains all topic metadata. Later on, when a new topic is produced, it gets added into metadata.topics. The next metadata update will only contain the metadata for this new topic, so metadata.cluster will only have this topic. Since a lot of messages are still in the accumulator but have no metadata in metadata.cluster, if a caller thread does a flush(), it will block forever because the messages sitting in the accumulator without metadata will never be ready to send. We should add a check for metadata.topics: if it is empty, no TMR should be sent. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: Review Request 32434: Patch for KAFKA-2042
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/32434/ --- (Updated March 24, 2015, 8:57 p.m.) Review request for kafka. Bugs: KAFKA-2042 https://issues.apache.org/jira/browse/KAFKA-2042 Repository: kafka Description (updated) --- Move the change to KafkaProducer after talking to Guozhang offline. A less expensive fix Diffs (updated) - clients/src/main/java/org/apache/kafka/clients/Metadata.java c8bde8b732defa20819730d87303a9a80d01116f clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java feda9c922d7dab17e424f8e6f0aa0a3f968e3729 Diff: https://reviews.apache.org/r/32434/diff/ Testing --- Thanks, Jiangjie Qin
[jira] [Commented] (KAFKA-2045) Memory Management on the consumer
[ https://issues.apache.org/jira/browse/KAFKA-2045?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14378476#comment-14378476 ] Jay Kreps commented on KAFKA-2045: -- There are really two issues: 1. Bounding fetch size while still guaranteeing that you eventually get data from each partition 2. Pooling and reusing byte buffers I actually think (1) is really pressing, but (2) is just an optimization that may or may not have a high payoff. (1) is what leads to the huge memory allocations and sudden OOM when a consumer falls behind and then suddenly has lots of data, or when partition assignment changes. For (1) I think we need to figure out whether this is (a) some heuristic in the consumer which decides to only do fetches for a subset of topic/partitions or (b) a new parameter in the fetch request that gives a total bound on the request size. I think we discussed this a while back and agreed on (b), but I can't remember now. The argument, if I recall, was that that was the only way for the server to monitor all the subscribed topics and avoid blocking on an empty topic while non-empty partitions have data. Bounding the allocations should help performance a lot too. If we do this bounding then I think reuse will be a lot easier, since each response will use at most that many bytes and you could potentially even just statically allocate the byte buffer for each partition and reuse it. Memory Management on the consumer - Key: KAFKA-2045 URL: https://issues.apache.org/jira/browse/KAFKA-2045 Project: Kafka Issue Type: Sub-task Reporter: Guozhang Wang We need to add memory management on the new consumer like we did in the new producer. This would probably include: 1. byte buffer re-use for fetch response partition data. 2. byte buffer re-use for on-the-fly de-compression. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
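Point (2), buffer reuse, can be sketched as a minimal pool that becomes practical once (1) gives every fetch a fixed per-partition bound. The class below is purely an editor's illustration of the reuse idea under that assumption; it is not consumer code and the names are invented:

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;

// Sketch of byte-buffer reuse for fetch-response partition data: hand out
// fixed-size ByteBuffers and take them back for the next response instead
// of allocating a fresh buffer every time. Single-threaded illustration.
public class FetchBufferPool {
    private final ArrayDeque<ByteBuffer> free = new ArrayDeque<>();
    private final int bufferSize;

    FetchBufferPool(int bufferSize) { this.bufferSize = bufferSize; }

    ByteBuffer acquire() {
        ByteBuffer b = free.poll();
        // Allocate only when the pool is empty; otherwise reuse.
        return (b != null) ? b : ByteBuffer.allocate(bufferSize);
    }

    void release(ByteBuffer b) {
        b.clear();       // reset position/limit before the next use
        free.push(b);
    }
}
```

This only works because the bound from (1) makes every response fit the fixed buffer size; without it, a large fetch would still force a fresh oversized allocation, which is exactly the OOM pattern Jay describes.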
[jira] [Updated] (KAFKA-1927) Replace requests in kafka.api with requests in org.apache.kafka.common.requests
[ https://issues.apache.org/jira/browse/KAFKA-1927?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gwen Shapira updated KAFKA-1927: Status: Patch Available (was: Open) Replace requests in kafka.api with requests in org.apache.kafka.common.requests --- Key: KAFKA-1927 URL: https://issues.apache.org/jira/browse/KAFKA-1927 Project: Kafka Issue Type: Improvement Reporter: Jay Kreps Assignee: Gwen Shapira Fix For: 0.8.3 Attachments: KAFKA-1927.patch The common package introduced a better way of defining requests using a new protocol definition DSL and also includes wrapper objects for these. We should switch KafkaApis over to use these request definitions and consider the scala classes deprecated (we probably need to retain some of them for a while for the scala clients). This will be a big improvement because 1. We will have each request now defined in only one place (Protocol.java) 2. We will have built-in support for multi-version requests 3. We will have much better error messages (no more cryptic underflow errors) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1927) Replace requests in kafka.api with requests in org.apache.kafka.common.requests
[ https://issues.apache.org/jira/browse/KAFKA-1927?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14378236#comment-14378236 ] Gwen Shapira commented on KAFKA-1927: - The uploaded patch is: 1. Preliminary and untested, just to show what I had in mind. 2. Actually belongs in KAFKA-2044... Replace requests in kafka.api with requests in org.apache.kafka.common.requests --- Key: KAFKA-1927 URL: https://issues.apache.org/jira/browse/KAFKA-1927 Project: Kafka Issue Type: Improvement Reporter: Jay Kreps Assignee: Gwen Shapira Fix For: 0.8.3 Attachments: KAFKA-1927.patch The common package introduced a better way of defining requests using a new protocol definition DSL and also includes wrapper objects for these. We should switch KafkaApis over to use these request definitions and consider the scala classes deprecated (we probably need to retain some of them for a while for the scala clients). This will be a big improvement because 1. We will have each request now defined in only one place (Protocol.java) 2. We will have built-in support for multi-version requests 3. We will have much better error messages (no more cryptic underflow errors) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: [Discussion] Using Client Requests and Responses in Server
Hi, I uploaded a (very) preliminary patch with my idea. One thing that's missing: RequestResponse had a handleError method that all requests implemented, typically generating an appropriate error Response for the request and sending it along. It's used by KafkaApis to handle all protocol errors for valid requests that are not handled elsewhere. AbstractRequestResponse doesn't have such a method. I can, of course, add it. But before I jump into this, I'm wondering if there was another plan on handling Api errors. Gwen On Mon, Mar 23, 2015 at 6:16 PM, Jun Rao j...@confluent.io wrote: I think what you are saying is that in RequestChannel, we can start generating header/body for new request types and leave requestObj null. For existing requests, header/body will be null initially. Gradually, we can migrate each type of requests by populating header/body, instead of requestObj. This makes sense to me since it serves two purposes: (1) not polluting the code base with duplicated request/response objects for new types of requests and (2) allowing the refactoring of existing requests to be done in smaller pieces. Could you try that approach and perhaps just migrate one existing request type (e.g. HeartBeatRequest) as an example? We probably need to rewind the buffer after reading the requestId when deserializing the header (since the header includes the request id). Thanks, Jun On Mon, Mar 23, 2015 at 4:52 PM, Gwen Shapira gshap...@cloudera.com wrote: I'm thinking of a different approach, that will not fix everything, but will allow adding new requests without code duplication (and therefore unblock KIP-4): RequestChannel.request currently takes a buffer and parses it into an old request object. Since the objects are byte-compatible, we should be able to parse existing requests into both old and new objects. New requests will only be parsed into new objects. 
Basically:

    val requestId = buffer.getShort()
    if (RequestKeys.keyToNameAndDeserializerMap.contains(requestId)) {
      requestObj = RequestKeys.deserializerForKey(requestId)(buffer)
      header = RequestHeader.parse(buffer)
      body = ProtoUtils.currentRequestSchema(requestId).read(buffer).asInstanceOf[Struct]
    } else {
      requestObj = null
      header = RequestHeader.parse(buffer)
      body = ProtoUtils.currentRequestSchema(requestId).read(buffer).asInstanceOf[Struct]
    }

This way existing KafkaApis will keep working as normal. The new Apis can implement just the new header/body requests. We'll do the same on the send side: BoundedByteBufferSend can have a constructor that takes header/body instead of just a response object. Does that make sense?

Once we have this in, we can move to:
* Adding the missing request/response to the client code
* Replacing requests that can be replaced

It will also make life easier by having us review and test smaller chunks of work (the existing patch is *huge*, touches nearly every core component, and I'm not done yet...)

Gwen

On Sun, Mar 22, 2015 at 10:24 PM, Jay Kreps jay.kr...@gmail.com wrote:

Ack, yeah, forgot about that. It's not just a difference of wrappers. The server side actually sends the bytes lazily using FileChannel.transferTo. We need to make it possible to carry over that optimization. In some sense what we want to be able to do is set a value to a Send instead of a ByteBuffer. Let me try to add that support to the protocol definition stuff; it will probably take me a few days to free up time.

-Jay

On Sun, Mar 22, 2015 at 7:44 PM, Gwen Shapira gshap...@cloudera.com wrote:

In case anyone is still following this thread, I need a bit of help :) The old FetchResponse.PartitionData included a MessageSet object. The new FetchResponse.PartitionData includes a ByteBuffer. However, when we read from logs, we return a MessageSet, and as far as I can see, these can't be converted to ByteBuffers (at least not without copying their data).
Did anyone consider how to reconcile the MessageSets with the new FetchResponse objects?

Gwen

On Sat, Mar 21, 2015 at 6:54 PM, Gwen Shapira gshap...@cloudera.com wrote:

Note: I'm also treating ZkUtils as if it were a public API (i.e. converting objects that are returned into o.a.k.common equivalents, but not changing ZkUtils itself). I know it's not public, but I suspect I'm not the only developer here who has tons of external code that uses it.

Gwen

On Wed, Mar 18, 2015 at 5:48 PM, Gwen Shapira gshap...@cloudera.com wrote:

We can't rip them out completely, unfortunately - the SimpleConsumer uses them. So we'll need conversion at some point. I'll try to make the conversion point just before hitting a public API that we can't modify, and hopefully it
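The zero-copy concern raised in the thread above is why a MessageSet cannot simply be flattened into a ByteBuffer: on the fetch path the broker hands log bytes to the socket with FileChannel.transferTo, never materializing them in a user-space buffer. A minimal, self-contained sketch of that pattern (the "segment" file and the in-memory sink stand in for a real log segment and socket channel):

```java
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.channels.Channels;
import java.nio.channels.FileChannel;
import java.nio.channels.WritableByteChannel;

public class TransferToDemo {
    public static void main(String[] args) throws IOException {
        // Stand-in for a log segment file (name/location are illustrative).
        File tmp = File.createTempFile("segment", ".log");
        tmp.deleteOnExit();
        RandomAccessFile file = new RandomAccessFile(tmp, "rw");
        file.write("hello".getBytes("UTF-8"));
        FileChannel log = file.getChannel();

        // Stand-in for the socket; in the broker this would be the network channel.
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        WritableByteChannel out = Channels.newChannel(sink);

        // transferTo moves bytes from the file to the channel without an
        // intermediate user-space ByteBuffer - the optimization Jay mentions.
        log.transferTo(0, log.size(), out);
        System.out.println(sink.toString("UTF-8"));
        log.close();
    }
}
```

Wrapping the same data in a ByteBuffer-based response object would force reading the file into memory first, which is exactly the copy the thread is trying to avoid.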
Re: Review Request 32440: Patch for KAFKA-2043
On March 24, 2015, 5 p.m., Mayuresh Gharat wrote:

clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java, line 134
https://reviews.apache.org/r/32440/diff/1/?file=904417#file904417line134

Since it's a producer-level config, is this change needed? We can keep it as an instance variable. Also, since the compression type does not change, the private final makes it more clear. What do you think?

I don't mind leaving the instance-level config. However, since it is not used anywhere but the constructor, I don't see the value in it. If we want to mark it as final we can in the constructor and have the same clarity. The only reason I didn't initially is that the other code did not seem to follow the style of putting final on everything. (Note: I would prefer to put final on everything.)

- Grant

-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/32440/#review77593
-----------------------------------------------------------

On March 24, 2015, 3:51 p.m., Grant Henke wrote:

-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/32440/
-----------------------------------------------------------

(Updated March 24, 2015, 3:51 p.m.)

Review request for kafka.

Bugs: KAFKA-2043
    https://issues.apache.org/jira/browse/KAFKA-2043

Repository: kafka

Description
-----------

CompressionType is passed in each RecordAccumulator append

Diffs
-----

clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java feda9c922d7dab17e424f8e6f0aa0a3f968e3729
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java 88b4e4fbf3bf6fb6d2f90551a792b95d4cd51c40
clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java e379ac89c9a2fbfe750d6b0dec693b7eabb76204
clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java 24274a64885fadd0e9318de2beb232218ddd52cd

Diff: https://reviews.apache.org/r/32440/diff/

Testing
-------

Thanks,

Grant Henke
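The style question in this review - pass the compression type into every append call, or set it once as a private final field in the constructor - can be sketched in isolation. The class and method names below are illustrative stand-ins, not the actual KafkaProducer/RecordAccumulator code:

```java
// Toy stand-in for the accumulator: the compression type is fixed at
// construction time, so append() no longer needs it as a parameter,
// and "private final" documents that it never changes.
enum Compression { NONE, GZIP, SNAPPY }

class ToyAccumulator {
    private final Compression compression; // set once, immutable thereafter

    ToyAccumulator(Compression compression) {
        this.compression = compression;
    }

    // Per-record work can rely on the field instead of a repeated argument.
    String append(String record) {
        return compression + ":" + record;
    }
}

public class FinalConfigDemo {
    public static void main(String[] args) {
        ToyAccumulator acc = new ToyAccumulator(Compression.GZIP);
        System.out.println(acc.append("key1"));
    }
}
```

This reflects Mayuresh's suggestion: since the config cannot change after construction, a final field both removes a parameter from the hot path and makes the immutability explicit.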