[jira] [Commented] (KAFKA-391) Producer request and response classes should use maps
[ https://issues.apache.org/jira/browse/KAFKA-391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14181170#comment-14181170 ] Honghai Chen commented on KAFKA-391: Why does this fix add this line of code? We sometimes get 2 responses for one request. Why do we fall into this situation, and will there be duplicate data in Kafka? https://git-wip-us.apache.org/repos/asf?p=kafka.git;a=commitdiff;h=b688c3ba045df340bc32caa40ba1909eddbcbec5
+ if (response.status.size != producerRequest.data.size)
+   throw new KafkaException("Incomplete response (%s) for producer request (%s)".format(response, producerRequest))
Producer request and response classes should use maps - Key: KAFKA-391 URL: https://issues.apache.org/jira/browse/KAFKA-391 Project: Kafka Issue Type: Bug Reporter: Joel Koshy Assignee: Joel Koshy Priority: Blocker Labels: optimization Fix For: 0.8.0 Attachments: KAFKA-391-draft-r1374069.patch, KAFKA-391-v2.patch, KAFKA-391-v3.patch, KAFKA-391-v4.patch Producer response contains two arrays of error codes and offsets - the ordering in these arrays corresponds to the flattened ordering of the request arrays. It would be better to switch to maps in the request and response as this would make the code clearer and more efficient (right now, linear scans are used in handling producer acks). We can probably do the same in the fetch request/response. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
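For context on that check, here is a minimal sketch (hypothetical class names, not the actual kafka.api sources) of a map-keyed producer request/response and the completeness check the commit adds. With maps, every partition sent in the request must come back with exactly one status entry, so a size mismatch signals a malformed or incomplete response rather than duplicate data, and per-partition ack handling becomes a direct lookup instead of the linear scan the issue describes.
{code}
// Hedged sketch: simplified stand-ins for the real kafka.api classes.
case class TopicAndPartition(topic: String, partition: Int)
case class ProducerResponseStatus(error: Short, offset: Long)

case class ProducerRequestSketch(data: Map[TopicAndPartition, Seq[Array[Byte]]])
case class ProducerResponseSketch(status: Map[TopicAndPartition, ProducerResponseStatus])

def checkResponse(producerRequest: ProducerRequestSketch, response: ProducerResponseSketch): Unit = {
  // Every partition in the request must come back with exactly one status entry.
  // A size mismatch means the response is malformed/incomplete, not that data was written twice.
  if (response.status.size != producerRequest.data.size)
    throw new RuntimeException(
      "Incomplete response (%s) for producer request (%s)".format(response, producerRequest))
  // With a map-keyed status, per-partition ack handling is a direct lookup instead of a linear scan.
  producerRequest.data.keys.foreach { tp =>
    val status = response.status(tp)
    // act on status.error and status.offset here
  }
}
{code}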
Review Request 27071: Patch for KAFKA-1684
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/27071/ --- Review request for kafka. Bugs: KAFKA-1684 https://issues.apache.org/jira/browse/KAFKA-1684 Repository: kafka Description --- working working working working working almost done almost done done one more thing one more thing one more thing Diffs - config/client.keystore PRE-CREATION config/client.public-key PRE-CREATION config/client.ssl.properties PRE-CREATION config/log4j.properties 95022543debedf26b5da58cc019de5d5b809f97f config/server.keystore PRE-CREATION config/server.properties b0e4496a8ca736b6abe965a430e8ce87b0e8287f config/server.public-key PRE-CREATION config/server.ssl.properties PRE-CREATION contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java 1d0e0a917985736bef7af66c741e5807d8503121 core/src/main/scala/kafka/api/RequestKeys.scala c24c0345feedc7b9e2e9f40af11bfa1b8d328c43 core/src/main/scala/kafka/api/TopicMetadataForChannelRequest.scala PRE-CREATION core/src/main/scala/kafka/api/UpdateMetadataRequest.scala 530982e36b17934b8cc5fb668075a5342e142c59 core/src/main/scala/kafka/client/ClientUtils.scala ebba87f0566684c796c26cb76c64b4640a5ccfde core/src/main/scala/kafka/cluster/Broker.scala 0060add008bb3bc4b0092f2173c469fce0120be6 core/src/main/scala/kafka/common/BrokerChannelNotAvailableException.scala PRE-CREATION core/src/main/scala/kafka/consumer/ConsumerConfig.scala 9ebbee6c16dc83767297c729d2d74ebbd063a993 core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala b9e2bea7b442a19bcebd1b350d39541a8c9dd068 core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala f8c1b4e674f7515c377c6c30d212130f1ff022dd core/src/main/scala/kafka/consumer/SimpleConsumer.scala d349a3000feb9ccd57d1f3cb163548d5bf432186 core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala fbc680fde21b02f11285a4f4b442987356abd17b core/src/main/scala/kafka/controller/ControllerChannelManager.scala ecbfa0f328ba6a652a758ab20cacef324a8b2fb8 core/src/main/scala/kafka/controller/KafkaController.scala 8ab4a1b8072c9dd187a9a6e94138b725d1f1b153 core/src/main/scala/kafka/controller/ReplicaStateMachine.scala 3e87e1d36f87b7dd539a474609b1c95487a4c337 core/src/main/scala/kafka/javaapi/consumer/SimpleConsumer.scala 0ab0195dc9f66f407061d0fac2549bd6291e41ff core/src/main/scala/kafka/network/BlockingChannel.scala eb7bb14d94cb3648c06d4de36a3b34aacbde4556 core/src/main/scala/kafka/network/ChannelFactory.scala PRE-CREATION core/src/main/scala/kafka/network/ChannelInfo.scala PRE-CREATION core/src/main/scala/kafka/network/ChannelType.scala PRE-CREATION core/src/main/scala/kafka/network/KafkaChannel.scala PRE-CREATION core/src/main/scala/kafka/network/PlainSocketChannelFactory.scala PRE-CREATION core/src/main/scala/kafka/network/SocketServer.scala cee76b323e5f3e4c783749ac9e78e1ef02897e3b core/src/main/scala/kafka/network/ssl/KeyStores.scala PRE-CREATION core/src/main/scala/kafka/network/ssl/SSLAuth.scala PRE-CREATION core/src/main/scala/kafka/network/ssl/SSLChannelFactory.scala PRE-CREATION core/src/main/scala/kafka/network/ssl/SSLConnectionConfig.scala PRE-CREATION core/src/main/scala/kafka/network/ssl/SSLSocketChannel.scala PRE-CREATION core/src/main/scala/kafka/network/ssl/store/JKSInitializer.scala PRE-CREATION core/src/main/scala/kafka/producer/ProducerConfig.scala 3cdf23dce3407f1770b9c6543e3a8ae8ab3ff255 core/src/main/scala/kafka/producer/SyncProducer.scala 42c950375098b51f45c79c6a4a99a36f387bf02b core/src/main/scala/kafka/producer/SyncProducerConfig.scala 69b2d0c11bb1412ce76d566f285333c806be301a 
core/src/main/scala/kafka/server/AbstractFetcherThread.scala 2e9532e820b5b5c63dfd55f5454b32866d084a37 core/src/main/scala/kafka/server/KafkaApis.scala c584b559416b3ee4bcbec5966be4891e0a03eefb core/src/main/scala/kafka/server/KafkaConfig.scala 165c816a9f4c925f6e46560e7e2ff9cf7591946b core/src/main/scala/kafka/server/KafkaHealthcheck.scala 4acdd70fe9c1ee78d6510741006c2ece65450671 core/src/main/scala/kafka/server/KafkaServer.scala 3e9e91f2b456bbdeb3055d571e18ffea8675b4bf core/src/main/scala/kafka/server/MetadataCache.scala bf81a1ab88c14be8697b441eedbeb28fa0112643 core/src/main/scala/kafka/server/ReplicaFetcherThread.scala 6879e730282185bda3d6bc3659cb15af0672cecf core/src/main/scala/kafka/tools/ConsoleConsumer.scala 323fc8566d974acc4e5c7d7c2a065794f3b5df4a core/src/main/scala/kafka/tools/ConsoleProducer.scala 8e9ba0b284671989f87d9c421bc98f5c4384c260 core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala d1e7c434e77859d746b8dc68dd5d5a3740425e79 core/src/main/scala/kafka/tools/GetOffsetShell.scala 3d9293e4abbe3f4a4a2bc5833385747c604d5a95
[jira] [Updated] (KAFKA-1684) Implement TLS/SSL authentication
[ https://issues.apache.org/jira/browse/KAFKA-1684?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ivan Lyutov updated KAFKA-1684: --- Attachment: KAFKA-1684.patch Implement TLS/SSL authentication Key: KAFKA-1684 URL: https://issues.apache.org/jira/browse/KAFKA-1684 Project: Kafka Issue Type: Sub-task Components: security Affects Versions: 0.9.0 Reporter: Jay Kreps Assignee: Ivan Lyutov Attachments: KAFKA-1684.patch Add an SSL port to the configuration and advertise this as part of the metadata request. If the SSL port is configured the socket server will need to add a second Acceptor thread to listen on it. Connections accepted on this port will need to go through the SSL handshake prior to being registered with a Processor for request processing. SSL requests and responses may need to be wrapped or unwrapped using the SSLEngine that was initialized by the acceptor. This wrapping and unwrapping is very similar to what will need to be done for SASL-based authentication schemes. We should have a uniform interface that covers both of these and we will need to store the instance in the session with the request. The socket server will have to use this object when reading and writing requests. We will need to take care with the FetchRequests as the current FileChannel.transferTo mechanism will be incompatible with wrap/unwrap so we can only use this optimization for unencrypted sockets that don't require userspace translation (wrapping). -- This message was sent by Atlassian JIRA (v6.3.4#6332)
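As a rough illustration of the wrap/unwrap step the description above refers to, here is a hedged sketch using the standard javax.net.ssl.SSLEngine API; it is not the attached patch's code, and handshake handling, buffer resizing, and looping over partial results are omitted. It also shows why FileChannel.transferTo cannot be combined with TLS: the bytes must pass through user space to be wrapped.
{code}
import java.nio.ByteBuffer
import javax.net.ssl.{SSLContext, SSLEngine, SSLEngineResult}

// Hedged sketch: how outgoing application bytes are wrapped into TLS records and incoming
// records unwrapped, as an SSLEngine initialized by the acceptor would do. The engine must
// complete its handshake before wrap/unwrap of application data succeeds.
object SslWrapSketch {
  def newEngine(serverMode: Boolean): SSLEngine = {
    val engine = SSLContext.getDefault.createSSLEngine()
    engine.setUseClientMode(!serverMode)
    engine
  }

  // Encrypt application data before it is written to the socket.
  def wrap(engine: SSLEngine, appData: ByteBuffer): ByteBuffer = {
    val netData = ByteBuffer.allocate(engine.getSession.getPacketBufferSize)
    val result: SSLEngineResult = engine.wrap(appData, netData)
    require(result.getStatus == SSLEngineResult.Status.OK, s"wrap failed: ${result.getStatus}")
    netData.flip()
    netData
  }

  // Decrypt TLS records read from the socket back into application data.
  def unwrap(engine: SSLEngine, netData: ByteBuffer): ByteBuffer = {
    val appData = ByteBuffer.allocate(engine.getSession.getApplicationBufferSize)
    val result = engine.unwrap(netData, appData)
    require(result.getStatus == SSLEngineResult.Status.OK, s"unwrap failed: ${result.getStatus}")
    appData.flip()
    appData
  }
}
{code}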
[jira] [Commented] (KAFKA-1684) Implement TLS/SSL authentication
[ https://issues.apache.org/jira/browse/KAFKA-1684?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14181176#comment-14181176 ] Ivan Lyutov commented on KAFKA-1684: Created reviewboard https://reviews.apache.org/r/27071/diff/ against branch apache/trunk Implement TLS/SSL authentication Key: KAFKA-1684 URL: https://issues.apache.org/jira/browse/KAFKA-1684 Project: Kafka Issue Type: Sub-task Components: security Affects Versions: 0.9.0 Reporter: Jay Kreps Assignee: Ivan Lyutov Attachments: KAFKA-1684.patch Add an SSL port to the configuration and advertise this as part of the metadata request. If the SSL port is configured the socket server will need to add a second Acceptor thread to listen on it. Connections accepted on this port will need to go through the SSL handshake prior to being registered with a Processor for request processing. SSL requests and responses may need to be wrapped or unwrapped using the SSLEngine that was initialized by the acceptor. This wrapping and unwrapping is very similar to what will need to be done for SASL-based authentication schemes. We should have a uniform interface that covers both of these and we will need to store the instance in the session with the request. The socket server will have to use this object when reading and writing requests. We will need to take care with the FetchRequests as the current FileChannel.transferTo mechanism will be incompatible with wrap/unwrap so we can only use this optimization for unencrypted sockets that don't require userspace translation (wrapping). -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-1684) Implement TLS/SSL authentication
[ https://issues.apache.org/jira/browse/KAFKA-1684?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ivan Lyutov updated KAFKA-1684: --- Status: Patch Available (was: Open) Implement TLS/SSL authentication Key: KAFKA-1684 URL: https://issues.apache.org/jira/browse/KAFKA-1684 Project: Kafka Issue Type: Sub-task Components: security Affects Versions: 0.9.0 Reporter: Jay Kreps Assignee: Ivan Lyutov Attachments: KAFKA-1684.patch Add an SSL port to the configuration and advertise this as part of the metadata request. If the SSL port is configured the socket server will need to add a second Acceptor thread to listen on it. Connections accepted on this port will need to go through the SSL handshake prior to being registered with a Processor for request processing. SSL requests and responses may need to be wrapped or unwrapped using the SSLEngine that was initialized by the acceptor. This wrapping and unwrapping is very similar to what will need to be done for SASL-based authentication schemes. We should have a uniform interface that covers both of these and we will need to store the instance in the session with the request. The socket server will have to use this object when reading and writing requests. We will need to take care with the FetchRequests as the current FileChannel.transferTo mechanism will be incompatible with wrap/unwrap so we can only use this optimization for unencrypted sockets that don't require userspace translation (wrapping). -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1684) Implement TLS/SSL authentication
[ https://issues.apache.org/jira/browse/KAFKA-1684?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14181180#comment-14181180 ] Ivan Lyutov commented on KAFKA-1684: The high-level description is available in docs/SECURITY.md file Implement TLS/SSL authentication Key: KAFKA-1684 URL: https://issues.apache.org/jira/browse/KAFKA-1684 Project: Kafka Issue Type: Sub-task Components: security Affects Versions: 0.9.0 Reporter: Jay Kreps Assignee: Ivan Lyutov Attachments: KAFKA-1684.patch Add an SSL port to the configuration and advertise this as part of the metadata request. If the SSL port is configured the socket server will need to add a second Acceptor thread to listen on it. Connections accepted on this port will need to go through the SSL handshake prior to being registered with a Processor for request processing. SSL requests and responses may need to be wrapped or unwrapped using the SSLEngine that was initialized by the acceptor. This wrapping and unwrapping is very similar to what will need to be done for SASL-based authentication schemes. We should have a uniform interface that covers both of these and we will need to store the instance in the session with the request. The socket server will have to use this object when reading and writing requests. We will need to take care with the FetchRequests as the current FileChannel.transferTo mechanism will be incompatible with wrap/unwrap so we can only use this optimization for unencrypted sockets that don't require userspace translation (wrapping). -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (KAFKA-1726) Wrong message format description
Oleg Golovin created KAFKA-1726: --- Summary: Wrong message format description Key: KAFKA-1726 URL: https://issues.apache.org/jira/browse/KAFKA-1726 Project: Kafka Issue Type: Bug Components: website Reporter: Oleg Golovin Here [in this page|https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Enriched+Message+Metadata#KafkaEnrichedMessageMetadata-CurrentMessageFormat] you describe current Kafka message format:
{code}
MessageAndOffset = MessageSize Offset Message
MessageSize = int32
Offset = int64
Message = Crc MagicByte Attributes KeyLength Key ValueLength Value
Crc = int32
MagicByte = int8
Attributes = int8
KeyLength = int32
Key = bytes
ValueLength = int32
Value = bytes
{code}
In reality _offset_ goes before _messageSize_. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
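To make the correction concrete, here is a hedged sketch of parsing one log entry with the corrected field order (the 8-byte offset first, then the 4-byte message size, then the message bytes). The names are illustrative and this is not Kafka's actual parsing code.
{code}
import java.nio.ByteBuffer

// Hedged sketch of the corrected framing: Offset (int64) precedes MessageSize (int32).
case class RawEntry(offset: Long, messageBytes: Array[Byte])

def readEntry(buffer: ByteBuffer): RawEntry = {
  val offset = buffer.getLong()       // Offset = int64, comes first
  val messageSize = buffer.getInt()   // MessageSize = int32
  val message = new Array[Byte](messageSize)
  buffer.get(message)                 // Crc, MagicByte, Attributes, key and value live inside
  RawEntry(offset, message)
}
{code}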
[jira] [Created] (KAFKA-1727) Fix comment about message format
Muneyuki Noguchi created KAFKA-1727: --- Summary: Fix comment about message format Key: KAFKA-1727 URL: https://issues.apache.org/jira/browse/KAFKA-1727 Project: Kafka Issue Type: Bug Reporter: Muneyuki Noguchi Priority: Trivial The comment in Message.scala says the value of magic identifier is 2, bq. 2. 1 byte magic identifier to allow format changes, value is 2 currently but the actual current magic value is 0.
{code}
/**
 * The current magic value
 */
val CurrentMagicValue: Byte = 0
{code}
-- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1727) Fix comment about message format
[ https://issues.apache.org/jira/browse/KAFKA-1727?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14181308#comment-14181308 ] Muneyuki Noguchi commented on KAFKA-1727: - Created reviewboard https://reviews.apache.org/r/27075/diff/ against branch origin/trunk Fix comment about message format Key: KAFKA-1727 URL: https://issues.apache.org/jira/browse/KAFKA-1727 Project: Kafka Issue Type: Bug Reporter: Muneyuki Noguchi Priority: Trivial Attachments: KAFKA-1727.patch The comment in Message.scala says the value of magic identifier is 2, bq. 2. 1 byte magic identifier to allow format changes, value is 2 currently but the actual current magic value is 0. {code} /** * The current magic value */ val CurrentMagicValue: Byte = 0 {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Review Request 27075: Patch for KAFKA-1727
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/27075/ --- Review request for kafka. Bugs: KAFKA-1727 https://issues.apache.org/jira/browse/KAFKA-1727 Repository: kafka Description --- The value of the magic identifier is 0 because CurrentMagicValue is 0. Diffs - core/src/main/scala/kafka/message/Message.scala d2a7293c7be4022af30884330924791340acc5c1 Diff: https://reviews.apache.org/r/27075/diff/ Testing --- Thanks, Muneyuki Noguchi
[jira] [Updated] (KAFKA-1727) Fix comment about message format
[ https://issues.apache.org/jira/browse/KAFKA-1727?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Muneyuki Noguchi updated KAFKA-1727: Attachment: KAFKA-1727.patch Fix comment about message format Key: KAFKA-1727 URL: https://issues.apache.org/jira/browse/KAFKA-1727 Project: Kafka Issue Type: Bug Reporter: Muneyuki Noguchi Priority: Trivial Attachments: KAFKA-1727.patch The comment in Message.scala says the value of magic identifier is 2, bq. 2. 1 byte magic identifier to allow format changes, value is 2 currently but the actual current magic value is 0. {code} /** * The current magic value */ val CurrentMagicValue: Byte = 0 {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1683) Implement a session concept in the socket server
[ https://issues.apache.org/jira/browse/KAFKA-1683?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14181323#comment-14181323 ] Joe Stein commented on KAFKA-1683: -- [~gwenshap] We need to be careful with authenticating only once for a long-lived session: revoked credentials would not be picked up until a new session was created. So we need some balance between always re-authenticating and keeping a session for scale. All of this comes down to deployment requirements; many/most should be OK, I think, with re-authenticating only once every X timeframe, or always (levers/knobs). Or some other part of the code/system could constantly check for auth changes and notify the server of that change, expiring the session then (making it async in nature). I really like it to be in KafkaApis, that is where all the good stuff is, and this should mix in cleanly with that IMHO. Implement a session concept in the socket server -- Key: KAFKA-1683 URL: https://issues.apache.org/jira/browse/KAFKA-1683 Project: Kafka Issue Type: Sub-task Components: security Affects Versions: 0.9.0 Reporter: Jay Kreps Assignee: Gwen Shapira To implement authentication we need a way to keep track of some things between requests. The initial use for this would be remembering the authenticated user/principle info, but likely more uses would come up (for example we will also need to remember whether and which encryption or integrity measures are in place on the socket so we can wrap and unwrap writes and reads). I was thinking we could just add a Session object that might have a user field. The session object would need to get added to RequestChannel.Request so it is passed down to the API layer with each request. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1718) Message Size Too Large error when only small messages produced with Snappy
[ https://issues.apache.org/jira/browse/KAFKA-1718?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14181348#comment-14181348 ] Evan Huus commented on KAFKA-1718: -- [~junrao] I already have wiki permissions, so I made the relevant change. While I'm in the neighbourhood, what is the expected value of the {{MagicByte}} field? The spec doesn't clarify, and my library has been leaving it at 0 without problems thus far, but [~sriharsha] mentioned earlier that the value should be 2? Message Size Too Large error when only small messages produced with Snappy Key: KAFKA-1718 URL: https://issues.apache.org/jira/browse/KAFKA-1718 Project: Kafka Issue Type: Bug Components: core Affects Versions: 0.8.1.1 Reporter: Evan Huus Priority: Critical I'm the primary author of the Go bindings, and while I originally received this as a bug against my bindings, I'm coming to the conclusion that it's a bug in the broker somehow. Specifically, take a look at the last two kafka packets in the following packet capture: https://dl.dropboxusercontent.com/u/171647/kafka.pcapng (you will need a trunk build of Wireshark to fully decode the kafka part of the packets). The produce request contains two partitions on one topic. Each partition has one message set (sizes 977205 bytes and 967362 bytes respectively). Each message set is a sequential collection of snappy-compressed messages, each message of size 46899. When uncompressed, each message contains a message set of 999600 bytes, containing a sequence of uncompressed 1024-byte messages. However, the broker responds to this with a MessageSizeTooLarge error, full stacktrace from the broker logs being:
kafka.common.MessageSizeTooLargeException: Message size is 1070127 bytes which exceeds the maximum configured message size of 112.
at kafka.log.Log$$anonfun$append$1.apply(Log.scala:267)
at kafka.log.Log$$anonfun$append$1.apply(Log.scala:265)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:32)
at kafka.log.Log.append(Log.scala:265)
at kafka.cluster.Partition.appendMessagesToLeader(Partition.scala:354)
at kafka.server.KafkaApis$$anonfun$appendToLocalLog$2.apply(KafkaApis.scala:376)
at kafka.server.KafkaApis$$anonfun$appendToLocalLog$2.apply(KafkaApis.scala:366)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at kafka.server.KafkaApis.appendToLocalLog(KafkaApis.scala:366)
at kafka.server.KafkaApis.handleProducerRequest(KafkaApis.scala:292)
at kafka.server.KafkaApis.handle(KafkaApis.scala:185)
at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:42)
at java.lang.Thread.run(Thread.java:695)
Since as far as I can tell none of the sizes in the actual produced packet exceed the defined maximum, I can only assume that the broker is miscalculating something somewhere and throwing the exception improperly.
--- This issue can be reliably reproduced using an out-of-the-box binary download of 0.8.1.1 and the following gist: https://gist.github.com/eapache/ce0f15311c605a165ce7 (you will need to use the `producer-ng` branch of the Sarama library). --- I am happy to provide any more information you might need, or to do relevant experiments etc. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
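For context on the error above, here is a hedged sketch of the general shape of the per-entry size check the stack trace points at in Log.append; the names and standalone form are illustrative, not the broker's actual code. The configured limit corresponds to the broker's message.max.bytes setting.
{code}
// Hedged sketch (illustrative names): reject any entry in the incoming message set whose
// size exceeds the configured maximum. With compression, the measured entry can be a
// compressed wrapper message covering many client messages, so the size the broker checks
// can differ from any individual size the client thinks it sent.
def validateMessageSizes(entrySizes: Iterable[Int], maxMessageSize: Int): Unit =
  entrySizes.foreach { size =>
    if (size > maxMessageSize)
      throw new RuntimeException(
        s"Message size is $size bytes which exceeds the maximum configured message size of $maxMessageSize.")
  }
{code}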
Re: Documentation with patches
I like how we have things in SVN. My issue is having two patches from contributors (one for tests + code and another for docs) that I am trying to solve. If we copy the entire SVN docs directory into git under /docs then contributions can patch the docs in their git patch. Committers can do 1 commit. When we release we just cp -r docs/* /svn/ svn add * svn co release //or such. The only trick is that we have to make sure for live website fixes that we commit in two places (but only then instead of every time). I don't mind doing something more fancy and generate the docs from some markdown but I am not sure it is necessary... we have a lot to get done in the next few months with 0.9 and I don't want to add anything unnecessary to that effort. I do think though with all the changes coming we want code contributors to keep the docs up to date and have doc changes + code + test all in one git patch would be best for everyone (however we accomplish that) for reviewing and such. /*** Joe Stein Founder, Principal Consultant Big Data Open Source Security LLC http://www.stealth.ly Twitter: @allthingshadoop http://www.twitter.com/allthingshadoop / On Wed, Oct 22, 2014 at 1:53 PM, Jay Kreps jay.kr...@gmail.com wrote: Currently we are handling the versioning problem by explicitly versioning docs that change over time (configuration, quickstart, design, etc). This is done by just creating a copy of these pages for each release in a subdirectory. So we can commit documentation changes at any time for the future release we just don't link up that release until it is out (theoretically you could get there by guessing the url, but that is okay). Although having multiple copies of certain pages, one for each release, seems odd, I think it is actually better because in practice we often end up editing old releases when we find problems in the older docs. -Jay On Wed, Oct 22, 2014 at 10:35 AM, Jarek Jarcec Cecho jar...@apache.org wrote: I would strongly support this idea. We have similar model in all other projects where I’m involved: The docs are part of the usual code base and we do require contributors to update them when they are adding a new feature. And then during release time we simply take snapshot of the docs and upload them to our public webpages. This enables us to have simple versioned docs on the website, so that users can easily find docs for their version and also the public site do not contain docs of unreleased features :) There is a lot of ways how to achieve that - in Sqoop 1 we used asciidoc to build the site, in Sqoop 2/Flume we’re using sphinx, Oozie is using markdown wiki... Jarcec On Oct 22, 2014, at 10:27 AM, Jay Kreps jay.kr...@gmail.com wrote: Hey Joe, I'd love to encourage documentation contributions. I think we do have a way to contribute to docs. The current workflow for contributing is 1. Checkout the docs 2. Change docs 3. Submit patch in normal way 4. Committer reviews and applies For committers we have traditionally made the review step optional for docs. In reality this skips step 1.5 which is fiddling with apache for an hour to figure out how to get it to make apache includes work so you can see the docs. I actually think this is the bigger barrier to doc changes. One thing we could do is move the docs to one of the static site generators to do the includes (e.g. Jekyll) this might make setup slightly easier (although then you need to install Jekyll...). 
-Jay On Wed, Oct 22, 2014 at 9:55 AM, Joe Stein joe.st...@stealth.ly wrote: This comes up a lot but in reality not enough. We don't have a great way for folks to modify the code and change (or add) to the documentation. I think the documentation is awesome and as we grow the code contributors that should continue with them too. One thought I had that would work is that we copy the SVN files into a /docs folder in git. We can then take patches in git and then apply them to SVN when appropriate (like during a release or for immediate fixes). This way code changes in that patch can have documentation changes. The committers can manage what is changed where as appropriate either prior to a release or live updates to the website. Yes/No? Thanks! /*** Joe Stein Founder, Principal Consultant Big Data Open Source Security LLC http://www.stealth.ly Twitter: @allthingshadoop http://www.twitter.com/allthingshadoop /
[jira] [Commented] (KAFKA-1726) Wrong message format description
[ https://issues.apache.org/jira/browse/KAFKA-1726?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14181459#comment-14181459 ] Guozhang Wang commented on KAFKA-1726: -- Thanks Oleg for pointing this out, I just fixed it. Wrong message format description Key: KAFKA-1726 URL: https://issues.apache.org/jira/browse/KAFKA-1726 Project: Kafka Issue Type: Bug Components: website Reporter: Oleg Golovin Here [in this page|https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Enriched+Message+Metadata#KafkaEnrichedMessageMetadata-CurrentMessageFormat] you describe current Kafka message format: {code} MessageAndOffset = MessageSize Offset Message MessageSize = int32 Offset = int64 Message = Crc MagicByte Attributes KeyLength Key ValueLength Value Crc = int32 MagicByte = int8 Attributes = int8 KeyLength = int32 Key = bytes ValueLength = int32 Value = bytes {code} In reality _offset_ goes before _messageSize_. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Closed] (KAFKA-1726) Wrong message format description
[ https://issues.apache.org/jira/browse/KAFKA-1726?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang closed KAFKA-1726. Assignee: Guozhang Wang Wrong message format description Key: KAFKA-1726 URL: https://issues.apache.org/jira/browse/KAFKA-1726 Project: Kafka Issue Type: Bug Components: website Reporter: Oleg Golovin Assignee: Guozhang Wang Here [in this page|https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Enriched+Message+Metadata#KafkaEnrichedMessageMetadata-CurrentMessageFormat] you describe current Kafka message format: {code} MessageAndOffset = MessageSize Offset Message MessageSize = int32 Offset = int64 Message = Crc MagicByte Attributes KeyLength Key ValueLength Value Crc = int32 MagicByte = int8 Attributes = int8 KeyLength = int32 Key = bytes ValueLength = int32 Value = bytes {code} In reality _offset_ goes before _messageSize_. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Resolved] (KAFKA-1726) Wrong message format description
[ https://issues.apache.org/jira/browse/KAFKA-1726?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang resolved KAFKA-1726. -- Resolution: Fixed Wrong message format description Key: KAFKA-1726 URL: https://issues.apache.org/jira/browse/KAFKA-1726 Project: Kafka Issue Type: Bug Components: website Reporter: Oleg Golovin Here [in this page|https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Enriched+Message+Metadata#KafkaEnrichedMessageMetadata-CurrentMessageFormat] you describe current Kafka message format: {code} MessageAndOffset = MessageSize Offset Message MessageSize = int32 Offset = int64 Message = Crc MagicByte Attributes KeyLength Key ValueLength Value Crc = int32 MagicByte = int8 Attributes = int8 KeyLength = int32 Key = bytes ValueLength = int32 Value = bytes {code} In reality _offset_ goes before _messageSize_. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1718) Message Size Too Large error when only small messages produced with Snappy
[ https://issues.apache.org/jira/browse/KAFKA-1718?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14181470#comment-14181470 ] Jun Rao commented on KAFKA-1718: Evan, Thanks for updating the wiki. The MagicByte currently is expected to be only 0. We don't validate it on the broker at the moment though. However, we will be if we evolve the message format in the future. Message Size Too Large error when only small messages produced with Snappy Key: KAFKA-1718 URL: https://issues.apache.org/jira/browse/KAFKA-1718 Project: Kafka Issue Type: Bug Components: core Affects Versions: 0.8.1.1 Reporter: Evan Huus Priority: Critical I'm the primary author of the Go bindings, and while I originally received this as a bug against my bindings, I'm coming to the conclusion that it's a bug in the broker somehow. Specifically, take a look at the last two kafka packets in the following packet capture: https://dl.dropboxusercontent.com/u/171647/kafka.pcapng (you will need a trunk build of Wireshark to fully decode the kafka part of the packets). The produce request contains two partitions on one topic. Each partition has one message set (sizes 977205 bytes and 967362 bytes respectively). Each message set is a sequential collection of snappy-compressed messages, each message of size 46899. When uncompressed, each message contains a message set of 999600 bytes, containing a sequence of uncompressed 1024-byte messages. However, the broker responds to this with a MessageSizeTooLarge error, full stacktrace from the broker logs being: kafka.common.MessageSizeTooLargeException: Message size is 1070127 bytes which exceeds the maximum configured message size of 112. at kafka.log.Log$$anonfun$append$1.apply(Log.scala:267) at kafka.log.Log$$anonfun$append$1.apply(Log.scala:265) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:32) at kafka.log.Log.append(Log.scala:265) at kafka.cluster.Partition.appendMessagesToLeader(Partition.scala:354) at kafka.server.KafkaApis$$anonfun$appendToLocalLog$2.apply(KafkaApis.scala:376) at kafka.server.KafkaApis$$anonfun$appendToLocalLog$2.apply(KafkaApis.scala:366) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98) at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98) at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226) at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39) at scala.collection.mutable.HashMap.foreach(HashMap.scala:98) at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) at scala.collection.AbstractTraversable.map(Traversable.scala:105) at kafka.server.KafkaApis.appendToLocalLog(KafkaApis.scala:366) at kafka.server.KafkaApis.handleProducerRequest(KafkaApis.scala:292) at kafka.server.KafkaApis.handle(KafkaApis.scala:185) at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:42) at java.lang.Thread.run(Thread.java:695) Since as far as I can tell none of the sizes in the actual produced packet exceed the defined maximum, I can only assume that the broker is miscalculating something somewhere and throwing the exception improperly. 
--- This issue can be reliably reproduced using an out-of-the-box binary download of 0.8.1.1 and the following gist: https://gist.github.com/eapache/ce0f15311c605a165ce7 (you will need to use the `producer-ng` branch of the Sarama library). --- I am happy to provide any more information you might need, or to do relevant experiments etc. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: [VOTE] 0.8.2-beta Release Candidate 1
Joe, I downloaded the source distro and ran into this error. Followed the steps listed on the release process wiki to validate the release. vagrant@precise64:~/kafka-0.8.2-beta-src$ gradle FAILURE: Build failed with an exception. * Where: Script '/home/vagrant/kafka-0.8.2-beta-src/scala.gradle' line: 2 * What went wrong: A problem occurred evaluating script. Cause: Could not find property 'ext' on settings 'kafka-0.8.2-beta-src'. * Try: Run with --stacktrace option to get the stack trace. Run with --info or --debug option to get more log output. BUILD FAILED Total time: 1.565 secs vagrant@precise64:~/kafka-0.8.2-beta-src$ cat scala.gradle if (!hasProperty('scalaVersion')) { ext.scalaVersion = '2.10.1' } ext.defaultScalaVersion = '2.10.1' if (scalaVersion.startsWith('2.10')) { ext.baseScalaVersion = '2.10' } else if (scalaVersion.startsWith('2.11')) { ext.baseScalaVersion = '2.11' } else { ext.baseScalaVersion = scalaVersion } On Tue, Oct 21, 2014 at 1:58 PM, Joe Stein joe.st...@stealth.ly wrote: This is the first candidate for release of Apache Kafka 0.8.2-beta Release Notes for the 0.8.2-beta release https://people.apache.org/~joestein/kafka-0.8.2-beta-candidate1/RELEASE_NOTES.html *** Please download, test and vote by Friday, October 24th, 2pm PT Kafka's KEYS file containing PGP keys we use to sign the release: https://svn.apache.org/repos/asf/kafka/KEYS in addition to the md5, sha1 and sha2 (SHA256) checksum. * Release artifacts to be voted upon (source and binary): https://people.apache.org/~joestein/kafka-0.8.2-beta-candidate1/ * Maven artifacts to be voted upon prior to release: https://repository.apache.org/content/groups/staging/ * scala-doc https://people.apache.org/~joestein/kafka-0.8.2-beta-candidate1/scala-doc/ * java-doc https://people.apache.org/~joestein/kafka-0.8.2-beta-candidate1/java-doc/ * The tag to be voted upon (off the 0.8.2 branch) is the 0.8.2-beta tag https://git-wip-us.apache.org/repos/asf?p=kafka.git;a=tag;h=2b2c3da2c52bc62a89d60f85125d3723c8410fa0 /*** Joe Stein Founder, Principal Consultant Big Data Open Source Security LLC http://www.stealth.ly Twitter: @allthingshadoop http://www.twitter.com/allthingshadoop /
Re: [VOTE] 0.8.2-beta Release Candidate 1
hmmm. I just launched a bootstrap clean room (per release docs). Installed latest gradle sudo add-apt-repository ppa:cwchien/gradle sudo apt-get update sudo apt-get install gradle wget https://people.apache.org/~joestein/kafka-0.8.2-beta-candidate1/kafka-0.8.2-beta-src.tgz tar -xvf kafka-0.8.2-beta-src.tgz cd kafka-0.8.2-beta-src/ gradle ... Building project 'core' with Scala version 2.10.1 :downloadWrapper BUILD SUCCESSFUL Total time: 22.066 secs so it works, I use the source upload to build the binaries. odd. * Maybe the gradle version you have is too old or something? I am using vagrant@precise64:~/kafka-0.8.2-beta-src$ gradle -version Gradle 2.1 Build time: 2014-09-08 10:40:39 UTC Build number: none Revision: e6cf70745ac11fa943e19294d19a2c527a669a53 Groovy: 2.3.6 Ant: Apache Ant(TM) version 1.9.3 compiled on December 23 2013 JVM: 1.6.0_45 (Sun Microsystems Inc. 20.45-b01) OS: Linux 3.2.0-23-generic amd64 /*** Joe Stein Founder, Principal Consultant Big Data Open Source Security LLC http://www.stealth.ly Twitter: @allthingshadoop http://www.twitter.com/allthingshadoop / On Thu, Oct 23, 2014 at 12:49 PM, Neha Narkhede neha.narkh...@gmail.com wrote: Joe, I downloaded the source distro and ran into this error. Followed the steps listed on the release process wiki to validate the release. vagrant@precise64:~/kafka-0.8.2-beta-src$ gradle FAILURE: Build failed with an exception. * Where: Script '/home/vagrant/kafka-0.8.2-beta-src/scala.gradle' line: 2 * What went wrong: A problem occurred evaluating script. Cause: Could not find property 'ext' on settings 'kafka-0.8.2-beta-src'. * Try: Run with --stacktrace option to get the stack trace. Run with --info or --debug option to get more log output. BUILD FAILED Total time: 1.565 secs vagrant@precise64:~/kafka-0.8.2-beta-src$ cat scala.gradle if (!hasProperty('scalaVersion')) { ext.scalaVersion = '2.10.1' } ext.defaultScalaVersion = '2.10.1' if (scalaVersion.startsWith('2.10')) { ext.baseScalaVersion = '2.10' } else if (scalaVersion.startsWith('2.11')) { ext.baseScalaVersion = '2.11' } else { ext.baseScalaVersion = scalaVersion } On Tue, Oct 21, 2014 at 1:58 PM, Joe Stein joe.st...@stealth.ly wrote: This is the first candidate for release of Apache Kafka 0.8.2-beta Release Notes for the 0.8.2-beta release https://people.apache.org/~joestein/kafka-0.8.2-beta-candidate1/RELEASE_NOTES.html *** Please download, test and vote by Friday, October 24th, 2pm PT Kafka's KEYS file containing PGP keys we use to sign the release: https://svn.apache.org/repos/asf/kafka/KEYS in addition to the md5, sha1 and sha2 (SHA256) checksum. * Release artifacts to be voted upon (source and binary): https://people.apache.org/~joestein/kafka-0.8.2-beta-candidate1/ * Maven artifacts to be voted upon prior to release: https://repository.apache.org/content/groups/staging/ * scala-doc https://people.apache.org/~joestein/kafka-0.8.2-beta-candidate1/scala-doc/ * java-doc https://people.apache.org/~joestein/kafka-0.8.2-beta-candidate1/java-doc/ * The tag to be voted upon (off the 0.8.2 branch) is the 0.8.2-beta tag https://git-wip-us.apache.org/repos/asf?p=kafka.git;a=tag;h=2b2c3da2c52bc62a89d60f85125d3723c8410fa0 /*** Joe Stein Founder, Principal Consultant Big Data Open Source Security LLC http://www.stealth.ly Twitter: @allthingshadoop http://www.twitter.com/allthingshadoop /
[jira] [Commented] (KAFKA-1683) Implement a session concept in the socket server
[ https://issues.apache.org/jira/browse/KAFKA-1683?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14181569#comment-14181569 ] Jay Kreps commented on KAFKA-1683: -- I had imagined the auth happening in the network layer, though I agree that may be somewhat mixing concerns and your approach sounds like it could potentially be better. There are a few details to work out, though. In the Kerberos case this is straightforward since the request is processed in the API layer anyway. But how will this work for TLS? I was imagining that TLS would use a separate acceptor in the network layer that, after accepting a connection and completing the handshake, would populate the appropriate session fields. However if the sessions are maintained in the API layer then somehow we will have to work out how the result of the authentication gets to the API layer. Another issue is how we remove sessions when the connection closes. We would need to garbage collect these as connections were closed, and, perhaps more importantly, we need to ensure that it isn't possible to trick your way into that hash table. I.e. if the notion of equality for SelectorKey is comparing the underlying file descriptor, and file descriptors are reused periodically, there could be problems. The implementation I was imagining was something like this. Currently we attach the current request to each socket in the socket server. Instead of this we would attach some object that could hold the session, the current request, and any other future things we need. For TLS the user information would be populated after the handshake but before the acceptor hands the connection over to the processor thread. For Kerberos the session would be populated in KafkaApis.handleAuthenticate (or whatever we call it). The only advantage to this approach that I see is that the session lifetime is easily tied to the connection (which is what we want) and it will be easy to validate this. I was thinking of the session concept as being a somewhat generic feature of the network layer to maintain per-connection state, so in that sense it isn't too terrible to have it be part of the network/API layer contract, though I agree in general that the simpler we can keep the network stuff the better (it would be really easy for more and more business logic to creep in once we are doing auth stuff). Implement a session concept in the socket server -- Key: KAFKA-1683 URL: https://issues.apache.org/jira/browse/KAFKA-1683 Project: Kafka Issue Type: Sub-task Components: security Affects Versions: 0.9.0 Reporter: Jay Kreps Assignee: Gwen Shapira To implement authentication we need a way to keep track of some things between requests. The initial use for this would be remembering the authenticated user/principle info, but likely more uses would come up (for example we will also need to remember whether and which encryption or integrity measures are in place on the socket so we can wrap and unwrap writes and reads). I was thinking we could just add a Session object that might have a user field. The session object would need to get added to RequestChannel.Request so it is passed down to the API layer with each request. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
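A hedged sketch of the "attach a session to the connection" idea described above; the names are illustrative and this is not the eventual KAFKA-1683 patch.
{code}
// Hedged sketch (illustrative names): the state attached to each connection by the socket
// server, so the session's lifetime is naturally tied to the connection rather than to a
// lookup table keyed by selector keys or file descriptors.
case class Session(principal: String, authenticated: Boolean)

case class ConnectionState(var session: Option[Session],                    // filled by the TLS acceptor or an authenticate request
                           var currentRequest: Option[java.nio.ByteBuffer]) // what the socket server attaches today

// For TLS the acceptor would populate this after the handshake, before handing the
// connection to a processor thread; for Kerberos/SASL an API-layer handler would.
def afterTlsHandshake(state: ConnectionState, peerPrincipal: String): Unit =
  state.session = Some(Session(peerPrincipal, authenticated = true))
{code}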
[jira] [Commented] (KAFKA-1641) Log cleaner exits if last cleaned offset is lower than earliest offset
[ https://issues.apache.org/jira/browse/KAFKA-1641?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14181600#comment-14181600 ] Guozhang Wang commented on KAFKA-1641: -- [~jjkoshy] Could you take another look? Log cleaner exits if last cleaned offset is lower than earliest offset -- Key: KAFKA-1641 URL: https://issues.apache.org/jira/browse/KAFKA-1641 Project: Kafka Issue Type: Bug Affects Versions: 0.8.1.1 Reporter: Joel Koshy Assignee: Guozhang Wang Attachments: KAFKA-1641.patch, KAFKA-1641_2014-10-09_13:04:15.patch Encountered this recently: the log cleaner exited a while ago (I think because the topic had compressed messages). That issue was subsequently addressed by having the producer only send uncompressed. However, on a subsequent restart of the broker we see this: In this scenario I think it is reasonable to just emit a warning and have the cleaner round up its first dirty offset to the base offset of the first segment.
{code}
[kafka-server] [] [kafka-log-cleaner-thread-0], Error due to
java.lang.IllegalArgumentException: requirement failed: Last clean offset is 54770438 but segment base offset is 382844024 for log testtopic-0.
at scala.Predef$.require(Predef.scala:145)
at kafka.log.Cleaner.buildOffsetMap(LogCleaner.scala:491)
at kafka.log.Cleaner.clean(LogCleaner.scala:288)
at kafka.log.LogCleaner$CleanerThread.cleanOrSleep(LogCleaner.scala:202)
at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:187)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
{code}
-- This message was sent by Atlassian JIRA (v6.3.4#6332)
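A hedged sketch of the behavior the description proposes (warn and round the first dirty offset up instead of failing the require); the names are illustrative, not the actual LogCleaner code or the attached patch.
{code}
// Hedged sketch (illustrative names): if the checkpointed clean offset is older than the
// earliest retained segment, warn and resume cleaning from the log's start offset instead
// of throwing and killing the cleaner thread.
def firstDirtyOffset(checkpointedOffset: Long, logStartOffset: Long, logName: String): Long =
  if (checkpointedOffset < logStartOffset) {
    println(s"WARN Resetting first dirty offset of $logName to log start offset $logStartOffset " +
            s"since the checkpointed offset $checkpointedOffset is smaller than it.")
    logStartOffset
  } else checkpointedOffset
{code}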
Re: Documentation with patches
That makes sense. I agree that moving to another doc system isn't a high priority (it isn't as much work as it sounds because the HTML can all remain as is, just the includes would get converted). But actually I don't think that having a patch for docs and a patch for the code is too big a hurdle either. I think maybe we should just start asking for documentation patches and describe that in the contributing section--likely most people just don't think of it. -Jay On Thu, Oct 23, 2014 at 7:16 AM, Joe Stein joe.st...@stealth.ly wrote: I like how we have things in SVN. My issue is having two patches from contributors (one for tests + code and another for docs) that I am trying to solve. If we copy the entire SVN docs directory into git under /docs then contributions can patch the docs in their git patch. Committers can do 1 commit. When we release we just cp -r docs/* /svn/ svn add * svn co release //or such. The only trick is that we have to make sure for live website fixes that we commit in two places (but only then instead of every time). I don't mind doing something more fancy and generate the docs from some markdown but I am not sure it is necessary... we have a lot to get done in the next few months with 0.9 and I don't want to add anything unnecessary to that effort. I do think though with all the changes coming we want code contributors to keep the docs up to date and have doc changes + code + test all in one git patch would be best for everyone (however we accomplish that) for reviewing and such. /*** Joe Stein Founder, Principal Consultant Big Data Open Source Security LLC http://www.stealth.ly Twitter: @allthingshadoop http://www.twitter.com/allthingshadoop / On Wed, Oct 22, 2014 at 1:53 PM, Jay Kreps jay.kr...@gmail.com wrote: Currently we are handling the versioning problem by explicitly versioning docs that change over time (configuration, quickstart, design, etc). This is done by just creating a copy of these pages for each release in a subdirectory. So we can commit documentation changes at any time for the future release we just don't link up that release until it is out (theoretically you could get there by guessing the url, but that is okay). Although having multiple copies of certain pages, one for each release, seems odd, I think it is actually better because in practice we often end up editing old releases when we find problems in the older docs. -Jay On Wed, Oct 22, 2014 at 10:35 AM, Jarek Jarcec Cecho jar...@apache.org wrote: I would strongly support this idea. We have similar model in all other projects where I’m involved: The docs are part of the usual code base and we do require contributors to update them when they are adding a new feature. And then during release time we simply take snapshot of the docs and upload them to our public webpages. This enables us to have simple versioned docs on the website, so that users can easily find docs for their version and also the public site do not contain docs of unreleased features :) There is a lot of ways how to achieve that - in Sqoop 1 we used asciidoc to build the site, in Sqoop 2/Flume we’re using sphinx, Oozie is using markdown wiki... Jarcec On Oct 22, 2014, at 10:27 AM, Jay Kreps jay.kr...@gmail.com wrote: Hey Joe, I'd love to encourage documentation contributions. I think we do have a way to contribute to docs. The current workflow for contributing is 1. Checkout the docs 2. Change docs 3. Submit patch in normal way 4. 
Committer reviews and applies For committers we have traditionally made the review step optional for docs. In reality this skips step 1.5 which is fiddling with apache for an hour to figure out how to get it to make apache includes work so you can see the docs. I actually think this is the bigger barrier to doc changes. One thing we could do is move the docs to one of the static site generators to do the includes (e.g. Jekyll) this might make setup slightly easier (although then you need to install Jekyll...). -Jay On Wed, Oct 22, 2014 at 9:55 AM, Joe Stein joe.st...@stealth.ly wrote: This comes up a lot but in reality not enough. We don't have a great way for folks to modify the code and change (or add) to the documentation. I think the documentation is awesome and as we grow the code contributors that should continue with them too. One thought I had that would work is that we copy the SVN files into a /docs folder in git. We can then take patches in git and then apply them to SVN when appropriate (like during a release or for immediate fixes). This way code changes in that patch can have documentation changes. The
Re: Documentation with patches
With two patches, we'll need a two-phase commit to maintain consistency, and we all know what a PITA this can be. (Sorry, couldn't resist the pun. Carry on.) On Thu, Oct 23, 2014 at 10:32 AM, Jay Kreps jay.kr...@gmail.com wrote: That makes sense. I agree that moving to another doc system isn't a high priority (it isn't as much work as it sounds because the HTML can all remain as is, just the includes would get converted). But actually I don't think that having a patch for docs and a patch for the code is too big a hurdle either. I think maybe we should just start asking for documentation patches and describe that in the contributing section--likely most people just don't think of it. -Jay On Thu, Oct 23, 2014 at 7:16 AM, Joe Stein joe.st...@stealth.ly wrote: I like how we have things in SVN. My issue is having two patches from contributors (one for tests + code and another for docs) that I am trying to solve. If we copy the entire SVN docs directory into git under /docs then contributions can patch the docs in their git patch. Committers can do 1 commit. When we release we just cp -r docs/* /svn/ svn add * svn co release //or such. The only trick is that we have to make sure for live website fixes that we commit in two places (but only then instead of every time). I don't mind doing something more fancy and generate the docs from some markdown but I am not sure it is necessary... we have a lot to get done in the next few months with 0.9 and I don't want to add anything unnecessary to that effort. I do think though with all the changes coming we want code contributors to keep the docs up to date and have doc changes + code + test all in one git patch would be best for everyone (however we accomplish that) for reviewing and such. /*** Joe Stein Founder, Principal Consultant Big Data Open Source Security LLC http://www.stealth.ly Twitter: @allthingshadoop http://www.twitter.com/allthingshadoop / On Wed, Oct 22, 2014 at 1:53 PM, Jay Kreps jay.kr...@gmail.com wrote: Currently we are handling the versioning problem by explicitly versioning docs that change over time (configuration, quickstart, design, etc). This is done by just creating a copy of these pages for each release in a subdirectory. So we can commit documentation changes at any time for the future release we just don't link up that release until it is out (theoretically you could get there by guessing the url, but that is okay). Although having multiple copies of certain pages, one for each release, seems odd, I think it is actually better because in practice we often end up editing old releases when we find problems in the older docs. -Jay On Wed, Oct 22, 2014 at 10:35 AM, Jarek Jarcec Cecho jar...@apache.org wrote: I would strongly support this idea. We have similar model in all other projects where I’m involved: The docs are part of the usual code base and we do require contributors to update them when they are adding a new feature. And then during release time we simply take snapshot of the docs and upload them to our public webpages. This enables us to have simple versioned docs on the website, so that users can easily find docs for their version and also the public site do not contain docs of unreleased features :) There is a lot of ways how to achieve that - in Sqoop 1 we used asciidoc to build the site, in Sqoop 2/Flume we’re using sphinx, Oozie is using markdown wiki... Jarcec On Oct 22, 2014, at 10:27 AM, Jay Kreps jay.kr...@gmail.com wrote: Hey Joe, I'd love to encourage documentation contributions. 
I think we do have a way to contribute to docs. The current workflow for contributing is 1. Checkout the docs 2. Change docs 3. Submit patch in normal way 4. Committer reviews and applies For committers we have traditionally made the review step optional for docs. In reality this skips step 1.5 which is fiddling with apache for an hour to figure out how to get it to make apache includes work so you can see the docs. I actually think this is the bigger barrier to doc changes. One thing we could do is move the docs to one of the static site generators to do the includes (e.g. Jekyll) this might make setup slightly easier (although then you need to install Jekyll...). -Jay On Wed, Oct 22, 2014 at 9:55 AM, Joe Stein joe.st...@stealth.ly wrote: This comes up a lot but in reality not enough. We don't have a great way for folks to modify the code and change (or add) to the documentation. I think the documentation is awesome and as we grow the code contributors that should continue with them too. One thought I had that would work is that we copy the SVN files into a /docs
[jira] [Commented] (KAFKA-1683) Implement a session concept in the socket server
[ https://issues.apache.org/jira/browse/KAFKA-1683?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14181696#comment-14181696 ] Gwen Shapira commented on KAFKA-1683: - Very good points [~jkreps] and [~joestein]. The way I read this - we should not rely on Session to maintain security. This is too high of a risk and very much re-inventing the wheel. We should use Session to maintain generic user information, common to SASL, TLS and delegation tokens, that the API handlers know how to use. This will basically serve as the basis for the authorization layer. The way GSS + JAAS work (and I'm pretty sure we want to use these for Kerberos) - the client and server do a little authentication dance when the secured connection is first established, and the client is left with input and output streams that are secured. Everything sent and received on that socket afterwards will be encrypted with the server key and include the client session token. Decrypting the message and validating the client session token doesn't require communicating with Kerberos, so while there is a certain overhead, it doesn't involve the network. If we use this token to populate our Session object with every request, we can be certain that the information is relevant to the current connection and is unexpired (it will be the client's responsibility to renew the token with KRB occasionally, per the protocol). This will be easier to do if we use a separate port for Kerberos connections, the way we do for TLS; otherwise we'll need another way to figure out whether to perform the little song-and-dance when accepting the connection. But we can figure out this part later. Meanwhile, the Session object, populated by SocketServer when creating the request, is sent down to the API layer. Since I like having the requestKey in the API, I'll add the new Session object as an addition, not a replacement. ... and that was a very roundabout way to go back to the original request, but I had to make sure this still makes sense to me :) Implement a session concept in the socket server -- Key: KAFKA-1683 URL: https://issues.apache.org/jira/browse/KAFKA-1683 Project: Kafka Issue Type: Sub-task Components: security Affects Versions: 0.9.0 Reporter: Jay Kreps Assignee: Gwen Shapira To implement authentication we need a way to keep track of some things between requests. The initial use for this would be remembering the authenticated user/principle info, but likely more uses would come up (for example we will also need to remember whether and which encryption or integrity measures are in place on the socket so we can wrap and unwrap writes and reads). I was thinking we could just add a Session object that might have a user field. The session object would need to get added to RequestChannel.Request so it is passed down to the API layer with each request. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: Review Request 26994: Patch for KAFKA-1719
On Oct. 22, 2014, 9:32 p.m., Neha Narkhede wrote: core/src/main/scala/kafka/tools/MirrorMaker.scala, line 271 https://reviews.apache.org/r/26994/diff/1/?file=727975#file727975line271 Is there any value in setting this to true? It seems that just checking if it is false and exiting the process suffices. Setting to true something that is called cleanShutdown, when in fact, it isn't a clean shutdown is confusing to read. Also good to add a FATAL log entry as suggested by Guozhang as well. Guozhang Wang wrote: The boolean is used when the internal threads try to exit; when it is not set, the thread knows it is closing abnormally and hence goes on to handle it. I agree its name is a bit misleading, and probably we can just name it as isShuttingDown. I'm thinking maybe we can use a separate flag in each thread to indicate whether it exits normally or not. So in the catch clause we set that flag to indicate the thread is exiting abnormally. That might be clearer. - Jiangjie --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/26994/#review57907 --- On Oct. 22, 2014, 10:04 p.m., Jiangjie Qin wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/26994/ --- (Updated Oct. 22, 2014, 10:04 p.m.) Review request for kafka. Bugs: KAFKA-1719 https://issues.apache.org/jira/browse/KAFKA-1719 Repository: kafka Description --- Addressed Guozhang's comments. Diffs - core/src/main/scala/kafka/tools/MirrorMaker.scala b8698ee1469c8fbc92ccc176d916eb3e28b87867 Diff: https://reviews.apache.org/r/26994/diff/ Testing --- Thanks, Jiangjie Qin
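A hedged sketch of the per-thread flag scheme being discussed in the review above; the names are illustrative and this is not MirrorMaker's actual code.
{code}
import java.util.concurrent.atomic.AtomicBoolean

// Hedged sketch: a shared isShuttingDown flag set only on a deliberate shutdown, plus a
// per-thread flag set in the catch clause so an abnormal exit is distinguishable and can
// be logged as FATAL before the process is taken down.
class WorkerThread(isShuttingDown: AtomicBoolean) extends Thread {
  @volatile var exitedAbnormally = false

  override def run(): Unit = {
    try {
      while (!isShuttingDown.get()) {
        // consume/produce loop goes here
      }
    } catch {
      case t: Throwable =>
        exitedAbnormally = true
        System.err.println(s"FATAL $getName exiting abnormally: $t")
    }
  }
}
{code}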
[jira] [Commented] (KAFKA-1501) transient unit tests failures due to port already in use
[ https://issues.apache.org/jira/browse/KAFKA-1501?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14181725#comment-14181725 ] Guozhang Wang commented on KAFKA-1501: -- I can reproduce the issue from time to time on my PC, and after digging around for a while I think this is the cause: http://www.serverframework.com/asynchronousevents/2011/01/time-wait-and-its-design-implications-for-protocols-and-scalable-servers.html I am attaching a simple patch here, [~copester] could you help double-check whether it fixes your issue? transient unit tests failures due to port already in use Key: KAFKA-1501 URL: https://issues.apache.org/jira/browse/KAFKA-1501 Project: Kafka Issue Type: Improvement Components: core Reporter: Jun Rao Labels: newbie Saw the following transient failures. kafka.api.ProducerFailureHandlingTest testTooLargeRecordWithAckOne FAILED kafka.common.KafkaException: Socket server failed to bind to localhost:59909: Address already in use. at kafka.network.Acceptor.openServerSocket(SocketServer.scala:195) at kafka.network.Acceptor.init(SocketServer.scala:141) at kafka.network.SocketServer.startup(SocketServer.scala:68) at kafka.server.KafkaServer.startup(KafkaServer.scala:95) at kafka.utils.TestUtils$.createServer(TestUtils.scala:123) at kafka.api.ProducerFailureHandlingTest.setUp(ProducerFailureHandlingTest.scala:68) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1721) Snappy compressor is not thread safe
[ https://issues.apache.org/jira/browse/KAFKA-1721?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14181730#comment-14181730 ] Bhavesh Mistry commented on KAFKA-1721: --- [~ewencp], Thanks for fixing this issue. The Snappy devs have released a new version with the fix: https://oss.sonatype.org/content/repositories/releases/org/xerial/snappy/snappy-java/1.1.1.4/ Thanks, Bhavesh Snappy compressor is not thread safe Key: KAFKA-1721 URL: https://issues.apache.org/jira/browse/KAFKA-1721 Project: Kafka Issue Type: Bug Components: compression Reporter: Ewen Cheslack-Postava Assignee: Ewen Cheslack-Postava From the mailing list, it can generate this exception: 2014-10-20 18:55:21.841 [kafka-producer-network-thread] ERROR org.apache.kafka.clients.producer.internals.Sender - Uncaught error in kafka producer I/O thread: *java.lang.NullPointerException* at org.xerial.snappy.BufferRecycler.releaseInputBuffer(BufferRecycler.java:153) at org.xerial.snappy.SnappyOutputStream.close(SnappyOutputStream.java:317) at java.io.FilterOutputStream.close(FilterOutputStream.java:160) at org.apache.kafka.common.record.Compressor.close(Compressor.java:94) at org.apache.kafka.common.record.MemoryRecords.close(MemoryRecords.java:119) at org.apache.kafka.clients.producer.internals.RecordAccumulator.drain(RecordAccumulator.java:285) at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:162) at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:115) at java.lang.Thread.run(Thread.java:744) This appears to be an issue with the snappy-java library using ThreadLocal for an internal buffer recycling object which results in that object being shared unsafely across threads if one thread sends to multiple producers: {quote} I think the issue is that you're using all your producers across a thread pool and the snappy library uses ThreadLocal BufferRecyclers. When new Snappy streams are allocated, they may be allocated from the same thread (e.g. one of your MyProducer classes calls Producer.send() on multiple producers from the same thread) and therefore use the same BufferRecycler. Eventually you hit the code in the stacktrace, and if two producer send threads hit it concurrently they improperly share the unsynchronized BufferRecycler. This seems like a pain to fix -- it's really a deficiency of the snappy library and as far as I can see there's no external control over BufferRecycler in their API. One possibility is to record the thread ID when we generate a new stream in Compressor and use that to synchronize access to ensure no concurrent BufferRecycler access. That could be made specific to snappy so it wouldn't impact other codecs. Not exactly ideal, but it would work. Unfortunately I can't think of any way for you to protect against this in your own code since the problem arises in the producer send thread, which your code should never know about. Another option would be to set up your producers differently to avoid the possibility of unsynchronized access from multiple threads (i.e. don't use the same thread pool approach), but whether you can do that will depend on your use case. {quote} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
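The "record the thread ID and synchronize" workaround mentioned in the quote above could look roughly like this (an illustrative sketch only; the actual resolution was the upgraded snappy-java release linked above, and these class names are not real Kafka code):

{code}
import java.io.OutputStream
import java.util.concurrent.ConcurrentHashMap

// Sketch: all streams created by the same thread share one lock object, so the
// ThreadLocal BufferRecycler they also share is never touched concurrently.
object CreatorThreadLocks {
  private val locks = new ConcurrentHashMap[Long, Object]()
  def lockFor(creatorThreadId: Long): Object = {
    locks.putIfAbsent(creatorThreadId, new Object)
    locks.get(creatorThreadId)
  }
}

class GuardedCompressionStream(underlying: OutputStream) {
  // Captured at construction time, i.e. on the thread that created the stream.
  private val lock = CreatorThreadLocks.lockFor(Thread.currentThread().getId)

  def write(bytes: Array[Byte]): Unit = lock.synchronized { underlying.write(bytes) }
  def close(): Unit = lock.synchronized { underlying.close() } // the call that reaches BufferRecycler
}
{code}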
[jira] [Updated] (KAFKA-1501) transient unit tests failures due to port already in use
[ https://issues.apache.org/jira/browse/KAFKA-1501?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang updated KAFKA-1501: - Attachment: KAFKA-1501.patch transient unit tests failures due to port already in use Key: KAFKA-1501 URL: https://issues.apache.org/jira/browse/KAFKA-1501 Project: Kafka Issue Type: Improvement Components: core Reporter: Jun Rao Labels: newbie Attachments: KAFKA-1501.patch Saw the following transient failures. kafka.api.ProducerFailureHandlingTest testTooLargeRecordWithAckOne FAILED kafka.common.KafkaException: Socket server failed to bind to localhost:59909: Address already in use. at kafka.network.Acceptor.openServerSocket(SocketServer.scala:195) at kafka.network.Acceptor.init(SocketServer.scala:141) at kafka.network.SocketServer.startup(SocketServer.scala:68) at kafka.server.KafkaServer.startup(KafkaServer.scala:95) at kafka.utils.TestUtils$.createServer(TestUtils.scala:123) at kafka.api.ProducerFailureHandlingTest.setUp(ProducerFailureHandlingTest.scala:68) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-1501) transient unit tests failures due to port already in use
[ https://issues.apache.org/jira/browse/KAFKA-1501?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang updated KAFKA-1501: - Assignee: Guozhang Wang Status: Patch Available (was: Open) transient unit tests failures due to port already in use Key: KAFKA-1501 URL: https://issues.apache.org/jira/browse/KAFKA-1501 Project: Kafka Issue Type: Improvement Components: core Reporter: Jun Rao Assignee: Guozhang Wang Labels: newbie Attachments: KAFKA-1501.patch Saw the following transient failures. kafka.api.ProducerFailureHandlingTest testTooLargeRecordWithAckOne FAILED kafka.common.KafkaException: Socket server failed to bind to localhost:59909: Address already in use. at kafka.network.Acceptor.openServerSocket(SocketServer.scala:195) at kafka.network.Acceptor.init(SocketServer.scala:141) at kafka.network.SocketServer.startup(SocketServer.scala:68) at kafka.server.KafkaServer.startup(KafkaServer.scala:95) at kafka.utils.TestUtils$.createServer(TestUtils.scala:123) at kafka.api.ProducerFailureHandlingTest.setUp(ProducerFailureHandlingTest.scala:68) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1501) transient unit tests failures due to port already in use
[ https://issues.apache.org/jira/browse/KAFKA-1501?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14181734#comment-14181734 ] Guozhang Wang commented on KAFKA-1501: -- Created reviewboard https://reviews.apache.org/r/27101/diff/ against branch origin/trunk transient unit tests failures due to port already in use Key: KAFKA-1501 URL: https://issues.apache.org/jira/browse/KAFKA-1501 Project: Kafka Issue Type: Improvement Components: core Reporter: Jun Rao Labels: newbie Attachments: KAFKA-1501.patch Saw the following transient failures. kafka.api.ProducerFailureHandlingTest testTooLargeRecordWithAckOne FAILED kafka.common.KafkaException: Socket server failed to bind to localhost:59909: Address already in use. at kafka.network.Acceptor.openServerSocket(SocketServer.scala:195) at kafka.network.Acceptor.init(SocketServer.scala:141) at kafka.network.SocketServer.startup(SocketServer.scala:68) at kafka.server.KafkaServer.startup(KafkaServer.scala:95) at kafka.utils.TestUtils$.createServer(TestUtils.scala:123) at kafka.api.ProducerFailureHandlingTest.setUp(ProducerFailureHandlingTest.scala:68) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: Review Request 26994: Patch for KAFKA-1719
On Oct. 21, 2014, 10:21 p.m., Guozhang Wang wrote: core/src/main/scala/kafka/tools/MirrorMaker.scala, line 323 https://reviews.apache.org/r/26994/diff/1/?file=727975#file727975line323 Is this change intended? Jiangjie Qin wrote: Yes, it is intended, so that we can make sure each data channel queue will receive a shutdown message. Otherwise 2 messages could go to the same data channel queue. Is this true? Each put will go to the next queue. i.e., Utils.abs(counter.getAndIncrement()) % numConsumers So suppose there are 10 producers and you put 10 shutdown messages. Each of those output queues will get exactly one shutdown message. I have a separate comment on the existing code wrt naming which I will put in the RB directly. - Joel --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/26994/#review57680 --- On Oct. 22, 2014, 10:04 p.m., Jiangjie Qin wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/26994/ --- (Updated Oct. 22, 2014, 10:04 p.m.) Review request for kafka. Bugs: KAFKA-1719 https://issues.apache.org/jira/browse/KAFKA-1719 Repository: kafka Description --- Addressed Guozhang's comments. Diffs - core/src/main/scala/kafka/tools/MirrorMaker.scala b8698ee1469c8fbc92ccc176d916eb3e28b87867 Diff: https://reviews.apache.org/r/26994/diff/ Testing --- Thanks, Jiangjie Qin
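A standalone illustration of Joel's point (not MirrorMaker code): N consecutive puts routed with abs(counter) % N land on each of the N queues exactly once, which is why one shutdown message per queue is enough.

{code}
object RoundRobinCheck extends App {
  val numQueues = 10
  val counter = new java.util.concurrent.atomic.AtomicInteger(7) // arbitrary starting point
  val hits = Array.fill(numQueues)(0)

  for (_ <- 0 until numQueues) {
    val q = math.abs(counter.getAndIncrement()) % numQueues
    hits(q) += 1
  }

  println(hits.mkString(",")) // 1,1,1,1,1,1,1,1,1,1 -- each queue hit exactly once
}
{code}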
Re: Review Request 26994: Patch for KAFKA-1719
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/26994/#review58049 --- core/src/main/scala/kafka/tools/MirrorMaker.scala https://reviews.apache.org/r/26994/#comment98952 Pre-existing naming issue that would be good to fix in this. At first glance, this is extremely confusing and looks wrong. DataChannel is defined as DataChannel(capacity, numProducers, numConsumers) It is instantiated on line 127 as new DataChannel(capacity, numConsumers, numProducers) i.e., it seems as though the arguments are switched. The consumers/producers parameters of the DataChannel are to be interpreted as inputs/outputs and have nothing to do with numConsumers/numProducers in the mirror maker. So, can you rename these fields in the DataChannel? i.e., numConsumers -> numOutputs and numProducers -> numInputs? - Joel Koshy On Oct. 22, 2014, 10:04 p.m., Jiangjie Qin wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/26994/ --- (Updated Oct. 22, 2014, 10:04 p.m.) Review request for kafka. Bugs: KAFKA-1719 https://issues.apache.org/jira/browse/KAFKA-1719 Repository: kafka Description --- Addressed Guozhang's comments. Diffs - core/src/main/scala/kafka/tools/MirrorMaker.scala b8698ee1469c8fbc92ccc176d916eb3e28b87867 Diff: https://reviews.apache.org/r/26994/diff/ Testing --- Thanks, Jiangjie Qin
Review Request 27101: Fix KAFKA-1501 by enabling reuse address
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/27101/ --- Review request for kafka. Bugs: KAFKA-1501 https://issues.apache.org/jira/browse/KAFKA-1501 Repository: kafka Description --- needs repeated unit tests to verify Diffs - core/src/main/scala/kafka/network/SocketServer.scala cee76b323e5f3e4c783749ac9e78e1ef02897e3b Diff: https://reviews.apache.org/r/27101/diff/ Testing --- Thanks, Guozhang Wang
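For context, "enabling reuse address" generally amounts to something like the following when opening the server socket, so that a port still sitting in TIME_WAIT from a previous test can be re-bound immediately (an illustrative sketch; see the SocketServer.scala diff above for the actual change):

{code}
import java.net.InetSocketAddress
import java.nio.channels.ServerSocketChannel

object ReuseAddressSketch {
  def openServerSocket(host: String, port: Int): ServerSocketChannel = {
    val serverChannel = ServerSocketChannel.open()
    serverChannel.socket.setReuseAddress(true) // allow re-binding despite lingering TIME_WAIT sockets
    serverChannel.socket.bind(new InetSocketAddress(host, port))
    serverChannel
  }
}
{code}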
Re: Documentation with patches
I actually think it is a pretty good idea to have the docs in git. Summarizing the benefits - 1. If a contributor/committer makes any significant changes to how a piece of functionality works, they could always update the docs in parallel and reviewers can enforce this if they find the change warrants a documentation change. This will help us create a process around updating documentation, which is hard today. 2. Creating a new version can be done when we cut a new branch. This seems a lot easier than remembering to update the documentation for a new version as an afterthought. 3. Easy to review the docs with the code change in mind. We forget the exact changes over time and reviewers would need to review the code again to do a good review of the docs. +1 from me. On 10/23/14 10:37 AM, Gwen Shapira gshap...@cloudera.com wrote: With two patches, we'll need a two-phase commit to maintain consistency, and we all know what a PITA this can be. (Sorry, couldn't resist the pun. Carry on.) On Thu, Oct 23, 2014 at 10:32 AM, Jay Kreps jay.kr...@gmail.com wrote: That makes sense. I agree that moving to another doc system isn't a high priority (it isn't as much work as it sounds because the HTML can all remain as is, just the includes would get converted). But actually I don't think that having a patch for docs and a patch for the code is too big a hurdle either. I think maybe we should just start asking for documentation patches and describe that in the contributing section--likely most people just don't think of it. -Jay On Thu, Oct 23, 2014 at 7:16 AM, Joe Stein joe.st...@stealth.ly wrote: I like how we have things in SVN. My issue is having two patches from contributors (one for tests + code and another for docs) that I am trying to solve. If we copy the entire SVN docs directory into git under /docs then contributions can patch the docs in their git patch. Committers can do 1 commit. When we release we just cp -r docs/* /svn/ svn add * svn co release //or such. The only trick is that we have to make sure for live website fixes that we commit in two places (but only then instead of every time). I don't mind doing something more fancy and generating the docs from some markdown but I am not sure it is necessary... we have a lot to get done in the next few months with 0.9 and I don't want to add anything unnecessary to that effort. I do think though with all the changes coming we want code contributors to keep the docs up to date, and having doc changes + code + tests all in one git patch would be best for everyone (however we accomplish that) for reviewing and such. /*** Joe Stein Founder, Principal Consultant Big Data Open Source Security LLC http://www.stealth.ly Twitter: @allthingshadoop http://www.twitter.com/allthingshadoop / On Wed, Oct 22, 2014 at 1:53 PM, Jay Kreps jay.kr...@gmail.com wrote: Currently we are handling the versioning problem by explicitly versioning docs that change over time (configuration, quickstart, design, etc). This is done by just creating a copy of these pages for each release in a subdirectory. So we can commit documentation changes at any time for the future release; we just don't link up that release until it is out (theoretically you could get there by guessing the url, but that is okay). Although having multiple copies of certain pages, one for each release, seems odd, I think it is actually better because in practice we often end up editing old releases when we find problems in the older docs. 
-Jay On Wed, Oct 22, 2014 at 10:35 AM, Jarek Jarcec Cecho jar...@apache.org wrote: I would strongly support this idea. We have a similar model in all other projects where I'm involved: The docs are part of the usual code base and we do require contributors to update them when they are adding a new feature. And then during release time we simply take a snapshot of the docs and upload them to our public webpages. This enables us to have simple versioned docs on the website, so that users can easily find docs for their version and also the public site does not contain docs of unreleased features :) There are a lot of ways to achieve that - in Sqoop 1 we used asciidoc to build the site, in Sqoop 2/Flume we're using sphinx, Oozie is using markdown wiki... Jarcec On Oct 22, 2014, at 10:27 AM, Jay Kreps jay.kr...@gmail.com wrote: Hey Joe, I'd love to encourage documentation contributions. I think we do have a way to contribute to docs. The current workflow for contributing is 1. Checkout the docs 2. Change docs 3. Submit patch in normal way 4. Committer reviews and applies For committers we have traditionally made the review step optional for docs. In reality this skips step 1.5 which is fiddling with apache for an hour
Re: Review Request 26994: Patch for KAFKA-1719
On Oct. 22, 2014, 9:32 p.m., Neha Narkhede wrote: core/src/main/scala/kafka/tools/MirrorMaker.scala, line 271 https://reviews.apache.org/r/26994/diff/1/?file=727975#file727975line271 Is there any value in setting this to true? It seems that just checking if it is false and exiting the process suffices. Setting to true something that is called cleanShutdown, when in fact, it isn't a clean shutdown is confusing to read. Also good to add a FATAL log entry as suggested by Guozhang as well. Guozhang Wang wrote: The boolean is used when the internal threads try to exit; when it is not set, the threads know they are closing abnormally and hence go on to handle it. I agree its name is a bit misleading, and probably we can just name it as isShuttingDown. Jiangjie Qin wrote: I'm thinking maybe we can use a separate flag in each thread to indicate whether it exits normally or not. So in the catch clause we set that flag to indicate the thread is exiting abnormally. That might be clearer. I personally think the flag-per-thread is overkill here: when each thread is about to exit (i.e. in the finally block), all it needs to check is whether the whole MM is currently shutting down or not; if it is, then the thread itself knows it exited normally, otherwise log the FATAL and force-kill the MM. - Guozhang --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/26994/#review57907 --- On Oct. 22, 2014, 10:04 p.m., Jiangjie Qin wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/26994/ --- (Updated Oct. 22, 2014, 10:04 p.m.) Review request for kafka. Bugs: KAFKA-1719 https://issues.apache.org/jira/browse/KAFKA-1719 Repository: kafka Description --- Addressed Guozhang's comments. Diffs - core/src/main/scala/kafka/tools/MirrorMaker.scala b8698ee1469c8fbc92ccc176d916eb3e28b87867 Diff: https://reviews.apache.org/r/26994/diff/ Testing --- Thanks, Jiangjie Qin
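A sketch of the pattern Guozhang describes (class and field names are illustrative, not the actual MirrorMaker code): each thread consults a single shutdown flag in its finally block instead of keeping a clean/unclean flag of its own.

{code}
import java.util.concurrent.atomic.AtomicBoolean

class WorkerThread(isShuttingDown: AtomicBoolean) extends Thread {
  override def run(): Unit = {
    try {
      while (!isShuttingDown.get()) {
        // ... consume / produce one unit of work ...
      }
    } finally {
      if (!isShuttingDown.get()) {
        // The mirror maker is NOT shutting down, so this is an abnormal exit.
        System.err.println("FATAL: worker thread exiting abnormally, forcing shutdown")
        Runtime.getRuntime.halt(1) // "force killing the MM"
      }
    }
  }
}
{code}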
Re: Review Request 26994: Patch for KAFKA-1719
On Oct. 21, 2014, 10:21 p.m., Guozhang Wang wrote: core/src/main/scala/kafka/tools/MirrorMaker.scala, line 323 https://reviews.apache.org/r/26994/diff/1/?file=727975#file727975line323 Is this change intended? Jiangjie Qin wrote: Yes, it is intended, so that we can make sure each data channel queue will receive a shutdown message. Otherwise 2 messages could go to the same data channel queue. Joel Koshy wrote: Is this true? Each put will go to the next queue. i.e., Utils.abs(counter.getAndIncrement()) % numConsumers So suppose there are 10 producers and you put 10 shutdown messages. Each of those output queues will get exactly one shutdown message. I have a separate comment on the existing code wrt naming which I will put in the RB directly. Hi Joel, yes, I think you are right, since there will be no other puts during the shutdown phase, though it is kind of weird that when we call shutdown for one thread, another one could get shut down... - Jiangjie --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/26994/#review57680 --- On Oct. 22, 2014, 10:04 p.m., Jiangjie Qin wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/26994/ --- (Updated Oct. 22, 2014, 10:04 p.m.) Review request for kafka. Bugs: KAFKA-1719 https://issues.apache.org/jira/browse/KAFKA-1719 Repository: kafka Description --- Addressed Guozhang's comments. Diffs - core/src/main/scala/kafka/tools/MirrorMaker.scala b8698ee1469c8fbc92ccc176d916eb3e28b87867 Diff: https://reviews.apache.org/r/26994/diff/ Testing --- Thanks, Jiangjie Qin
Re: Documentation with patches
+1 On Thu, Oct 23, 2014 at 11:45 AM, Sriram Subramanian srsubraman...@linkedin.com.invalid wrote: I actually think it is a pretty good idea to have the docs in git. Summarizing the benefits - 1. If a contributor/committer makes any significant changes to how a piece of functionality works, they could always update the docs in parallel and reviewers can enforce this if they find the change warrants a documentation change. This will help us create a process around updating documentation, which is hard today. 2. Creating a new version can be done when we cut a new branch. This seems a lot easier than remembering to update the documentation for a new version as an afterthought. 3. Easy to review the docs with the code change in mind. We forget the exact changes over time and reviewers would need to review the code again to do a good review of the docs. +1 from me. On 10/23/14 10:37 AM, Gwen Shapira gshap...@cloudera.com wrote: With two patches, we'll need a two-phase commit to maintain consistency, and we all know what a PITA this can be. (Sorry, couldn't resist the pun. Carry on.) On Thu, Oct 23, 2014 at 10:32 AM, Jay Kreps jay.kr...@gmail.com wrote: That makes sense. I agree that moving to another doc system isn't a high priority (it isn't as much work as it sounds because the HTML can all remain as is, just the includes would get converted). But actually I don't think that having a patch for docs and a patch for the code is too big a hurdle either. I think maybe we should just start asking for documentation patches and describe that in the contributing section--likely most people just don't think of it. -Jay On Thu, Oct 23, 2014 at 7:16 AM, Joe Stein joe.st...@stealth.ly wrote: I like how we have things in SVN. My issue is having two patches from contributors (one for tests + code and another for docs) that I am trying to solve. If we copy the entire SVN docs directory into git under /docs then contributions can patch the docs in their git patch. Committers can do 1 commit. When we release we just cp -r docs/* /svn/ svn add * svn co release //or such. The only trick is that we have to make sure for live website fixes that we commit in two places (but only then instead of every time). I don't mind doing something more fancy and generating the docs from some markdown but I am not sure it is necessary... we have a lot to get done in the next few months with 0.9 and I don't want to add anything unnecessary to that effort. I do think though with all the changes coming we want code contributors to keep the docs up to date, and having doc changes + code + tests all in one git patch would be best for everyone (however we accomplish that) for reviewing and such. /*** Joe Stein Founder, Principal Consultant Big Data Open Source Security LLC http://www.stealth.ly Twitter: @allthingshadoop http://www.twitter.com/allthingshadoop / On Wed, Oct 22, 2014 at 1:53 PM, Jay Kreps jay.kr...@gmail.com wrote: Currently we are handling the versioning problem by explicitly versioning docs that change over time (configuration, quickstart, design, etc). This is done by just creating a copy of these pages for each release in a subdirectory. So we can commit documentation changes at any time for the future release; we just don't link up that release until it is out (theoretically you could get there by guessing the url, but that is okay). 
Although having multiple copies of certain pages, one for each release, seems odd, I think it is actually better because in practice we often end up editing old releases when we find problems in the older docs. -Jay On Wed, Oct 22, 2014 at 10:35 AM, Jarek Jarcec Cecho jar...@apache.org wrote: I would strongly support this idea. We have a similar model in all other projects where I'm involved: The docs are part of the usual code base and we do require contributors to update them when they are adding a new feature. And then during release time we simply take a snapshot of the docs and upload them to our public webpages. This enables us to have simple versioned docs on the website, so that users can easily find docs for their version and also the public site does not contain docs of unreleased features :) There are a lot of ways to achieve that - in Sqoop 1 we used asciidoc to build the site, in Sqoop 2/Flume we're using sphinx, Oozie is using markdown wiki... Jarcec On Oct 22, 2014, at 10:27 AM, Jay Kreps jay.kr...@gmail.com wrote: Hey Joe, I'd love to encourage documentation contributions. I think we do have a way to contribute to docs. The current workflow for contributing is 1. Checkout the docs 2. Change docs 3. Submit patch in normal way 4. Committer reviews and applies For committers we have traditionally made the review
[jira] [Commented] (KAFKA-1501) transient unit tests failures due to port already in use
[ https://issues.apache.org/jira/browse/KAFKA-1501?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14181795#comment-14181795 ] Jay Kreps commented on KAFKA-1501: -- Nice! transient unit tests failures due to port already in use Key: KAFKA-1501 URL: https://issues.apache.org/jira/browse/KAFKA-1501 Project: Kafka Issue Type: Improvement Components: core Reporter: Jun Rao Assignee: Guozhang Wang Labels: newbie Attachments: KAFKA-1501.patch Saw the following transient failures. kafka.api.ProducerFailureHandlingTest testTooLargeRecordWithAckOne FAILED kafka.common.KafkaException: Socket server failed to bind to localhost:59909: Address already in use. at kafka.network.Acceptor.openServerSocket(SocketServer.scala:195) at kafka.network.Acceptor.init(SocketServer.scala:141) at kafka.network.SocketServer.startup(SocketServer.scala:68) at kafka.server.KafkaServer.startup(KafkaServer.scala:95) at kafka.utils.TestUtils$.createServer(TestUtils.scala:123) at kafka.api.ProducerFailureHandlingTest.setUp(ProducerFailureHandlingTest.scala:68) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1501) transient unit tests failures due to port already in use
[ https://issues.apache.org/jira/browse/KAFKA-1501?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14181928#comment-14181928 ] Chris Cope commented on KAFKA-1501: --- Awesome! I should know shortly, spinning off 100 jobs via phone... transient unit tests failures due to port already in use Key: KAFKA-1501 URL: https://issues.apache.org/jira/browse/KAFKA-1501 Project: Kafka Issue Type: Improvement Components: core Reporter: Jun Rao Assignee: Guozhang Wang Labels: newbie Attachments: KAFKA-1501.patch Saw the following transient failures. kafka.api.ProducerFailureHandlingTest testTooLargeRecordWithAckOne FAILED kafka.common.KafkaException: Socket server failed to bind to localhost:59909: Address already in use. at kafka.network.Acceptor.openServerSocket(SocketServer.scala:195) at kafka.network.Acceptor.init(SocketServer.scala:141) at kafka.network.SocketServer.startup(SocketServer.scala:68) at kafka.server.KafkaServer.startup(KafkaServer.scala:95) at kafka.utils.TestUtils$.createServer(TestUtils.scala:123) at kafka.api.ProducerFailureHandlingTest.setUp(ProducerFailureHandlingTest.scala:68) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1684) Implement TLS/SSL authentication
[ https://issues.apache.org/jira/browse/KAFKA-1684?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14181947#comment-14181947 ] Gwen Shapira commented on KAFKA-1684: - It's a large patch, so I didn't do a full review yet. But I wanted to mention that I love the ChannelFactory implementation. Very nice, clean and reusable. IMO, this should be a dependency for KAFKA-1686, so we can reuse a lot of the work done here. I think it will be cleaner and easier to simply reuse this code and have SASL/GSSAPI work as another channel than trying to cram SASL Kerberos onto the same SocketServer as the existing connections (I've seen this done in HBase, and there is a lot of (isSecure, isWrap, etc) involved). I'm using this implementation as a reference for how the Session object can integrate with a secured nio channel (KAFKA-1683 for reference). It looks like SSLSocketChannel has a local SSLEngine that I can access to get the client identity. For consistency, we may want all channels (SASL, TLS, Kafka) to expose a getIdentity method that SocketServer can use to populate the Session object. Implement TLS/SSL authentication Key: KAFKA-1684 URL: https://issues.apache.org/jira/browse/KAFKA-1684 Project: Kafka Issue Type: Sub-task Components: security Affects Versions: 0.9.0 Reporter: Jay Kreps Assignee: Ivan Lyutov Attachments: KAFKA-1684.patch Add an SSL port to the configuration and advertise this as part of the metadata request. If the SSL port is configured the socket server will need to add a second Acceptor thread to listen on it. Connections accepted on this port will need to go through the SSL handshake prior to being registered with a Processor for request processing. SSL requests and responses may need to be wrapped or unwrapped using the SSLEngine that was initialized by the acceptor. This wrapping and unwrapping is very similar to what will need to be done for SASL-based authentication schemes. We should have a uniform interface that covers both of these and we will need to store the instance in the session with the request. The socket server will have to use this object when reading and writing requests. We will need to take care with the FetchRequests as the current FileChannel.transferTo mechanism will be incompatible with wrap/unwrap so we can only use this optimization for unencrypted sockets that don't require userspace translation (wrapping). -- This message was sent by Atlassian JIRA (v6.3.4#6332)
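A rough sketch of the uniform getIdentity idea mentioned above (hypothetical trait and method names; not part of the posted patch):

{code}
// Every channel flavor (plaintext, TLS, SASL) would expose the authenticated
// peer in one place so SocketServer can populate the Session object uniformly.
trait AuthenticatedChannel {
  /** Identity of the remote peer, or None for unauthenticated plaintext connections. */
  def getIdentity: Option[String]
}

// An SSL-backed channel could, for example, derive it from the engine's session:
//   override def getIdentity: Option[String] =
//     Option(sslEngine.getSession.getPeerPrincipal).map(_.getName)
{code}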
[jira] [Commented] (KAFKA-1688) Add authorization interface and naive implementation
[ https://issues.apache.org/jira/browse/KAFKA-1688?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14181997#comment-14181997 ] Gwen Shapira commented on KAFKA-1688: - The Wiki specifies: PermissionManager.isPermitted(Subject subject, InetAddress ip, Permissions permission, String resource) TLS doesn't seem to have a Subject, but both TLS and SASL have a Principal. I assume we can go with Principal instead. (Ran into this while trying to figure out what goes into Session. We can always change it later, but I thought I'd make a note.) Add authorization interface and naive implementation Key: KAFKA-1688 URL: https://issues.apache.org/jira/browse/KAFKA-1688 Project: Kafka Issue Type: Sub-task Components: security Reporter: Jay Kreps Assignee: Sriharsha Chintalapani Add a PermissionManager interface as described here: https://cwiki.apache.org/confluence/display/KAFKA/Security (possibly there is a better name?) Implement calls to the PermissionsManager in KafkaApis for the main requests (FetchRequest, ProduceRequest, etc). We will need to add a new error code and exception to the protocol to indicate permission denied. Add a server configuration to give the class you want to instantiate that implements that interface. That class can define its own configuration properties from the main config file. Provide a simple implementation of this interface which just takes a user and ip whitelist and permits those in either of the whitelists to do anything, and denies all others. Rather than writing an integration test for this class we can probably just use this class for the TLS and SASL authentication testing. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
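Restating the wiki signature with Principal in place of Subject, this is roughly the shape being discussed (a sketch, not committed code; the Permissions type is stubbed as a String here):

{code}
import java.net.InetAddress
import java.security.Principal

trait PermissionManager {
  def isPermitted(principal: Principal,
                  ip: InetAddress,
                  permission: String, // placeholder for a real Permissions type, e.g. "READ"/"WRITE"
                  resource: String): Boolean
}
{code}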
Re: Review Request 26373: Patch for KAFKA-1647
On Oct. 23, 2014, 12:42 a.m., Jun Rao wrote: core/src/main/scala/kafka/server/ReplicaManager.scala, lines 531-538 https://reviews.apache.org/r/26373/diff/4/?file=728653#file728653line531 I am not sure that I understand how Option.flatten works. Would it be clearer if we first filter out partitions with no live leader and then generate the TopicAndPartition to BrokerAndInitialOffset map? This change actually should be reverted as Joel suggested, because we no longer add the partition without a leader to partitionsToMakeFollower; instead, we only create a local replica for it. This section should be exactly the same as the original code. I put flatten here just to remove the None options from a set... - Jiangjie --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/26373/#review57947 --- On Oct. 22, 2014, 6:08 a.m., Jiangjie Qin wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/26373/ --- (Updated Oct. 22, 2014, 6:08 a.m.) Review request for kafka. Bugs: KAFKA-1647 https://issues.apache.org/jira/browse/KAFKA-1647 Repository: kafka Description --- Addressed Joel's comments. the version 2 code seems to be submitted by mistake... This should be the code for review that addressed Joel's comments. Addressed Jun's comments. Will do tests to verify if it works. Diffs - core/src/main/scala/kafka/server/ReplicaManager.scala 78b7514cc109547c562e635824684fad581af653 Diff: https://reviews.apache.org/r/26373/diff/ Testing --- Thanks, Jiangjie Qin
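For reference, this is all the flatten call does in that context: it drops the None entries from a collection of Options (standalone example):

{code}
val leaders: Set[Option[Int]] = Set(Some(1), None, Some(3))
val live: Set[Int] = leaders.flatten // Set(1, 3)
{code}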
Re: Review Request 26885: Patch for KAFKA-1642
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/26885/#review57513 --- Thanks for the patch. Looks good to me. Some minor comments below. clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java https://reviews.apache.org/r/26885/#comment98242 It's probably better to put them in two lines. clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java https://reviews.apache.org/r/26885/#comment98234 It seems that expired can just be timeLeftMs = 0? clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java https://reviews.apache.org/r/26885/#comment98236 Are these comments redundant now given the new comments above? clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java https://reviews.apache.org/r/26885/#comment98388 It seems that in this case, the nextReadyCheckDelayMs should be the remaining linger time for tp1, which is lingerMs/2. Should we just assert that? - Jun Rao On Oct. 21, 2014, 12:34 a.m., Ewen Cheslack-Postava wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/26885/ --- (Updated Oct. 21, 2014, 12:34 a.m.) Review request for kafka. Bugs: KAFKA-1642 https://issues.apache.org/jira/browse/KAFKA-1642 Repository: kafka Description --- Fixes two issues with the computation of ready nodes and poll timeouts in Sender/RecordAccumulator: 1. The timeout was computed incorrectly because it took into account all nodes, even if they had data to send such that their timeout would be 0. However, nodes were then filtered based on whether it was possible to send (i.e. their connection was still good) which could result in nothing to send and a 0 timeout, resulting in busy looping. Instead, the timeout needs to be computed only using data that cannot be immediately sent, i.e. where the timeout will be greater than 0. This timeout is only used if, after filtering by whether connections are ready for sending, there is no data to be sent. Other events can wake the thread up earlier, e.g. a client reconnects and becomes ready again. 2. One of the conditions indicating whether data is sendable is whether a timeout has expired -- either the linger time or the retry backoff. This condition wasn't accounting for both cases properly, always using the linger time. This means the retry backoff was probably not being respected. KAFKA-1642 Compute correct poll timeout when all nodes have sendable data but none can send data because they are in a connection backoff period. Diffs - clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java d304660f29246e9600efe3ddb28cfcc2b074bed3 clients/src/main/java/org/apache/kafka/clients/KafkaClient.java 29658d4a15f112dc0af5ce517eaab93e6f00134b clients/src/main/java/org/apache/kafka/clients/NetworkClient.java eea270abb16f40c9f3b47c4ea96be412fb4fdc8b clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java c5d470011d334318d5ee801021aadd0c000974a6 clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 8ebe7ed82c9384b71ce0cc3ddbef2c2325363ab9 clients/src/test/java/org/apache/kafka/clients/MockClient.java aae8d4a1e98279470587d397cc779a9baf6fee6c clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java 0762b35abba0551f23047348c5893bb8c9acff14 Diff: https://reviews.apache.org/r/26885/diff/ Testing --- Thanks, Ewen Cheslack-Postava
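A standalone sketch of the timeout reasoning in the description above (not the client code itself): only batches that are not yet sendable should drive the poll timeout, so that ready batches whose destination node is unreachable cannot force a 0 ms busy loop.

{code}
object PollTimeoutSketch {
  case class BatchState(waitedMs: Long, mustWaitMs: Long) {
    def timeLeftMs: Long = math.max(mustWaitMs - waitedMs, 0L)
    def expired: Boolean = timeLeftMs <= 0 // linger or retry backoff has elapsed
  }

  // Timeout until the earliest not-yet-ready batch matures; ready-but-unsendable
  // batches do not drag this down to zero.
  def nextReadyCheckDelayMs(batches: Seq[BatchState]): Long = {
    val notReady = batches.filterNot(_.expired)
    if (notReady.isEmpty) Long.MaxValue // nothing pending; wait for new data or I/O events
    else notReady.map(_.timeLeftMs).min
  }
}
{code}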
[jira] [Commented] (KAFKA-1683) Implement a session concept in the socket server
[ https://issues.apache.org/jira/browse/KAFKA-1683?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14182024#comment-14182024 ] Jay Kreps commented on KAFKA-1683: -- Here is what I was imagining, not sure if this is what you are proposing or not. It might be worthwhile to chat quickly to hash it out. 1. We add Session to the request passed from the socket server to the api layer. Session.authenticatedUser is the user that we will use in the authorization layer to check permissions. Session.securityCodec is the codec used to wrap and unwrap messages sent by the socket layer. If the securityCodec is null, no wrapping or unwrapping is done. We will need an SSL and SASL implementation of the security codec. The SSL version will wrap the SSLEngine instance and the SASL version will wrap the SaslServer instance, both of which provide a wrap and unwrap version. 2. Currently in the socketserver the only state we maintain per connection is the current request we are reading. This is attached to the channel. We would replace this with some variable which could hold both the in-progress request and the session. 3. We would add a new TlsAcceptor that would listen on the TLS port and handle the handshake for new TLS connections. When the handshake is complete the TlsAcceptor would grab the user information and populate that field in the session along with the securityCodec. 4. A similar thing would happen in KafkaApis.handleAuthenticate for Kerberos. This api would take the next step in the SASL challenge/response cycle and if it is complete it would populate the authenticatedUser and securityCodec fields in the session. 5. All authorization checks would be done using Session.authenticatedUser. 6. When the socket server is writing data it will always first run it through securityCodec.wrap() prior to sending it to make sure it is encrypted. When it is reading data it will read a complete size delimited message and then do securityCodec.unwrap() prior to handing that buffer off to the api layer. If securityCodec is null it will skip this step. A couple of assumptions here that I'm not sure were explicit: a. Gwen, as you point out, the notion of user/subject/principal isn't totally clear. Can this just be a string we extract rather than a java Principal/Subject? b. Is the protocol for encrypted or otherwise integrity-checked packets a 4 byte plaintext size followed by an N byte packet? Or is the size also encrypted? If the latter, the unwrapping will have to happen in tandem with the reading, which is slightly more complex but perhaps cleaner. Implement a session concept in the socket server -- Key: KAFKA-1683 URL: https://issues.apache.org/jira/browse/KAFKA-1683 Project: Kafka Issue Type: Sub-task Components: security Affects Versions: 0.9.0 Reporter: Jay Kreps Assignee: Gwen Shapira To implement authentication we need a way to keep track of some things between requests. The initial use for this would be remembering the authenticated user/principal info, but likely more uses would come up (for example we will also need to remember whether and which encryption or integrity measures are in place on the socket so we can wrap and unwrap writes and reads). I was thinking we could just add a Session object that might have a user field. The session object would need to get added to RequestChannel.Request so it is passed down to the API layer with each request. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
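A sketch of the securityCodec shape described in points 1 and 6 (hypothetical names; the SSL- and SASL-backed implementations would delegate to SSLEngine and SaslServer wrap/unwrap respectively):

{code}
import java.nio.ByteBuffer

trait SecurityCodec {
  def wrap(plain: ByteBuffer): ByteBuffer    // applied by the socket server just before writing
  def unwrap(cipher: ByteBuffer): ByteBuffer // applied to a complete received packet before the API layer sees it
}

// Per the description above, the session would carry only these two fields to start with;
// an absent codec means a plaintext connection and both calls are skipped entirely.
case class Session(authenticatedUser: String, securityCodec: Option[SecurityCodec])
{code}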
Re: [VOTE] 0.8.2-beta Release Candidate 1
Yeah, the version on Precise is really old, 1.0, and on Trusty is 1.4. Using the PPA version worked fine for me on Trusty. -Ewen On Thu, Oct 23, 2014, at 10:02 AM, Joe Stein wrote: hmmm. I just launched a bootstrap clean room (per release docs). Installed latest gradle sudo add-apt-repository ppa:cwchien/gradle sudo apt-get update sudo apt-get install gradle wget https://people.apache.org/~joestein/kafka-0.8.2-beta-candidate1/kafka-0.8.2-beta-src.tgz tar -xvf kafka-0.8.2-beta-src.tgz cd kafka-0.8.2-beta-src/ gradle ... Building project 'core' with Scala version 2.10.1 :downloadWrapper BUILD SUCCESSFUL Total time: 22.066 secs so it works, I use the source upload to build the binaries. odd. * Maybe the gradle version you have is too old or something? I am using vagrant@precise64:~/kafka-0.8.2-beta-src$ gradle -version Gradle 2.1 Build time: 2014-09-08 10:40:39 UTC Build number: none Revision: e6cf70745ac11fa943e19294d19a2c527a669a53 Groovy: 2.3.6 Ant: Apache Ant(TM) version 1.9.3 compiled on December 23 2013 JVM: 1.6.0_45 (Sun Microsystems Inc. 20.45-b01) OS: Linux 3.2.0-23-generic amd64 /*** Joe Stein Founder, Principal Consultant Big Data Open Source Security LLC http://www.stealth.ly Twitter: @allthingshadoop http://www.twitter.com/allthingshadoop / On Thu, Oct 23, 2014 at 12:49 PM, Neha Narkhede neha.narkh...@gmail.com wrote: Joe, I downloaded the source distro and ran into this error. Followed the steps listed on the release process wiki to validate the release. vagrant@precise64:~/kafka-0.8.2-beta-src$ gradle FAILURE: Build failed with an exception. * Where: Script '/home/vagrant/kafka-0.8.2-beta-src/scala.gradle' line: 2 * What went wrong: A problem occurred evaluating script. Cause: Could not find property 'ext' on settings 'kafka-0.8.2-beta-src'. * Try: Run with --stacktrace option to get the stack trace. Run with --info or --debug option to get more log output. BUILD FAILED Total time: 1.565 secs vagrant@precise64:~/kafka-0.8.2-beta-src$ cat scala.gradle if (!hasProperty('scalaVersion')) { ext.scalaVersion = '2.10.1' } ext.defaultScalaVersion = '2.10.1' if (scalaVersion.startsWith('2.10')) { ext.baseScalaVersion = '2.10' } else if (scalaVersion.startsWith('2.11')) { ext.baseScalaVersion = '2.11' } else { ext.baseScalaVersion = scalaVersion } On Tue, Oct 21, 2014 at 1:58 PM, Joe Stein joe.st...@stealth.ly wrote: This is the first candidate for release of Apache Kafka 0.8.2-beta Release Notes for the 0.8.2-beta release https://people.apache.org/~joestein/kafka-0.8.2-beta-candidate1/RELEASE_NOTES.html *** Please download, test and vote by Friday, October 24th, 2pm PT Kafka's KEYS file containing PGP keys we use to sign the release: https://svn.apache.org/repos/asf/kafka/KEYS in addition to the md5, sha1 and sha2 (SHA256) checksum. * Release artifacts to be voted upon (source and binary): https://people.apache.org/~joestein/kafka-0.8.2-beta-candidate1/ * Maven artifacts to be voted upon prior to release: https://repository.apache.org/content/groups/staging/ * scala-doc https://people.apache.org/~joestein/kafka-0.8.2-beta-candidate1/scala-doc/ * java-doc https://people.apache.org/~joestein/kafka-0.8.2-beta-candidate1/java-doc/ * The tag to be voted upon (off the 0.8.2 branch) is the 0.8.2-beta tag https://git-wip-us.apache.org/repos/asf?p=kafka.git;a=tag;h=2b2c3da2c52bc62a89d60f85125d3723c8410fa0 /*** Joe Stein Founder, Principal Consultant Big Data Open Source Security LLC http://www.stealth.ly Twitter: @allthingshadoop http://www.twitter.com/allthingshadoop /
[jira] [Resolved] (KAFKA-1727) Fix comment about message format
[ https://issues.apache.org/jira/browse/KAFKA-1727?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jun Rao resolved KAFKA-1727. Resolution: Fixed Fix Version/s: 0.8.3 Assignee: Muneyuki Noguchi Thanks for the patch. +1 and committed to trunk. Fix comment about message format Key: KAFKA-1727 URL: https://issues.apache.org/jira/browse/KAFKA-1727 Project: Kafka Issue Type: Bug Reporter: Muneyuki Noguchi Assignee: Muneyuki Noguchi Priority: Trivial Fix For: 0.8.3 Attachments: KAFKA-1727.patch The comment in Message.scala says the value of magic identifier is 2, bq. 2. 1 byte magic identifier to allow format changes, value is 2 currently but the actual current magic value is 0. {code} /** * The current magic value */ val CurrentMagicValue: Byte = 0 {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-391) Producer request and response classes should use maps
[ https://issues.apache.org/jira/browse/KAFKA-391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14182057#comment-14182057 ] Jun Rao commented on KAFKA-391: --- The client should always receive one response per request. Are you seeing otherwise? Producer request and response classes should use maps - Key: KAFKA-391 URL: https://issues.apache.org/jira/browse/KAFKA-391 Project: Kafka Issue Type: Bug Reporter: Joel Koshy Assignee: Joel Koshy Priority: Blocker Labels: optimization Fix For: 0.8.0 Attachments: KAFKA-391-draft-r1374069.patch, KAFKA-391-v2.patch, KAFKA-391-v3.patch, KAFKA-391-v4.patch Producer response contains two arrays of error codes and offsets - the ordering in these arrays correspond to the flattened ordering of the request arrays. It would be better to switch to maps in the request and response as this would make the code clearer and more efficient (right now, linear scans are used in handling producer acks). We can probably do the same in the fetch request/response. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1716) hang during shutdown of ZookeeperConsumerConnector
[ https://issues.apache.org/jira/browse/KAFKA-1716?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14182070#comment-14182070 ] Jun Rao commented on KAFKA-1716: Chris, Did you change fetch.wait.max.ms in the consumer config? Thanks, hang during shutdown of ZookeeperConsumerConnector -- Key: KAFKA-1716 URL: https://issues.apache.org/jira/browse/KAFKA-1716 Project: Kafka Issue Type: Bug Components: consumer Affects Versions: 0.8.1.1 Reporter: Sean Fay Assignee: Neha Narkhede It appears to be possible for {{ZookeeperConsumerConnector.shutdown()}} to wedge in the case that some consumer fetcher threads receive messages during the shutdown process. Shutdown thread: {code}-- Parking to wait for: java/util/concurrent/CountDownLatch$Sync@0x2aaaf3ef06d0 at jrockit/vm/Locks.park0(J)V(Native Method) at jrockit/vm/Locks.park(Locks.java:2230) at sun/misc/Unsafe.park(ZJ)V(Native Method) at java/util/concurrent/locks/LockSupport.park(LockSupport.java:156) at java/util/concurrent/locks/AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:811) at java/util/concurrent/locks/AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:969) at java/util/concurrent/locks/AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1281) at java/util/concurrent/CountDownLatch.await(CountDownLatch.java:207) at kafka/utils/ShutdownableThread.shutdown(ShutdownableThread.scala:36) at kafka/server/AbstractFetcherThread.shutdown(AbstractFetcherThread.scala:71) at kafka/server/AbstractFetcherManager$$anonfun$closeAllFetchers$2.apply(AbstractFetcherManager.scala:121) at kafka/server/AbstractFetcherManager$$anonfun$closeAllFetchers$2.apply(AbstractFetcherManager.scala:120) at scala/collection/TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772) at scala/collection/mutable/HashMap$$anonfun$foreach$1.apply(HashMap.scala:98) at scala/collection/mutable/HashMap$$anonfun$foreach$1.apply(HashMap.scala:98) at scala/collection/mutable/HashTable$class.foreachEntry(HashTable.scala:226) at scala/collection/mutable/HashMap.foreachEntry(HashMap.scala:39) at scala/collection/mutable/HashMap.foreach(HashMap.scala:98) at scala/collection/TraversableLike$WithFilter.foreach(TraversableLike.scala:771) at kafka/server/AbstractFetcherManager.closeAllFetchers(AbstractFetcherManager.scala:120) ^-- Holding lock: java/lang/Object@0x2aaaebcc7318[thin lock] at kafka/consumer/ConsumerFetcherManager.stopConnections(ConsumerFetcherManager.scala:148) at kafka/consumer/ZookeeperConsumerConnector.liftedTree1$1(ZookeeperConsumerConnector.scala:171) at kafka/consumer/ZookeeperConsumerConnector.shutdown(ZookeeperConsumerConnector.scala:167){code} ConsumerFetcherThread: {code}-- Parking to wait for: java/util/concurrent/locks/AbstractQueuedSynchronizer$ConditionObject@0x2aaaebcc7568 at jrockit/vm/Locks.park0(J)V(Native Method) at jrockit/vm/Locks.park(Locks.java:2230) at sun/misc/Unsafe.park(ZJ)V(Native Method) at java/util/concurrent/locks/LockSupport.park(LockSupport.java:156) at java/util/concurrent/locks/AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987) at java/util/concurrent/LinkedBlockingQueue.put(LinkedBlockingQueue.java:306) at kafka/consumer/PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:60) at kafka/consumer/ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:49) at 
kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:130) at kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:111) at scala/collection/immutable/HashMap$HashMap1.foreach(HashMap.scala:224) at scala/collection/immutable/HashMap$HashTrieMap.foreach(HashMap.scala:403) at kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1.apply$mcV$sp(AbstractFetcherThread.scala:111) at kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:111) at kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:111) at kafka/utils/Utils$.inLock(Utils.scala:538) at kafka/server/AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:110) at kafka/server/AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88) at kafka/utils/ShutdownableThread.run(ShutdownableThread.scala:51) at
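A tiny standalone illustration of the wedge described in this ticket: a fetcher-like thread blocks on put() into a full queue while the shutdown path waits on a latch that only that thread can count down. (Bounded waits are used here so the example terminates; names are illustrative, not the actual consumer classes.)

{code}
import java.util.concurrent.{CountDownLatch, LinkedBlockingQueue, TimeUnit}

object ShutdownWedgeDemo extends App {
  val chunkQueue = new LinkedBlockingQueue[Int](1)
  chunkQueue.put(0) // the queue is now full and nothing is draining it
  val fetcherStopped = new CountDownLatch(1)

  val fetcher = new Thread(new Runnable {
    override def run(): Unit =
      try {
        chunkQueue.put(1)          // blocks: the consumer is shutting down, the queue is never drained
        fetcherStopped.countDown() // never reached while put() is stuck
      } catch { case _: InterruptedException => () } // released by the interrupt below
  })
  fetcher.start()

  // shutdown() analogue: waits for the fetcher, which is stuck on put()
  val finished = fetcherStopped.await(2, TimeUnit.SECONDS)
  println(s"fetcher shut down cleanly: $finished") // prints false -- this is the hang
  fetcher.interrupt()
}
{code}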
Re: Review Request 26390: Fix KAFKA-1641
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/26390/#review58117 --- Ship it! - Joel Koshy On Oct. 9, 2014, 8:04 p.m., Guozhang Wang wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/26390/ --- (Updated Oct. 9, 2014, 8:04 p.m.) Review request for kafka. Bugs: KAFKA-1641 https://issues.apache.org/jira/browse/KAFKA-1641 Repository: kafka Description --- Reset cleaning start offset upon abnormal log truncation Diffs - core/src/main/scala/kafka/log/LogCleanerManager.scala e8ced6a5922508ea3274905be7c3d6e728f320ac Diff: https://reviews.apache.org/r/26390/diff/ Testing --- Thanks, Guozhang Wang
Re: Review Request 26390: Fix KAFKA-1641
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/26390/#review58144 --- core/src/main/scala/kafka/log/LogCleanerManager.scala https://reviews.apache.org/r/26390/#comment99062 Actually, because of the + the format specifiers are unmatched. I will fix this on check-in. - Joel Koshy On Oct. 9, 2014, 8:04 p.m., Guozhang Wang wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/26390/ --- (Updated Oct. 9, 2014, 8:04 p.m.) Review request for kafka. Bugs: KAFKA-1641 https://issues.apache.org/jira/browse/KAFKA-1641 Repository: kafka Description --- Reset cleaning start offset upon abnormal log truncation Diffs - core/src/main/scala/kafka/log/LogCleanerManager.scala e8ced6a5922508ea3274905be7c3d6e728f320ac Diff: https://reviews.apache.org/r/26390/diff/ Testing --- Thanks, Guozhang Wang
[jira] [Commented] (KAFKA-1695) Authenticate connection to Zookeeper
[ https://issues.apache.org/jira/browse/KAFKA-1695?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14182090#comment-14182090 ] Jun Rao commented on KAFKA-1695: Option 2 is probably too big a change. Let's see if we can have the change incorporated to ZKClient first. I can ping Stefan too if needed. Authenticate connection to Zookeeper Key: KAFKA-1695 URL: https://issues.apache.org/jira/browse/KAFKA-1695 Project: Kafka Issue Type: Sub-task Components: security Reporter: Jay Kreps Assignee: Gwen Shapira We need to make it possible to secure the Zookeeper cluster Kafka is using. This would make use of the normal authentication ZooKeeper provides. ZooKeeper supports a variety of authentication mechanisms so we will need to figure out what has to be passed in to the zookeeper client. The intention is that when the current round of client work is done it should be possible to run without clients needing access to Zookeeper so all we need here is to make it so that only the Kafka cluster is able to read and write to the Kafka znodes (we shouldn't need to set any kind of acl on a per-znode basis). -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-1641) Log cleaner exits if last cleaned offset is lower than earliest offset
[ https://issues.apache.org/jira/browse/KAFKA-1641?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Joel Koshy updated KAFKA-1641: -- Resolution: Fixed Status: Resolved (was: Patch Available) Thanks for the patch. Pushed to trunk (after fixing the log message issue I noted on the RB) Log cleaner exits if last cleaned offset is lower than earliest offset -- Key: KAFKA-1641 URL: https://issues.apache.org/jira/browse/KAFKA-1641 Project: Kafka Issue Type: Bug Affects Versions: 0.8.1.1 Reporter: Joel Koshy Assignee: Guozhang Wang Attachments: KAFKA-1641.patch, KAFKA-1641_2014-10-09_13:04:15.patch Encountered this recently: the log cleaner exited a while ago (I think because the topic had compressed messages). That issue was subsequently addressed by having the producer only send uncompressed. However, on a subsequent restart of the broker we see this: In this scenario I think it is reasonable to just emit a warning and have the cleaner round up its first dirty offset to the base offset of the first segment. {code} [kafka-server] [] [kafka-log-cleaner-thread-0], Error due to java.lang.IllegalArgumentException: requirement failed: Last clean offset is 54770438 but segment base offset is 382844024 for log testtopic-0. at scala.Predef$.require(Predef.scala:145) at kafka.log.Cleaner.buildOffsetMap(LogCleaner.scala:491) at kafka.log.Cleaner.clean(LogCleaner.scala:288) at kafka.log.LogCleaner$CleanerThread.cleanOrSleep(LogCleaner.scala:202) at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:187) at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51) {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
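The "emit a warning and round up" behavior suggested in the description amounts to roughly the following (variable names are illustrative, not the committed LogCleanerManager change):

{code}
object CleanerOffsetSketch {
  // If the checkpointed clean offset is older than the first segment's base offset
  // (e.g. after unexpected truncation), warn and restart from the base offset instead
  // of letting the cleaner thread die on the requirement check.
  def firstDirtyOffset(lastCleanOffset: Option[Long], firstSegmentBaseOffset: Long): Long = {
    val checkpointed = lastCleanOffset.getOrElse(firstSegmentBaseOffset)
    if (checkpointed < firstSegmentBaseOffset) {
      println(s"WARN: resetting first dirty offset to $firstSegmentBaseOffset " +
              s"since checkpointed offset $checkpointed is invalid")
      firstSegmentBaseOffset
    } else checkpointed
  }
}
{code}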
[jira] [Commented] (KAFKA-391) Producer request and response classes should use maps
[ https://issues.apache.org/jira/browse/KAFKA-391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14182097#comment-14182097 ] Honghai Chen commented on KAFKA-391: Yes, after adding more information to the error message, we sometimes see two responses for one request. Producer request and response classes should use maps - Key: KAFKA-391 URL: https://issues.apache.org/jira/browse/KAFKA-391 Project: Kafka Issue Type: Bug Reporter: Joel Koshy Assignee: Joel Koshy Priority: Blocker Labels: optimization Fix For: 0.8.0 Attachments: KAFKA-391-draft-r1374069.patch, KAFKA-391-v2.patch, KAFKA-391-v3.patch, KAFKA-391-v4.patch Producer response contains two arrays of error codes and offsets - the ordering in these arrays correspond to the flattened ordering of the request arrays. It would be better to switch to maps in the request and response as this would make the code clearer and more efficient (right now, linear scans are used in handling producer acks). We can probably do the same in the fetch request/response. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-2) a restful producer API
[ https://issues.apache.org/jira/browse/KAFKA-2?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14182111#comment-14182111 ] Anand Iyer commented on KAFKA-2: Since the Kafka APIs are undergoing changes in upcoming versions, this JIRA becomes more important. Kafka customers are asking for it. This is important for increased adoption of Kafka. a restful producer API -- Key: KAFKA-2 URL: https://issues.apache.org/jira/browse/KAFKA-2 Project: Kafka Issue Type: Improvement Priority: Minor If Kafka server supports a restful producer API, we can use Kafka in any programming language without implementing the wire protocol in each language. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1471) Add Producer Unit Tests for LZ4 and LZ4HC compression
[ https://issues.apache.org/jira/browse/KAFKA-1471?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14182117#comment-14182117 ] Jun Rao commented on KAFKA-1471: Ewen, It seems that the necessary changes are already included in kafka-1493. Could we just resolve this jira? Add Producer Unit Tests for LZ4 and LZ4HC compression - Key: KAFKA-1471 URL: https://issues.apache.org/jira/browse/KAFKA-1471 Project: Kafka Issue Type: Sub-task Reporter: James Oliver Assignee: Ewen Cheslack-Postava Fix For: 0.8.3 Attachments: KAFKA-1471.patch, KAFKA-1471.patch, KAFKA-1471.patch, KAFKA-1471_2014-10-12_16:02:19.patch -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1695) Authenticate connection to Zookeeper
[ https://issues.apache.org/jira/browse/KAFKA-1695?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14182123#comment-14182123 ] Gwen Shapira commented on KAFKA-1695: - Agree! I do not want to change everything that uses ZKClient. If you contact Stefan to see if we can get the ACL patch in, that will be fantastic. I'm willing to help with the pull request if needed. Authenticate connection to Zookeeper Key: KAFKA-1695 URL: https://issues.apache.org/jira/browse/KAFKA-1695 Project: Kafka Issue Type: Sub-task Components: security Reporter: Jay Kreps Assignee: Gwen Shapira We need to make it possible to secure the Zookeeper cluster Kafka is using. This would make use of the normal authentication ZooKeeper provides. ZooKeeper supports a variety of authentication mechanisms so we will need to figure out what has to be passed in to the zookeeper client. The intention is that when the current round of client work is done it should be possible to run without clients needing access to Zookeeper so all we need here is to make it so that only the Kafka cluster is able to read and write to the Kafka znodes (we shouldn't need to set any kind of acl on a per-znode basis). -- This message was sent by Atlassian JIRA (v6.3.4#6332)
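As a rough illustration of the protection being discussed (not a concrete proposal from this thread), here is a Java sketch of the kind of ACL the brokers could attach when creating their znodes, assuming a SASL-authenticated ZooKeeper session; the principal name and znode path are assumptions:
{code}
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Id;

import java.util.Arrays;
import java.util.List;

public class SecureZnodeSketch {
    // Illustrative ACL: the SASL-authenticated "kafka" principal gets full access,
    // everyone else may only read. Principal name and path are assumptions.
    static List<ACL> kafkaAcl() {
        return Arrays.asList(
            new ACL(ZooDefs.Perms.ALL, new Id("sasl", "kafka")),
            new ACL(ZooDefs.Perms.READ, ZooDefs.Ids.ANYONE_ID_UNSAFE));
    }

    static void createSecured(ZooKeeper zk, String path, byte[] data) throws Exception {
        zk.create(path, data, kafkaAcl(), CreateMode.PERSISTENT);
    }
}
{code}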
[jira] [Commented] (KAFKA-1471) Add Producer Unit Tests for LZ4 and LZ4HC compression
[ https://issues.apache.org/jira/browse/KAFKA-1471?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14182126#comment-14182126 ] Ewen Cheslack-Postava commented on KAFKA-1471: -- This was already applied, just never marked resolved. Add Producer Unit Tests for LZ4 and LZ4HC compression - Key: KAFKA-1471 URL: https://issues.apache.org/jira/browse/KAFKA-1471 Project: Kafka Issue Type: Sub-task Reporter: James Oliver Assignee: Ewen Cheslack-Postava Fix For: 0.8.2, 0.8.3 Attachments: KAFKA-1471.patch, KAFKA-1471.patch, KAFKA-1471.patch, KAFKA-1471_2014-10-12_16:02:19.patch -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-1471) Add Producer Unit Tests for LZ4 and LZ4HC compression
[ https://issues.apache.org/jira/browse/KAFKA-1471?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ewen Cheslack-Postava updated KAFKA-1471: - Resolution: Fixed Fix Version/s: 0.8.2 Status: Resolved (was: Patch Available) Add Producer Unit Tests for LZ4 and LZ4HC compression - Key: KAFKA-1471 URL: https://issues.apache.org/jira/browse/KAFKA-1471 Project: Kafka Issue Type: Sub-task Reporter: James Oliver Assignee: Ewen Cheslack-Postava Fix For: 0.8.2, 0.8.3 Attachments: KAFKA-1471.patch, KAFKA-1471.patch, KAFKA-1471.patch, KAFKA-1471_2014-10-12_16:02:19.patch -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1721) Snappy compressor is not thread safe
[ https://issues.apache.org/jira/browse/KAFKA-1721?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14182127#comment-14182127 ] Jun Rao commented on KAFKA-1721: Bhavesh, Do you want to submit a patch to upgrade the snappy jar in Kafka? Thanks, Snappy compressor is not thread safe Key: KAFKA-1721 URL: https://issues.apache.org/jira/browse/KAFKA-1721 Project: Kafka Issue Type: Bug Components: compression Reporter: Ewen Cheslack-Postava Assignee: Ewen Cheslack-Postava From the mailing list, it can generate this exception: 2014-10-20 18:55:21.841 [kafka-producer-network-thread] ERROR org.apache.kafka.clients.producer.internals.Sender - Uncaught error in kafka producer I/O thread: *java.lang.NullPointerException* at org.xerial.snappy.BufferRecycler.releaseInputBuffer(BufferRecycler.java:153) at org.xerial.snappy.SnappyOutputStream.close(SnappyOutputStream.java:317) at java.io.FilterOutputStream.close(FilterOutputStream.java:160) at org.apache.kafka.common.record.Compressor.close(Compressor.java:94) at org.apache.kafka.common.record.MemoryRecords.close(MemoryRecords.java:119) at org.apache.kafka.clients.producer.internals.RecordAccumulator.drain(RecordAccumulator.java:285) at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:162) at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:115) at java.lang.Thread.run(Thread.java:744) This appears to be an issue with the snappy-java library using ThreadLocal for an internal buffer recycling object which results in that object being shared unsafely across threads if one thread sends to multiple producers: {quote} I think the issue is that you're using all your producers across a thread pool and the snappy library uses ThreadLocal BufferRecyclers. When new Snappy streams are allocated, they may be allocated from the same thread (e.g. one of your MyProducer classes calls Producer.send() on multiple producers from the same thread) and therefore use the same BufferRecycler. Eventually you hit the code in the stacktrace, and if two producer send threads hit it concurrently they improperly share the unsynchronized BufferRecycler. This seems like a pain to fix -- it's really a deficiency of the snappy library and as far as I can see there's no external control over BufferRecycler in their API. One possibility is to record the thread ID when we generate a new stream in Compressor and use that to synchronize access to ensure no concurrent BufferRecycler access. That could be made specific to snappy so it wouldn't impact other codecs. Not exactly ideal, but it would work. Unfortunately I can't think of any way for you to protect against this in your own code since the problem arises in the producer send thread, which your code should never know about. Another option would be to setup your producers differently to avoid the possibility of unsynchronized access from multiple threads (i.e. don't use the same thread pool approach), but whether you can do that will depend on your use case. {quote} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
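The workaround floated in the description (keying synchronization on the thread that created the stream, so streams sharing a ThreadLocal BufferRecycler never release buffers concurrently) could look roughly like the following Java sketch; it is illustrative only, not the actual Kafka Compressor code:
{code}
import org.xerial.snappy.SnappyOutputStream;

import java.io.IOException;
import java.io.OutputStream;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Sketch of the suggested mitigation: snappy's BufferRecycler is ThreadLocal to the
// thread that created the stream, so all streams created by one thread share a recycler.
// Keying a lock on the creating thread's id and holding it around close() prevents two
// producer send threads from releasing buffers into the same recycler concurrently.
public class SnappyStreamGuard {
    private static final ConcurrentMap<Long, Object> LOCKS = new ConcurrentHashMap<>();

    private final SnappyOutputStream stream;
    private final Object lock;

    public SnappyStreamGuard(OutputStream sink) {
        this.lock = LOCKS.computeIfAbsent(Thread.currentThread().getId(), id -> new Object());
        this.stream = new SnappyOutputStream(sink);
    }

    public void write(byte[] bytes) throws IOException {
        stream.write(bytes);
    }

    public void close() throws IOException {
        synchronized (lock) { // serialize BufferRecycler release across send threads
            stream.close();
        }
    }
}
{code}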
[jira] [Commented] (KAFKA-1721) Snappy compressor is not thread safe
[ https://issues.apache.org/jira/browse/KAFKA-1721?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14182130#comment-14182130 ] Ewen Cheslack-Postava commented on KAFKA-1721: -- I have the trivial patch, but the upstream jar seems to be broken (see the earlier Github issue). I'll follow up on this once that issue is resolved. Snappy compressor is not thread safe Key: KAFKA-1721 URL: https://issues.apache.org/jira/browse/KAFKA-1721 Project: Kafka Issue Type: Bug Components: compression Reporter: Ewen Cheslack-Postava Assignee: Ewen Cheslack-Postava From the mailing list, it can generate this exception: 2014-10-20 18:55:21.841 [kafka-producer-network-thread] ERROR org.apache.kafka.clients.producer.internals.Sender - Uncaught error in kafka producer I/O thread: *java.lang.NullPointerException* at org.xerial.snappy.BufferRecycler.releaseInputBuffer(BufferRecycler.java:153) at org.xerial.snappy.SnappyOutputStream.close(SnappyOutputStream.java:317) at java.io.FilterOutputStream.close(FilterOutputStream.java:160) at org.apache.kafka.common.record.Compressor.close(Compressor.java:94) at org.apache.kafka.common.record.MemoryRecords.close(MemoryRecords.java:119) at org.apache.kafka.clients.producer.internals.RecordAccumulator.drain(RecordAccumulator.java:285) at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:162) at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:115) at java.lang.Thread.run(Thread.java:744) This appears to be an issue with the snappy-java library using ThreadLocal for an internal buffer recycling object which results in that object being shared unsafely across threads if one thread sends to multiple producers: {quote} I think the issue is that you're using all your producers across a thread pool and the snappy library uses ThreadLocal BufferRecyclers. When new Snappy streams are allocated, they may be allocated from the same thread (e.g. one of your MyProducer classes calls Producer.send() on multiple producers from the same thread) and therefore use the same BufferRecycler. Eventually you hit the code in the stacktrace, and if two producer send threads hit it concurrently they improperly share the unsynchronized BufferRecycler. This seems like a pain to fix -- it's really a deficiency of the snappy library and as far as I can see there's no external control over BufferRecycler in their API. One possibility is to record the thread ID when we generate a new stream in Compressor and use that to synchronize access to ensure no concurrent BufferRecycler access. That could be made specific to snappy so it wouldn't impact other codecs. Not exactly ideal, but it would work. Unfortunately I can't think of any way for you to protect against this in your own code since the problem arises in the producer send thread, which your code should never know about. Another option would be to setup your producers differently to avoid the possibility of unsynchronized access from multiple threads (i.e. don't use the same thread pool approach), but whether you can do that will depend on your use case. {quote} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1675) bootstrapping tidy-up
[ https://issues.apache.org/jira/browse/KAFKA-1675?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14182138#comment-14182138 ] Jun Rao commented on KAFKA-1675: Thanks for the patch. Looks good. It doesn't seem to apply any more. Could you rebase? bootstrapping tidy-up - Key: KAFKA-1675 URL: https://issues.apache.org/jira/browse/KAFKA-1675 Project: Kafka Issue Type: Bug Reporter: Szczepan Faber Assignee: Ivan Lyutov Fix For: 0.8.3 Attachments: KAFKA-1675.patch I'd like to suggest following changes: 1. remove the 'gradlew' and 'gradlew.bat' scripts from the source tree. Those scripts don't work, e.g. they fail with exception when invoked. I just got a user report where those scripts were invoked by the user and it led to an exception that was not easy to grasp. Bootstrapping step will generate those files anyway. 2. move the 'gradleVersion' extra property from the 'build.gradle' into 'gradle.properties'. Otherwise it is hard to automate the bootstrapping process - in order to find out the gradle version, I need to evaluate the build script, and for that I need gradle with correct version (kind of a vicious circle). Project properties declared in the gradle.properties file can be accessed exactly the same as the 'ext' properties, for example: 'project.gradleVersion'. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Resolved] (KAFKA-1700) examples directory - README and shell scripts are out of date
[ https://issues.apache.org/jira/browse/KAFKA-1700?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jun Rao resolved KAFKA-1700. Resolution: Fixed Assignee: Geoffrey Anderson Thanks for the patch. +1 and committed to trunk. examples directory - README and shell scripts are out of date - Key: KAFKA-1700 URL: https://issues.apache.org/jira/browse/KAFKA-1700 Project: Kafka Issue Type: Bug Affects Versions: 0.8.1 Reporter: Geoffrey Anderson Assignee: Geoffrey Anderson Priority: Minor Labels: newbie Fix For: 0.8.3 Attachments: KAFKA-1700.patch sbt build files were removed during resolution of KAFKA-1254, so the README under the examples directory should no longer make reference to sbt. Also, the paths added to CLASSPATH variable in the example shell script are no longer correct. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: Review Request 26885: Patch for KAFKA-1642
On Oct. 23, 2014, 9:43 p.m., Jun Rao wrote: clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java, lines 197-199 https://reviews.apache.org/r/26885/diff/2/?file=726776#file726776line197 It seems that in this case, the nextReadyCheckDelayMs should be the remaining linger time for tp1, which is lingerMs/2. Should we just assert that? tp1 and tp2 have the same leader, node1. The test adds enough data to make tp2 sendable, so in the ideal case only tp3 would be used to determine timeout, which should require lingerMs more time. However, the test checks for <= lingerMs because a single scan through the topic partitions means that we can incorporate the lingerMs/2 timeout from tp1 even though we determine later that we really want to ignore it (and I actually saw this happen when I initially wrote the test to check for the exact value). I think the tradeoff of sometimes waking up a bit earlier than needed is probably worthwhile since it keeps the implementation simpler and cheaper. - Ewen --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/26885/#review57513 --- On Oct. 21, 2014, 12:34 a.m., Ewen Cheslack-Postava wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/26885/ --- (Updated Oct. 21, 2014, 12:34 a.m.) Review request for kafka. Bugs: KAFKA-1642 https://issues.apache.org/jira/browse/KAFKA-1642 Repository: kafka Description --- Fixes two issues with the computation of ready nodes and poll timeouts in Sender/RecordAccumulator: 1. The timeout was computed incorrectly because it took into account all nodes, even if they had data to send such that their timeout would be 0. However, nodes were then filtered based on whether it was possible to send (i.e. their connection was still good) which could result in nothing to send and a 0 timeout, resulting in busy looping. Instead, the timeout needs to be computed only using data that cannot be immediately sent, i.e. where the timeout will be greater than 0. This timeout is only used if, after filtering by whether connections are ready for sending, there is no data to be sent. Other events can wake the thread up earlier, e.g. a client reconnects and becomes ready again. 2. One of the conditions indicating whether data is sendable is whether a timeout has expired -- either the linger time or the retry backoff. This condition wasn't accounting for both cases properly, always using the linger time. This means the retry backoff was probably not being respected. KAFKA-1642 Compute correct poll timeout when all nodes have sendable data but none can send data because they are in a connection backoff period.
Diffs - clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java d304660f29246e9600efe3ddb28cfcc2b074bed3 clients/src/main/java/org/apache/kafka/clients/KafkaClient.java 29658d4a15f112dc0af5ce517eaab93e6f00134b clients/src/main/java/org/apache/kafka/clients/NetworkClient.java eea270abb16f40c9f3b47c4ea96be412fb4fdc8b clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java c5d470011d334318d5ee801021aadd0c000974a6 clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 8ebe7ed82c9384b71ce0cc3ddbef2c2325363ab9 clients/src/test/java/org/apache/kafka/clients/MockClient.java aae8d4a1e98279470587d397cc779a9baf6fee6c clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java 0762b35abba0551f23047348c5893bb8c9acff14 Diff: https://reviews.apache.org/r/26885/diff/ Testing --- Thanks, Ewen Cheslack-Postava
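A simplified Java sketch of the timeout rule described in points 1 and 2 above: only data that cannot be sent yet contributes to the next ready-check delay, and the wait is the retry backoff rather than the linger time when a batch is in retry. Types and fields are illustrative, not the actual RecordAccumulator code:
{code}
// Simplified sketch of the rule described above; not the actual implementation.
static long nextReadyCheckDelayMs(Iterable<BatchState> batches, long nowMs,
                                  long lingerMs, long retryBackoffMs) {
    long delay = Long.MAX_VALUE;
    for (BatchState b : batches) {
        // Honour the retry backoff for batches in retry, the linger time otherwise.
        long timeToWait = b.inRetry ? retryBackoffMs : lingerMs;
        long waited = nowMs - b.lastAttemptMs;
        long remaining = Math.max(timeToWait - waited, 0);
        boolean sendable = b.full || remaining == 0;
        if (!sendable)
            delay = Math.min(delay, remaining); // only not-yet-sendable data sets the timeout
    }
    return delay;
}

static final class BatchState {
    final boolean full;       // batch is full and could be sent immediately
    final boolean inRetry;    // batch is waiting out a retry backoff
    final long lastAttemptMs; // when the batch was created or last retried
    BatchState(boolean full, boolean inRetry, long lastAttemptMs) {
        this.full = full; this.inRetry = inRetry; this.lastAttemptMs = lastAttemptMs;
    }
}
{code}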
Re: Review Request 26885: Patch for KAFKA-1642
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/26885/ --- (Updated Oct. 23, 2014, 11:19 p.m.) Review request for kafka. Bugs: KAFKA-1642 https://issues.apache.org/jira/browse/KAFKA-1642 Repository: kafka Description (updated) --- Fixes two issues with the computation of ready nodes and poll timeouts in Sender/RecordAccumulator: 1. The timeout was computed incorrectly because it took into account all nodes, even if they had data to send such that their timeout would be 0. However, nodes were then filtered based on whether it was possible to send (i.e. their connection was still good) which could result in nothing to send and a 0 timeout, resulting in busy looping. Instead, the timeout needs to be computed only using data that cannot be immediately sent, i.e. where the timeout will be greater than 0. This timeout is only used if, after filtering by whether connections are ready for sending, there is no data to be sent. Other events can wake the thread up earlier, e.g. a client reconnects and becomes ready again. 2. One of the conditions indicating whether data is sendable is whether a timeout has expired -- either the linger time or the retry backoff. This condition wasn't accounting for both cases properly, always using the linger time. This means the retry backoff was probably not being respected. KAFKA-1642 Compute correct poll timeout when all nodes have sendable data but none can send data because they are in a connection backoff period. Addressing Jun's comments. Diffs (updated) - clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java d304660f29246e9600efe3ddb28cfcc2b074bed3 clients/src/main/java/org/apache/kafka/clients/KafkaClient.java 29658d4a15f112dc0af5ce517eaab93e6f00134b clients/src/main/java/org/apache/kafka/clients/NetworkClient.java eea270abb16f40c9f3b47c4ea96be412fb4fdc8b clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java c5d470011d334318d5ee801021aadd0c000974a6 clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 8ebe7ed82c9384b71ce0cc3ddbef2c2325363ab9 clients/src/test/java/org/apache/kafka/clients/MockClient.java aae8d4a1e98279470587d397cc779a9baf6fee6c clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java 0762b35abba0551f23047348c5893bb8c9acff14 Diff: https://reviews.apache.org/r/26885/diff/ Testing --- Thanks, Ewen Cheslack-Postava
Re: Review Request 26994: Patch for KAFKA-1719
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/26994/ --- (Updated Oct. 23, 2014, 11:20 p.m.) Review request for kafka. Bugs: KAFKA-1719 https://issues.apache.org/jira/browse/KAFKA-1719 Repository: kafka Description (updated) --- Addressed Guozhang's comments. Addressed Neha and Guzhang's comments. Diffs (updated) - core/src/main/scala/kafka/tools/MirrorMaker.scala b8698ee1469c8fbc92ccc176d916eb3e28b87867 Diff: https://reviews.apache.org/r/26994/diff/ Testing --- Thanks, Jiangjie Qin
[jira] [Updated] (KAFKA-1642) [Java New Producer Kafka Trunk] CPU Usage Spike to 100% when network connection is lost
[ https://issues.apache.org/jira/browse/KAFKA-1642?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ewen Cheslack-Postava updated KAFKA-1642: - Attachment: KAFKA-1642_2014-10-23_16:19:41.patch [Java New Producer Kafka Trunk] CPU Usage Spike to 100% when network connection is lost --- Key: KAFKA-1642 URL: https://issues.apache.org/jira/browse/KAFKA-1642 Project: Kafka Issue Type: Bug Components: producer Affects Versions: 0.8.2 Reporter: Bhavesh Mistry Assignee: Ewen Cheslack-Postava Attachments: KAFKA-1642.patch, KAFKA-1642_2014-10-20_17:33:57.patch, KAFKA-1642_2014-10-23_16:19:41.patch I see my CPU spike to 100% when network connection is lost for while. It seems network IO thread are very busy logging following error message. Is this expected behavior ? 2014-09-17 14:06:16.830 [kafka-producer-network-thread] ERROR org.apache.kafka.clients.producer.internals.Sender - Uncaught error in kafka producer I/O thread: java.lang.IllegalStateException: No entry found for node -2 at org.apache.kafka.clients.ClusterConnectionStates.nodeState(ClusterConnectionStates.java:110) at org.apache.kafka.clients.ClusterConnectionStates.disconnected(ClusterConnectionStates.java:99) at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:394) at org.apache.kafka.clients.NetworkClient.maybeUpdateMetadata(NetworkClient.java:380) at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:174) at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:175) at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:115) at java.lang.Thread.run(Thread.java:744) Thanks, Bhavesh -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-1719) Make mirror maker exit when one consumer/producer thread exits.
[ https://issues.apache.org/jira/browse/KAFKA-1719?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jiangjie Qin updated KAFKA-1719: Attachment: KAFKA-1719_2014-10-23_16:20:22.patch Make mirror maker exit when one consumer/producer thread exits. --- Key: KAFKA-1719 URL: https://issues.apache.org/jira/browse/KAFKA-1719 Project: Kafka Issue Type: Improvement Reporter: Jiangjie Qin Assignee: Jiangjie Qin Attachments: KAFKA-1719.patch, KAFKA-1719_2014-10-22_15:04:32.patch, KAFKA-1719_2014-10-23_16:20:22.patch When one of the consumer/producer thread exits, the entire mirror maker will be blocked. In this case, it is better to make it exit. It seems a single ZookeeperConsumerConnector is sufficient for mirror maker, probably we don't need a list for the connectors. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
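One common way to get the "exit when a worker thread dies" behavior is an uncaught-exception handler that halts the process; the Java sketch below illustrates the general pattern only and is not the MirrorMaker patch itself:
{code}
public class ExitOnThreadFailure {
    public static void main(String[] args) {
        // If any worker thread dies with an uncaught exception, log it and bring the
        // whole process down rather than letting the remaining threads hang silently.
        Thread.UncaughtExceptionHandler exitOnFailure = (thread, error) -> {
            System.err.println("Thread " + thread.getName() + " exited abnormally: " + error);
            Runtime.getRuntime().halt(1); // halt() skips shutdown hooks that could block
        };

        Thread consumer = new Thread(() -> { /* consume and hand records to the producer */ },
                                     "mirrormaker-consumer-0");
        consumer.setUncaughtExceptionHandler(exitOnFailure);
        consumer.start();
    }
}
{code}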
[jira] [Commented] (KAFKA-1642) [Java New Producer Kafka Trunk] CPU Usage Spike to 100% when network connection is lost
[ https://issues.apache.org/jira/browse/KAFKA-1642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14182165#comment-14182165 ] Ewen Cheslack-Postava commented on KAFKA-1642: -- Updated reviewboard https://reviews.apache.org/r/26885/diff/ against branch origin/trunk [Java New Producer Kafka Trunk] CPU Usage Spike to 100% when network connection is lost --- Key: KAFKA-1642 URL: https://issues.apache.org/jira/browse/KAFKA-1642 Project: Kafka Issue Type: Bug Components: producer Affects Versions: 0.8.2 Reporter: Bhavesh Mistry Assignee: Ewen Cheslack-Postava Attachments: KAFKA-1642.patch, KAFKA-1642_2014-10-20_17:33:57.patch, KAFKA-1642_2014-10-23_16:19:41.patch I see my CPU spike to 100% when network connection is lost for while. It seems network IO thread are very busy logging following error message. Is this expected behavior ? 2014-09-17 14:06:16.830 [kafka-producer-network-thread] ERROR org.apache.kafka.clients.producer.internals.Sender - Uncaught error in kafka producer I/O thread: java.lang.IllegalStateException: No entry found for node -2 at org.apache.kafka.clients.ClusterConnectionStates.nodeState(ClusterConnectionStates.java:110) at org.apache.kafka.clients.ClusterConnectionStates.disconnected(ClusterConnectionStates.java:99) at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:394) at org.apache.kafka.clients.NetworkClient.maybeUpdateMetadata(NetworkClient.java:380) at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:174) at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:175) at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:115) at java.lang.Thread.run(Thread.java:744) Thanks, Bhavesh -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1719) Make mirror maker exit when one consumer/producer thread exits.
[ https://issues.apache.org/jira/browse/KAFKA-1719?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14182166#comment-14182166 ] Jiangjie Qin commented on KAFKA-1719: - Updated reviewboard https://reviews.apache.org/r/26994/diff/ against branch origin/trunk Make mirror maker exit when one consumer/producer thread exits. --- Key: KAFKA-1719 URL: https://issues.apache.org/jira/browse/KAFKA-1719 Project: Kafka Issue Type: Improvement Reporter: Jiangjie Qin Assignee: Jiangjie Qin Attachments: KAFKA-1719.patch, KAFKA-1719_2014-10-22_15:04:32.patch, KAFKA-1719_2014-10-23_16:20:22.patch When one of the consumer/producer thread exits, the entire mirror maker will be blocked. In this case, it is better to make it exit. It seems a single ZookeeperConsumerConnector is sufficient for mirror maker, probably we don't need a list for the connectors. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: Review Request 26994: Patch for KAFKA-1719
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/26994/#review58168 --- Ship it! LGTM, one minor thing upon check in. core/src/main/scala/kafka/tools/MirrorMaker.scala https://reviews.apache.org/r/26994/#comment99094 fatal(Consumer thread failure due to , t) - Guozhang Wang On Oct. 23, 2014, 11:20 p.m., Jiangjie Qin wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/26994/ --- (Updated Oct. 23, 2014, 11:20 p.m.) Review request for kafka. Bugs: KAFKA-1719 https://issues.apache.org/jira/browse/KAFKA-1719 Repository: kafka Description --- Addressed Guozhang's comments. Addressed Neha and Guzhang's comments. Diffs - core/src/main/scala/kafka/tools/MirrorMaker.scala b8698ee1469c8fbc92ccc176d916eb3e28b87867 Diff: https://reviews.apache.org/r/26994/diff/ Testing --- Thanks, Jiangjie Qin
[jira] [Commented] (KAFKA-391) Producer request and response classes should use maps
[ https://issues.apache.org/jira/browse/KAFKA-391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14182197#comment-14182197 ] Jun Rao commented on KAFKA-391: --- Which version of Kafka are you using? Is that easily reproducible? Producer request and response classes should use maps - Key: KAFKA-391 URL: https://issues.apache.org/jira/browse/KAFKA-391 Project: Kafka Issue Type: Bug Reporter: Joel Koshy Assignee: Joel Koshy Priority: Blocker Labels: optimization Fix For: 0.8.0 Attachments: KAFKA-391-draft-r1374069.patch, KAFKA-391-v2.patch, KAFKA-391-v3.patch, KAFKA-391-v4.patch Producer response contains two arrays of error codes and offsets - the ordering in these arrays correspond to the flattened ordering of the request arrays. It would be better to switch to maps in the request and response as this would make the code clearer and more efficient (right now, linear scans are used in handling producer acks). We can probably do the same in the fetch request/response. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1710) [New Java Producer Potential Deadlock] Producer Deadlock when all messages is being sent to single partition
[ https://issues.apache.org/jira/browse/KAFKA-1710?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14182198#comment-14182198 ] Bhavesh Mistry commented on KAFKA-1710: --- [~jkreps], Sorry to bug you again. Did you get chance to review the above performance number and cost of Sync per thread when Partition is not set and partition set to single partition ? Thanks, Bhavesh [New Java Producer Potential Deadlock] Producer Deadlock when all messages is being sent to single partition Key: KAFKA-1710 URL: https://issues.apache.org/jira/browse/KAFKA-1710 Project: Kafka Issue Type: Bug Components: producer Environment: Development Reporter: Bhavesh Mistry Assignee: Ewen Cheslack-Postava Priority: Critical Labels: performance Attachments: Screen Shot 2014-10-13 at 10.19.04 AM.png, Screen Shot 2014-10-15 at 9.09.06 PM.png, Screen Shot 2014-10-15 at 9.14.15 PM.png, TestNetworkDownProducer.java, th1.dump, th10.dump, th11.dump, th12.dump, th13.dump, th14.dump, th15.dump, th2.dump, th3.dump, th4.dump, th5.dump, th6.dump, th7.dump, th8.dump, th9.dump Hi Kafka Dev Team, When I run the test to send message to single partition for 3 minutes or so on, I have encounter deadlock (please see the screen attached) and thread contention from YourKit profiling. Use Case: 1) Aggregating messages into same partition for metric counting. 2) Replicate Old Producer behavior for sticking to partition for 3 minutes. Here is output: Frozen threads found (potential deadlock) It seems that the following threads have not changed their stack for more than 10 seconds. These threads are possibly (but not necessarily!) in a deadlock or hung. pool-1-thread-128 --- Frozen for at least 2m org.apache.kafka.clients.producer.internals.RecordAccumulator.append(TopicPartition, byte[], byte[], CompressionType, Callback) RecordAccumulator.java:139 org.apache.kafka.clients.producer.KafkaProducer.send(ProducerRecord, Callback) KafkaProducer.java:237 org.kafka.test.TestNetworkDownProducer$MyProducer.run() TestNetworkDownProducer.java:84 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor$Worker) ThreadPoolExecutor.java:1145 java.util.concurrent.ThreadPoolExecutor$Worker.run() ThreadPoolExecutor.java:615 java.lang.Thread.run() Thread.java:744 pool-1-thread-159 --- Frozen for at least 2m 1 sec org.apache.kafka.clients.producer.internals.RecordAccumulator.append(TopicPartition, byte[], byte[], CompressionType, Callback) RecordAccumulator.java:139 org.apache.kafka.clients.producer.KafkaProducer.send(ProducerRecord, Callback) KafkaProducer.java:237 org.kafka.test.TestNetworkDownProducer$MyProducer.run() TestNetworkDownProducer.java:84 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor$Worker) ThreadPoolExecutor.java:1145 java.util.concurrent.ThreadPoolExecutor$Worker.run() ThreadPoolExecutor.java:615 java.lang.Thread.run() Thread.java:744 pool-1-thread-55 --- Frozen for at least 2m org.apache.kafka.clients.producer.internals.RecordAccumulator.append(TopicPartition, byte[], byte[], CompressionType, Callback) RecordAccumulator.java:139 org.apache.kafka.clients.producer.KafkaProducer.send(ProducerRecord, Callback) KafkaProducer.java:237 org.kafka.test.TestNetworkDownProducer$MyProducer.run() TestNetworkDownProducer.java:84 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor$Worker) ThreadPoolExecutor.java:1145 java.util.concurrent.ThreadPoolExecutor$Worker.run() ThreadPoolExecutor.java:615 java.lang.Thread.run() Thread.java:744 Thanks, Bhavesh -- This message was 
sent by Atlassian JIRA (v6.3.4#6332)
Re: Review Request 26994: Patch for KAFKA-1719
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/26994/#review58171 --- Ship it! Besides the minor stylistic comment, rest looks good. core/src/main/scala/kafka/tools/MirrorMaker.scala https://reviews.apache.org/r/26994/#comment99103 Minor stylistic comment- if(!isShuttingdown.get()) core/src/main/scala/kafka/tools/MirrorMaker.scala https://reviews.apache.org/r/26994/#comment99104 ditto here - Neha Narkhede On Oct. 23, 2014, 11:20 p.m., Jiangjie Qin wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/26994/ --- (Updated Oct. 23, 2014, 11:20 p.m.) Review request for kafka. Bugs: KAFKA-1719 https://issues.apache.org/jira/browse/KAFKA-1719 Repository: kafka Description --- Addressed Guozhang's comments. Addressed Neha and Guzhang's comments. Diffs - core/src/main/scala/kafka/tools/MirrorMaker.scala b8698ee1469c8fbc92ccc176d916eb3e28b87867 Diff: https://reviews.apache.org/r/26994/diff/ Testing --- Thanks, Jiangjie Qin
[jira] [Commented] (KAFKA-1716) hang during shutdown of ZookeeperConsumerConnector
[ https://issues.apache.org/jira/browse/KAFKA-1716?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14182203#comment-14182203 ] Chris Richardson commented on KAFKA-1716: - No. I am using the default. hang during shutdown of ZookeeperConsumerConnector -- Key: KAFKA-1716 URL: https://issues.apache.org/jira/browse/KAFKA-1716 Project: Kafka Issue Type: Bug Components: consumer Affects Versions: 0.8.1.1 Reporter: Sean Fay Assignee: Neha Narkhede It appears to be possible for {{ZookeeperConsumerConnector.shutdown()}} to wedge in the case that some consumer fetcher threads receive messages during the shutdown process. Shutdown thread: {code}-- Parking to wait for: java/util/concurrent/CountDownLatch$Sync@0x2aaaf3ef06d0 at jrockit/vm/Locks.park0(J)V(Native Method) at jrockit/vm/Locks.park(Locks.java:2230) at sun/misc/Unsafe.park(ZJ)V(Native Method) at java/util/concurrent/locks/LockSupport.park(LockSupport.java:156) at java/util/concurrent/locks/AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:811) at java/util/concurrent/locks/AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:969) at java/util/concurrent/locks/AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1281) at java/util/concurrent/CountDownLatch.await(CountDownLatch.java:207) at kafka/utils/ShutdownableThread.shutdown(ShutdownableThread.scala:36) at kafka/server/AbstractFetcherThread.shutdown(AbstractFetcherThread.scala:71) at kafka/server/AbstractFetcherManager$$anonfun$closeAllFetchers$2.apply(AbstractFetcherManager.scala:121) at kafka/server/AbstractFetcherManager$$anonfun$closeAllFetchers$2.apply(AbstractFetcherManager.scala:120) at scala/collection/TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772) at scala/collection/mutable/HashMap$$anonfun$foreach$1.apply(HashMap.scala:98) at scala/collection/mutable/HashMap$$anonfun$foreach$1.apply(HashMap.scala:98) at scala/collection/mutable/HashTable$class.foreachEntry(HashTable.scala:226) at scala/collection/mutable/HashMap.foreachEntry(HashMap.scala:39) at scala/collection/mutable/HashMap.foreach(HashMap.scala:98) at scala/collection/TraversableLike$WithFilter.foreach(TraversableLike.scala:771) at kafka/server/AbstractFetcherManager.closeAllFetchers(AbstractFetcherManager.scala:120) ^-- Holding lock: java/lang/Object@0x2aaaebcc7318[thin lock] at kafka/consumer/ConsumerFetcherManager.stopConnections(ConsumerFetcherManager.scala:148) at kafka/consumer/ZookeeperConsumerConnector.liftedTree1$1(ZookeeperConsumerConnector.scala:171) at kafka/consumer/ZookeeperConsumerConnector.shutdown(ZookeeperConsumerConnector.scala:167){code} ConsumerFetcherThread: {code}-- Parking to wait for: java/util/concurrent/locks/AbstractQueuedSynchronizer$ConditionObject@0x2aaaebcc7568 at jrockit/vm/Locks.park0(J)V(Native Method) at jrockit/vm/Locks.park(Locks.java:2230) at sun/misc/Unsafe.park(ZJ)V(Native Method) at java/util/concurrent/locks/LockSupport.park(LockSupport.java:156) at java/util/concurrent/locks/AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987) at java/util/concurrent/LinkedBlockingQueue.put(LinkedBlockingQueue.java:306) at kafka/consumer/PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:60) at kafka/consumer/ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:49) at 
kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:130) at kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:111) at scala/collection/immutable/HashMap$HashMap1.foreach(HashMap.scala:224) at scala/collection/immutable/HashMap$HashTrieMap.foreach(HashMap.scala:403) at kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1.apply$mcV$sp(AbstractFetcherThread.scala:111) at kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:111) at kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:111) at kafka/utils/Utils$.inLock(Utils.scala:538) at kafka/server/AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:110) at kafka/server/AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88) at kafka/utils/ShutdownableThread.run(ShutdownableThread.scala:51) at jrockit/vm/RNI.c2java(J)V(Native
[jira] [Commented] (KAFKA-1710) [New Java Producer Potential Deadlock] Producer Deadlock when all messages is being sent to single partition
[ https://issues.apache.org/jira/browse/KAFKA-1710?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14182219#comment-14182219 ] Jay Kreps commented on KAFKA-1710: -- Well so we need to do a very quick lock around the insert into the queue to maintain thread safety. With multiple threads competing on a single partition this will definitely be slower than if you have multiple partitions. But although it is slower it shouldn't be slow. The data you have gives 2666 bytes/us, isn't that pretty good? That is 2.6GB/second, no? [New Java Producer Potential Deadlock] Producer Deadlock when all messages is being sent to single partition Key: KAFKA-1710 URL: https://issues.apache.org/jira/browse/KAFKA-1710 Project: Kafka Issue Type: Bug Components: producer Environment: Development Reporter: Bhavesh Mistry Assignee: Ewen Cheslack-Postava Priority: Critical Labels: performance Attachments: Screen Shot 2014-10-13 at 10.19.04 AM.png, Screen Shot 2014-10-15 at 9.09.06 PM.png, Screen Shot 2014-10-15 at 9.14.15 PM.png, TestNetworkDownProducer.java, th1.dump, th10.dump, th11.dump, th12.dump, th13.dump, th14.dump, th15.dump, th2.dump, th3.dump, th4.dump, th5.dump, th6.dump, th7.dump, th8.dump, th9.dump Hi Kafka Dev Team, When I run the test to send message to single partition for 3 minutes or so on, I have encounter deadlock (please see the screen attached) and thread contention from YourKit profiling. Use Case: 1) Aggregating messages into same partition for metric counting. 2) Replicate Old Producer behavior for sticking to partition for 3 minutes. Here is output: Frozen threads found (potential deadlock) It seems that the following threads have not changed their stack for more than 10 seconds. These threads are possibly (but not necessarily!) in a deadlock or hung. 
pool-1-thread-128 --- Frozen for at least 2m org.apache.kafka.clients.producer.internals.RecordAccumulator.append(TopicPartition, byte[], byte[], CompressionType, Callback) RecordAccumulator.java:139 org.apache.kafka.clients.producer.KafkaProducer.send(ProducerRecord, Callback) KafkaProducer.java:237 org.kafka.test.TestNetworkDownProducer$MyProducer.run() TestNetworkDownProducer.java:84 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor$Worker) ThreadPoolExecutor.java:1145 java.util.concurrent.ThreadPoolExecutor$Worker.run() ThreadPoolExecutor.java:615 java.lang.Thread.run() Thread.java:744 pool-1-thread-159 --- Frozen for at least 2m 1 sec org.apache.kafka.clients.producer.internals.RecordAccumulator.append(TopicPartition, byte[], byte[], CompressionType, Callback) RecordAccumulator.java:139 org.apache.kafka.clients.producer.KafkaProducer.send(ProducerRecord, Callback) KafkaProducer.java:237 org.kafka.test.TestNetworkDownProducer$MyProducer.run() TestNetworkDownProducer.java:84 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor$Worker) ThreadPoolExecutor.java:1145 java.util.concurrent.ThreadPoolExecutor$Worker.run() ThreadPoolExecutor.java:615 java.lang.Thread.run() Thread.java:744 pool-1-thread-55 --- Frozen for at least 2m org.apache.kafka.clients.producer.internals.RecordAccumulator.append(TopicPartition, byte[], byte[], CompressionType, Callback) RecordAccumulator.java:139 org.apache.kafka.clients.producer.KafkaProducer.send(ProducerRecord, Callback) KafkaProducer.java:237 org.kafka.test.TestNetworkDownProducer$MyProducer.run() TestNetworkDownProducer.java:84 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor$Worker) ThreadPoolExecutor.java:1145 java.util.concurrent.ThreadPoolExecutor$Worker.run() ThreadPoolExecutor.java:615 java.lang.Thread.run() Thread.java:744 Thanks, Bhavesh -- This message was sent by Atlassian JIRA (v6.3.4#6332)
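For the arithmetic above, 2666 bytes/us is 2.666 x 10^9 bytes per second, i.e. the roughly 2.6 GB/second quoted. The contention being described comes from the short per-partition lock around the enqueue; a toy Java sketch of that structure (not the actual RecordAccumulator) shows why N threads writing to one partition serialize on a single monitor while the same threads spread over many partitions barely contend:
{code}
import java.util.ArrayDeque;
import java.util.Deque;

// Toy illustration only: appends to one partition's batch queue share one lock, so the
// critical section is short but fully serialized across all threads targeting that partition.
class PartitionQueue {
    private final Deque<byte[]> batches = new ArrayDeque<>();

    void append(byte[] record) {
        synchronized (batches) { // quick critical section: just the enqueue
            batches.addLast(record);
        }
    }
}
{code}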
Re: ConsumerFetcherThread deadlock?
Jack, The fetchers are blocked on the queue since it is full, is your consumer iterator stopped and hence not getting more data from it? Guozhang On Thu, Oct 23, 2014 at 3:53 PM, Jack Foy j...@whitepages.com wrote: Hi all, We run kafka 0.8.1.1. We’re tracking down a problem where consumer groups stop pulling from their respective partitions a few minutes or hours into execution. It looks like all ConsumerFetcherThreads associated with that consumer are blocking while waiting to write data to a LinkedBlockingQueue. They are waiting on ConditionObjects with different object IDs, and those object IDs do not occur elsewhere within our snapshot of thread data. It appears that those threads never make progress once they enter this waiting state. KAFKA-937 looks like a very similar symptom: https://issues.apache.org/jira/browse/KAFKA-937 According to Jun Rao’s comments on that issue, a ConsumerFetcherThread should never be blocked. Is that still the case? Here’s the thread dump for the relevant threads. I can provide more information if needed. ConsumerFetcherThread-sumo-relay_kafka0.util.pages-1414089954648-5244fae6-0-0 prio=10 tid=0x7f1954a9c800 nid=0xbf0 waiting on condition [0x7f19339f8000] java.lang.Thread.State: WAITING (parking) at sun.misc.Unsafe.park(Native Method) - parking to wait for 0x8ac24dd8 (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject) at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186) at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043) at java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:349) at kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:60) at kafka.consumer.ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:49) at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:130) at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:111) at scala.collection.immutable.Map$Map3.foreach(Map.scala:154) at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply$mcV$sp(AbstractFetcherThread.scala:111) at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:111) at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:111) at kafka.utils.Utils$.inLock(Utils.scala:538) at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:110) at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88) at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51) ConsumerFetcherThread-sumo-relay_kafka0.util.pages-1414089954648-5244fae6-0-1 prio=10 tid=0x7f1955657000 nid=0xbf3 waiting on condition [0x7f19321e] java.lang.Thread.State: WAITING (parking) at sun.misc.Unsafe.park(Native Method) - parking to wait for 0x8ad280e8 (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject) at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186) at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043) at java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:349) at kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:60) at kafka.consumer.ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:49) at 
kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:130) at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:111) at scala.collection.immutable.Map$Map3.foreach(Map.scala:154) at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply$mcV$sp(AbstractFetcherThread.scala:111) at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:111) at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:111) at kafka.utils.Utils$.inLock(Utils.scala:538) at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:110) at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88) at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51) ConsumerFetcherThread-sumo-relay_kafka0.util.pages-1414089954648-5244fae6-0-2 prio=10 tid=0x7f1954001000 nid=0xbf1 waiting on condition [0x7f19326e5000] java.lang.Thread.State: WAITING (parking) at sun.misc.Unsafe.park(Native Method)
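The mechanism described in the reply above is easy to reproduce in isolation: the fetcher writes into a bounded LinkedBlockingQueue, so once the consuming side stops iterating, the fetcher parks in put() exactly as in the thread dump. A toy Java demonstration (not Kafka code; the capacity is arbitrary):
{code}
import java.util.concurrent.LinkedBlockingQueue;

public class BlockedFetcherDemo {
    public static void main(String[] args) throws InterruptedException {
        // Small bound standing in for the consumer's chunk queue capacity.
        LinkedBlockingQueue<byte[]> chunks = new LinkedBlockingQueue<>(2);

        Thread fetcher = new Thread(() -> {
            try {
                for (int i = 0; ; i++) {
                    chunks.put(new byte[64]); // blocks here once the queue is full
                    System.out.println("enqueued chunk " + i);
                }
            } catch (InterruptedException ignored) { }
        }, "consumer-fetcher-thread");
        fetcher.start();

        // Simulate a consumer thread that drains two chunks and then stops iterating:
        chunks.take();
        chunks.take();
        // From this point on the fetcher stays parked in LinkedBlockingQueue.put(),
        // which matches the WAITING (parking) state in the thread dump above.
    }
}
{code}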
[jira] [Resolved] (KAFKA-1711) WARN Property topic is not valid when running console producer
[ https://issues.apache.org/jira/browse/KAFKA-1711?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jun Rao resolved KAFKA-1711. Resolution: Fixed Fix Version/s: 0.8.3 Assignee: Joe Crobak Thanks for the patch. Committed to trunk. WARN Property topic is not valid when running console producer -- Key: KAFKA-1711 URL: https://issues.apache.org/jira/browse/KAFKA-1711 Project: Kafka Issue Type: Bug Components: core Affects Versions: 0.8.2 Reporter: Jun Rao Assignee: Joe Crobak Labels: newbie Fix For: 0.8.3 Attachments: KAFKA-1711.patch bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test [2014-10-17 09:54:23,984] WARN Property topic is not valid (kafka.utils.VerifiableProperties) It would be good if we can get rid of the warning. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: ConsumerFetcherThread deadlock?
This can also happen if at least one of the consumer threads (not the fetcher threads) die. You can inspect the thread dump to see if all your consumer threads are alive. On Thu, Oct 23, 2014 at 5:05 PM, Guozhang Wang wangg...@gmail.com wrote: Jack, The fetchers are blocked on the queue since it is full, is your consumer iterator stopped and hence not getting more data from it? Guozhang On Thu, Oct 23, 2014 at 3:53 PM, Jack Foy j...@whitepages.com wrote: Hi all, We run kafka 0.8.1.1. We’re tracking down a problem where consumer groups stop pulling from their respective partitions a few minutes or hours into execution. It looks like all ConsumerFetcherThreads associated with that consumer are blocking while waiting to write data to a LinkedBlockingQueue. They are waiting on ConditionObjects with different object IDs, and those object IDs do not occur elsewhere within our snapshot of thread data. It appears that those threads never make progress once they enter this waiting state. KAFKA-937 looks like a very similar symptom: https://issues.apache.org/jira/browse/KAFKA-937 According to Jun Rao’s comments on that issue, a ConsumerFetcherThread should never be blocked. Is that still the case? Here’s the thread dump for the relevant threads. I can provide more information if needed. ConsumerFetcherThread-sumo-relay_kafka0.util.pages-1414089954648-5244fae6-0-0 prio=10 tid=0x7f1954a9c800 nid=0xbf0 waiting on condition [0x7f19339f8000] java.lang.Thread.State: WAITING (parking) at sun.misc.Unsafe.park(Native Method) - parking to wait for 0x8ac24dd8 (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject) at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186) at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043) at java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:349) at kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:60) at kafka.consumer.ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:49) at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:130) at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:111) at scala.collection.immutable.Map$Map3.foreach(Map.scala:154) at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply$mcV$sp(AbstractFetcherThread.scala:111) at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:111) at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:111) at kafka.utils.Utils$.inLock(Utils.scala:538) at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:110) at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88) at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51) ConsumerFetcherThread-sumo-relay_kafka0.util.pages-1414089954648-5244fae6-0-1 prio=10 tid=0x7f1955657000 nid=0xbf3 waiting on condition [0x7f19321e] java.lang.Thread.State: WAITING (parking) at sun.misc.Unsafe.park(Native Method) - parking to wait for 0x8ad280e8 (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject) at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186) at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043) at 
java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:349) at kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:60) at kafka.consumer.ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:49) at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:130) at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:111) at scala.collection.immutable.Map$Map3.foreach(Map.scala:154) at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply$mcV$sp(AbstractFetcherThread.scala:111) at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:111) at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:111) at kafka.utils.Utils$.inLock(Utils.scala:538) at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:110) at
Re: Bad link on the document
Sorry for the delay. Fixed the link. Thanks, Jun On Tue, Oct 14, 2014 at 8:18 PM, Azuryy Yu azury...@gmail.com wrote: Hi, Usage information on the hadoop consumer can be found here https://github.com/linkedin/camus/tree/camus-kafka-0.8/. the Link is broken, who can kindly fix it, thanks
[jira] [Commented] (KAFKA-1501) transient unit tests failures due to port already in use
[ https://issues.apache.org/jira/browse/KAFKA-1501?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14182233#comment-14182233 ] Chris Cope commented on KAFKA-1501: --- Unfortunately same issues, 9/100 tests had a bunch of failures transient unit tests failures due to port already in use Key: KAFKA-1501 URL: https://issues.apache.org/jira/browse/KAFKA-1501 Project: Kafka Issue Type: Improvement Components: core Reporter: Jun Rao Assignee: Guozhang Wang Labels: newbie Attachments: KAFKA-1501.patch Saw the following transient failures. kafka.api.ProducerFailureHandlingTest testTooLargeRecordWithAckOne FAILED kafka.common.KafkaException: Socket server failed to bind to localhost:59909: Address already in use. at kafka.network.Acceptor.openServerSocket(SocketServer.scala:195) at kafka.network.Acceptor.init(SocketServer.scala:141) at kafka.network.SocketServer.startup(SocketServer.scala:68) at kafka.server.KafkaServer.startup(KafkaServer.scala:95) at kafka.utils.TestUtils$.createServer(TestUtils.scala:123) at kafka.api.ProducerFailureHandlingTest.setUp(ProducerFailureHandlingTest.scala:68) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
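One common mitigation for this class of flakiness is to ask the OS for a currently free ephemeral port by binding to port 0 and handing that port to the test broker. A sketch of the idea, not the actual kafka.utils.TestUtils implementation; note there is still a small race between releasing the probe socket and the broker re-binding the port:
{code}
import java.io.IOException;
import java.net.ServerSocket;

public final class TestPorts {
    // Bind to port 0 so the OS picks an unused ephemeral port, then reuse that number.
    static int freePort() throws IOException {
        try (ServerSocket socket = new ServerSocket(0)) {
            socket.setReuseAddress(true);
            return socket.getLocalPort();
        }
    }
}
{code}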
[jira] [Commented] (KAFKA-391) Producer request and response classes should use maps
[ https://issues.apache.org/jira/browse/KAFKA-391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14182240#comment-14182240 ] Joel Koshy commented on KAFKA-391: -- You mean two responses to the caller of send? I don't see why those two lines would cause two responses. Can you explain further and provide steps to reproduce if there really is an issue? Do you see errors/warns in the logs? Producer request and response classes should use maps - Key: KAFKA-391 URL: https://issues.apache.org/jira/browse/KAFKA-391 Project: Kafka Issue Type: Bug Reporter: Joel Koshy Assignee: Joel Koshy Priority: Blocker Labels: optimization Fix For: 0.8.0 Attachments: KAFKA-391-draft-r1374069.patch, KAFKA-391-v2.patch, KAFKA-391-v3.patch, KAFKA-391-v4.patch Producer response contains two arrays of error codes and offsets - the ordering in these arrays correspond to the flattened ordering of the request arrays. It would be better to switch to maps in the request and response as this would make the code clearer and more efficient (right now, linear scans are used in handling producer acks). We can probably do the same in the fetch request/response. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: ConsumerFetcherThread deadlock?
On Oct 23, 2014, at 5:09 PM, Neha Narkhede neha.narkh...@gmail.com wrote: This can also happen if at least one of the consumer threads (not the fetcher threads) die. You can inspect the thread dump to see if all your consumer threads are alive. Thanks very much — that’s probably what is happening, and we’ll investigate further. -- Jack Foy j...@whitepages.com
Re: Review Request 26994: Patch for KAFKA-1719
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/26994/#review58185 --- Ship it! Looks good - can you upload an updated RB incorporating the minor comments? core/src/main/scala/kafka/tools/MirrorMaker.scala https://reviews.apache.org/r/26994/#comment99129 exited - Joel Koshy On Oct. 23, 2014, 11:20 p.m., Jiangjie Qin wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/26994/ --- (Updated Oct. 23, 2014, 11:20 p.m.) Review request for kafka. Bugs: KAFKA-1719 https://issues.apache.org/jira/browse/KAFKA-1719 Repository: kafka Description --- Addressed Guozhang's comments. Addressed Neha and Guzhang's comments. Diffs - core/src/main/scala/kafka/tools/MirrorMaker.scala b8698ee1469c8fbc92ccc176d916eb3e28b87867 Diff: https://reviews.apache.org/r/26994/diff/ Testing --- Thanks, Jiangjie Qin
[jira] [Commented] (KAFKA-1667) topic-level configuration not validated
[ https://issues.apache.org/jira/browse/KAFKA-1667?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14182253#comment-14182253 ] Jun Rao commented on KAFKA-1667: The server side already has a dependency on clients. So, we can start using ConfigDef on the broker side as well. topic-level configuration not validated Key: KAFKA-1667 URL: https://issues.apache.org/jira/browse/KAFKA-1667 Project: Kafka Issue Type: Bug Affects Versions: 0.8.1.1 Reporter: Ryan Berdeen Labels: newbie I was able to set the configuration for a topic to these invalid values: {code} Topic:topic-config-test PartitionCount:1ReplicationFactor:2 Configs:min.cleanable.dirty.ratio=-30.2,segment.bytes=-1,retention.ms=-12,cleanup.policy=lol {code} It seems that the values are saved as long as they are the correct type, but are not validated like the corresponding broker-level properties. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
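A hedged sketch of what ConfigDef-based validation of topic overrides could look like on the broker side; the entries, defaults, and validators below are illustrative rather than the definitive schema:
{code}
import org.apache.kafka.common.config.ConfigDef;

import java.util.Map;
import java.util.Properties;

public class TopicConfigDefSketch {
    private static final ConfigDef TOPIC_CONFIG = new ConfigDef()
        .define("retention.ms", ConfigDef.Type.LONG, 604800000L,
                ConfigDef.Range.atLeast(1), ConfigDef.Importance.MEDIUM,
                "Time to retain a log segment before deletion")
        .define("min.cleanable.dirty.ratio", ConfigDef.Type.DOUBLE, 0.5,
                ConfigDef.Range.between(0.0, 1.0), ConfigDef.Importance.MEDIUM,
                "Minimum ratio of dirty to total log before cleaning")
        .define("cleanup.policy", ConfigDef.Type.STRING, "delete",
                ConfigDef.ValidString.in("delete", "compact"), ConfigDef.Importance.MEDIUM,
                "Whether old segments are deleted or compacted");

    public static Map<String, Object> validate(Properties overrides) {
        // parse() applies type conversion and the validators, rejecting values like
        // retention.ms=-12 or cleanup.policy=lol with a ConfigException.
        return TOPIC_CONFIG.parse(overrides);
    }
}
{code}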
[jira] [Commented] (KAFKA-1716) hang during shutdown of ZookeeperConsumerConnector
[ https://issues.apache.org/jira/browse/KAFKA-1716?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14182258#comment-14182258 ] Jun Rao commented on KAFKA-1716: Hmm, then it's not clear why SimpleConsumer is blocked on reading from the socket. Could you turn on the request log on the broker and see whether the fetch request just takes a long time or never completes?
hang during shutdown of ZookeeperConsumerConnector
-- Key: KAFKA-1716 URL: https://issues.apache.org/jira/browse/KAFKA-1716 Project: Kafka Issue Type: Bug Components: consumer Affects Versions: 0.8.1.1 Reporter: Sean Fay Assignee: Neha Narkhede
It appears to be possible for {{ZookeeperConsumerConnector.shutdown()}} to wedge in the case that some consumer fetcher threads receive messages during the shutdown process.
Shutdown thread:
{code}
-- Parking to wait for: java/util/concurrent/CountDownLatch$Sync@0x2aaaf3ef06d0
at jrockit/vm/Locks.park0(J)V(Native Method)
at jrockit/vm/Locks.park(Locks.java:2230)
at sun/misc/Unsafe.park(ZJ)V(Native Method)
at java/util/concurrent/locks/LockSupport.park(LockSupport.java:156)
at java/util/concurrent/locks/AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:811)
at java/util/concurrent/locks/AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:969)
at java/util/concurrent/locks/AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1281)
at java/util/concurrent/CountDownLatch.await(CountDownLatch.java:207)
at kafka/utils/ShutdownableThread.shutdown(ShutdownableThread.scala:36)
at kafka/server/AbstractFetcherThread.shutdown(AbstractFetcherThread.scala:71)
at kafka/server/AbstractFetcherManager$$anonfun$closeAllFetchers$2.apply(AbstractFetcherManager.scala:121)
at kafka/server/AbstractFetcherManager$$anonfun$closeAllFetchers$2.apply(AbstractFetcherManager.scala:120)
at scala/collection/TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
at scala/collection/mutable/HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
at scala/collection/mutable/HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
at scala/collection/mutable/HashTable$class.foreachEntry(HashTable.scala:226)
at scala/collection/mutable/HashMap.foreachEntry(HashMap.scala:39)
at scala/collection/mutable/HashMap.foreach(HashMap.scala:98)
at scala/collection/TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
at kafka/server/AbstractFetcherManager.closeAllFetchers(AbstractFetcherManager.scala:120)
^-- Holding lock: java/lang/Object@0x2aaaebcc7318[thin lock]
at kafka/consumer/ConsumerFetcherManager.stopConnections(ConsumerFetcherManager.scala:148)
at kafka/consumer/ZookeeperConsumerConnector.liftedTree1$1(ZookeeperConsumerConnector.scala:171)
at kafka/consumer/ZookeeperConsumerConnector.shutdown(ZookeeperConsumerConnector.scala:167)
{code}
ConsumerFetcherThread:
{code}
-- Parking to wait for: java/util/concurrent/locks/AbstractQueuedSynchronizer$ConditionObject@0x2aaaebcc7568
at jrockit/vm/Locks.park0(J)V(Native Method)
at jrockit/vm/Locks.park(Locks.java:2230)
at sun/misc/Unsafe.park(ZJ)V(Native Method)
at java/util/concurrent/locks/LockSupport.park(LockSupport.java:156)
at java/util/concurrent/locks/AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
at java/util/concurrent/LinkedBlockingQueue.put(LinkedBlockingQueue.java:306)
at kafka/consumer/PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:60)
at kafka/consumer/ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:49)
at kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:130)
at kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:111)
at scala/collection/immutable/HashMap$HashMap1.foreach(HashMap.scala:224)
at scala/collection/immutable/HashMap$HashTrieMap.foreach(HashMap.scala:403)
at kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1.apply$mcV$sp(AbstractFetcherThread.scala:111)
at kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:111)
at kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:111)
at kafka/utils/Utils$.inLock(Utils.scala:538)
at kafka/server/AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:110)
at
{code}
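The two dumps boil down to a simple wedge: the fetcher thread is parked in LinkedBlockingQueue.put() because the consumer's chunk queue is full and nothing drains it during shutdown, so it never returns from processPartitionData() and never trips the latch that ShutdownableThread.shutdown() is awaiting. A minimal sketch of that interaction (hypothetical class and variable names, not Kafka code) is:
{code}
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Sketch of the wedge seen in the thread dumps above: a fetcher-like thread
// blocks on put() into a bounded queue that nobody is draining, so it never
// reaches the point where it counts down the latch the shutdown thread awaits.
public class ShutdownWedgeSketch {
    public static void main(String[] args) throws InterruptedException {
        LinkedBlockingQueue<String> chunks = new LinkedBlockingQueue<>(1); // bounded, like the consumer's chunk queue
        CountDownLatch shutdownComplete = new CountDownLatch(1);

        Thread fetcher = new Thread(() -> {
            try {
                chunks.put("chunk-1");        // fills the queue
                chunks.put("chunk-2");        // parks here: queue is full and nothing drains it
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                shutdownComplete.countDown(); // never reached while put() is parked
            }
        }, "consumer-fetcher-sketch");
        fetcher.start();

        // "Shutdown" thread: waits for the fetcher to finish, mirroring
        // ShutdownableThread.shutdown() awaiting its latch.
        boolean finished = shutdownComplete.await(2, TimeUnit.SECONDS);
        System.out.println("fetcher finished cleanly? " + finished); // prints false: wedged

        // A fix has to guarantee the queue is drained (or the put abandoned)
        // before the latch is awaited; here we unblock it just so the demo exits.
        chunks.clear();
        fetcher.interrupt();
        fetcher.join();
    }
}
{code}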
Re: Review Request 27060: Patch for KAFKA-1725
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/27060/#review58189 --- Ship it! Ship It! - Neha Narkhede On Oct. 22, 2014, 11:52 p.m., Ewen Cheslack-Postava wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/27060/ --- (Updated Oct. 22, 2014, 11:52 p.m.) Review request for kafka. Bugs: KAFKA-1725 https://issues.apache.org/jira/browse/KAFKA-1725 Repository: kafka Description --- KAFKA-1725: Clean up system test output: fix typo in system test case file, incorrectly named system test configuration files, and skip trying to generate metrics graphs when no data is available. Diffs - system_test/mirror_maker_testsuite/testcase_15001/testcase_5001_properties.json system_test/mirror_maker_testsuite/testcase_15002/testcase_5002_properties.json system_test/mirror_maker_testsuite/testcase_15003/testcase_5003_properties.json system_test/mirror_maker_testsuite/testcase_15004/testcase_5004_properties.json system_test/mirror_maker_testsuite/testcase_15005/testcase_5005_properties.json system_test/mirror_maker_testsuite/testcase_15006/testcase_5006_properties.json system_test/replication_testsuite/testcase_0001/testcase_0001_properties.json 308f1937bbdc0fdcebdb8e9bc39e643c3f0c18be system_test/replication_testsuite/testcase_10101/testcase_0101_properties.json system_test/replication_testsuite/testcase_10102/testcase_0102_properties.json system_test/replication_testsuite/testcase_10103/testcase_0103_properties.json system_test/replication_testsuite/testcase_10104/testcase_0104_properties.json system_test/replication_testsuite/testcase_10105/testcase_0105_properties.json system_test/replication_testsuite/testcase_10106/testcase_0106_properties.json system_test/replication_testsuite/testcase_10107/testcase_0107_properties.json system_test/replication_testsuite/testcase_10108/testcase_0108_properties.json system_test/replication_testsuite/testcase_10109/testcase_0109_properties.json system_test/replication_testsuite/testcase_10110/testcase_0110_properties.json system_test/replication_testsuite/testcase_10131/testcase_0131_properties.json system_test/replication_testsuite/testcase_10132/testcase_0132_properties.json system_test/replication_testsuite/testcase_10133/testcase_0133_properties.json system_test/replication_testsuite/testcase_10134/testcase_0134_properties.json system_test/utils/metrics.py d98d3cdeab00be9ddf4b7032a68da3886e4850c7 Diff: https://reviews.apache.org/r/27060/diff/ Testing --- Thanks, Ewen Cheslack-Postava
[jira] [Commented] (KAFKA-391) Producer request and response classes should use maps
[ https://issues.apache.org/jira/browse/KAFKA-391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14182276#comment-14182276 ] Joel Koshy commented on KAFKA-391: -- Yeah, I see your point. That's interesting - I actually don't remember why that check was added, but there must have been a legitimate reason (since you ran into it :) ). Since you are able to reproduce it, can you print the full original request and response? They should be in the exception that is thrown. Also, what Kafka version is your broker running, and what version is the producer? Is it the same?
Producer request and response classes should use maps - Key: KAFKA-391 URL: https://issues.apache.org/jira/browse/KAFKA-391 Project: Kafka Issue Type: Bug Reporter: Joel Koshy Assignee: Joel Koshy Priority: Blocker Labels: optimization Fix For: 0.8.0 Attachments: KAFKA-391-draft-r1374069.patch, KAFKA-391-v2.patch, KAFKA-391-v3.patch, KAFKA-391-v4.patch
Producer response contains two arrays of error codes and offsets - the ordering in these arrays corresponds to the flattened ordering of the request arrays. It would be better to switch to maps in the request and response as this would make the code clearer and more efficient (right now, linear scans are used in handling producer acks). We can probably do the same in the fetch request/response. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
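For context on why maps help here: with the pre-KAFKA-391 layout, the response carried parallel arrays of error codes and offsets in the flattened order of the request, so matching an ack back to a partition meant a linear scan. Keying the response by topic and partition makes the lookup direct and removes any chance of the pairing drifting. A rough Java sketch of the two access patterns (illustrative types only, not the actual kafka.api classes):
{code}
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative only: a topic-partition key plus the two ways a producer could
// look up the status of one partition in a response.
public class ResponseLookupSketch {
    record TopicPartitionKey(String topic, int partition) {}
    record PartitionStatus(short errorCode, long offset) {}

    // Old style: parallel lists whose order mirrors the flattened request -
    // finding one partition's status is a linear scan.
    static PartitionStatus findByScan(List<TopicPartitionKey> requestOrder,
                                      List<PartitionStatus> statuses,
                                      TopicPartitionKey wanted) {
        for (int i = 0; i < requestOrder.size(); i++) {
            if (requestOrder.get(i).equals(wanted)) {
                return statuses.get(i);
            }
        }
        return null;
    }

    // Map style: the response itself is keyed by topic-partition, so the
    // lookup is a single hash probe.
    static PartitionStatus findByMap(Map<TopicPartitionKey, PartitionStatus> response,
                                     TopicPartitionKey wanted) {
        return response.get(wanted);
    }

    public static void main(String[] args) {
        TopicPartitionKey tp = new TopicPartitionKey("orders", 0);
        Map<TopicPartitionKey, PartitionStatus> response = new HashMap<>();
        response.put(tp, new PartitionStatus((short) 0, 42L));
        System.out.println(findByMap(response, tp));
        System.out.println(findByScan(List.of(tp), List.of(new PartitionStatus((short) 0, 42L)), tp));
    }
}
{code}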
[jira] [Updated] (KAFKA-1721) Snappy compressor is not thread safe
[ https://issues.apache.org/jira/browse/KAFKA-1721?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ewen Cheslack-Postava updated KAFKA-1721: - Attachment: KAFKA-1721.patch
Snappy compressor is not thread safe
Key: KAFKA-1721 URL: https://issues.apache.org/jira/browse/KAFKA-1721 Project: Kafka Issue Type: Bug Components: compression Reporter: Ewen Cheslack-Postava Assignee: Ewen Cheslack-Postava Attachments: KAFKA-1721.patch
From the mailing list, it can generate this exception:
2014-10-20 18:55:21.841 [kafka-producer-network-thread] ERROR org.apache.kafka.clients.producer.internals.Sender - Uncaught error in kafka producer I/O thread:
*java.lang.NullPointerException*
at org.xerial.snappy.BufferRecycler.releaseInputBuffer(BufferRecycler.java:153)
at org.xerial.snappy.SnappyOutputStream.close(SnappyOutputStream.java:317)
at java.io.FilterOutputStream.close(FilterOutputStream.java:160)
at org.apache.kafka.common.record.Compressor.close(Compressor.java:94)
at org.apache.kafka.common.record.MemoryRecords.close(MemoryRecords.java:119)
at org.apache.kafka.clients.producer.internals.RecordAccumulator.drain(RecordAccumulator.java:285)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:162)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:115)
at java.lang.Thread.run(Thread.java:744)
This appears to be an issue with the snappy-java library using ThreadLocal for an internal buffer recycling object, which results in that object being shared unsafely across threads if one thread sends to multiple producers:
{quote}
I think the issue is that you're using all your producers across a thread pool and the snappy library uses ThreadLocal BufferRecyclers. When new Snappy streams are allocated, they may be allocated from the same thread (e.g. one of your MyProducer classes calls Producer.send() on multiple producers from the same thread) and therefore use the same BufferRecycler. Eventually you hit the code in the stacktrace, and if two producer send threads hit it concurrently they improperly share the unsynchronized BufferRecycler.
This seems like a pain to fix -- it's really a deficiency of the snappy library and as far as I can see there's no external control over BufferRecycler in their API. One possibility is to record the thread ID when we generate a new stream in Compressor and use that to synchronize access to ensure no concurrent BufferRecycler access. That could be made specific to snappy so it wouldn't impact other codecs. Not exactly ideal, but it would work. Unfortunately I can't think of any way for you to protect against this in your own code since the problem arises in the producer send thread, which your code should never know about.
Another option would be to set up your producers differently to avoid the possibility of unsynchronized access from multiple threads (i.e. don't use the same thread pool approach), but whether you can do that will depend on your use case.
{quote}
-- This message was sent by Atlassian JIRA (v6.3.4#6332)
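The failure mode described in that quote is easy to set up in miniature: allocate several SnappyOutputStreams from one thread (so they pick up the same ThreadLocal BufferRecycler inside snappy-java), then close them concurrently from different threads. Whether the NullPointerException actually fires is timing and version dependent, but the sketch below (hypothetical class name; only the snappy-java types are real) shows the sharing pattern the comment describes:
{code}
import java.io.ByteArrayOutputStream;
import org.xerial.snappy.SnappyOutputStream;

// Sketch of the unsafe pattern: both streams are created on the main thread,
// so affected snappy-java versions hand them the same thread-local
// BufferRecycler. Closing them concurrently from two other threads then races
// on that shared, unsynchronized recycler.
public class SnappySharingSketch {
    public static void main(String[] args) throws Exception {
        // Created from the same thread -> same BufferRecycler instance inside snappy-java.
        SnappyOutputStream streamA = new SnappyOutputStream(new ByteArrayOutputStream());
        SnappyOutputStream streamB = new SnappyOutputStream(new ByteArrayOutputStream());

        streamA.write(new byte[1024]);
        streamB.write(new byte[1024]);

        Runnable closeA = () -> { try { streamA.close(); } catch (Exception e) { e.printStackTrace(); } };
        Runnable closeB = () -> { try { streamB.close(); } catch (Exception e) { e.printStackTrace(); } };

        // In the producer these would be the sender I/O threads of two different
        // producers; here two plain threads stand in for them.
        Thread t1 = new Thread(closeA, "sender-1");
        Thread t2 = new Thread(closeB, "sender-2");
        t1.start();
        t2.start();
        t1.join();
        t2.join();
    }
}
{code}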
[jira] [Commented] (KAFKA-1721) Snappy compressor is not thread safe
[ https://issues.apache.org/jira/browse/KAFKA-1721?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14182280#comment-14182280 ] Ewen Cheslack-Postava commented on KAFKA-1721: -- Created reviewboard https://reviews.apache.org/r/27124/diff/ against branch origin/trunk
Snappy compressor is not thread safe
Key: KAFKA-1721 URL: https://issues.apache.org/jira/browse/KAFKA-1721 Project: Kafka Issue Type: Bug Components: compression Reporter: Ewen Cheslack-Postava Assignee: Ewen Cheslack-Postava Attachments: KAFKA-1721.patch
-- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-1721) Snappy compressor is not thread safe
[ https://issues.apache.org/jira/browse/KAFKA-1721?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ewen Cheslack-Postava updated KAFKA-1721: - Status: Patch Available (was: Open)
Snappy compressor is not thread safe
Key: KAFKA-1721 URL: https://issues.apache.org/jira/browse/KAFKA-1721 Project: Kafka Issue Type: Bug Components: compression Reporter: Ewen Cheslack-Postava Assignee: Ewen Cheslack-Postava Attachments: KAFKA-1721.patch
-- This message was sent by Atlassian JIRA (v6.3.4#6332)
Review Request 27124: Patch for KAFKA-1721
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/27124/ --- Review request for kafka. Bugs: KAFKA-1721 https://issues.apache.org/jira/browse/KAFKA-1721 Repository: kafka Description --- KAFKA-1721 Bump snappy-java version for thread-safety fix. Diffs - build.gradle c3e6bb839ad65c512c9db4695d2bb49b82c80da5 Diff: https://reviews.apache.org/r/27124/diff/ Testing --- Thanks, Ewen Cheslack-Postava
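Once the bumped snappy-java jar is on the producer's classpath, it is worth confirming which version actually gets picked up at runtime. Assuming snappy-java's Snappy.getNativeLibraryVersion() helper (present in the releases I have seen, but verify against the one you ship), a one-liner does it:
{code}
import org.xerial.snappy.Snappy;

// Quick runtime check (assumes snappy-java exposes getNativeLibraryVersion(),
// as recent releases do) that the dependency bump actually took effect.
public class SnappyVersionCheck {
    public static void main(String[] args) {
        System.out.println("snappy-java version: " + Snappy.getNativeLibraryVersion());
    }
}
{code}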
[jira] [Commented] (KAFKA-1716) hang during shutdown of ZookeeperConsumerConnector
[ https://issues.apache.org/jira/browse/KAFKA-1716?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14182295#comment-14182295 ] Chris Richardson commented on KAFKA-1716: - I just realized that I left out the thread state for the ConsumerFetcherThread:
{quote}
java.lang.Thread.State: RUNNABLE
at sun.nio.ch.Net.poll(Native Method)
at sun.nio.ch.SocketChannelImpl.poll(SocketChannelImpl.java:950)
- locked 0x0006c2dadab8 (a java.lang.Object)
at sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:204)
- locked 0x0006c2dada88 (a java.lang.Object)
at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103)
- locked 0x0006c2dadbd0 (a sun.nio.ch.SocketAdaptor$SocketInputStream)
at java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:385)
- locked 0x0006c2dae460 (a java.lang.Object)
{quote}
All of the ConsumerFetcherThreads are in the RUNNABLE state, yet as far as I can tell only a small percentage of my consumers ever shut down.
hang during shutdown of ZookeeperConsumerConnector
-- Key: KAFKA-1716 URL: https://issues.apache.org/jira/browse/KAFKA-1716 Project: Kafka Issue Type: Bug Components: consumer Affects Versions: 0.8.1.1 Reporter: Sean Fay Assignee: Neha Narkhede
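To follow up on Jun's earlier suggestion of checking whether the broker ever finishes the fetches these RUNNABLE threads are blocked on, the broker's request logging can be turned up. The stock config/log4j.properties already wires a kafka.request.logger category to a requestAppender; raising it to TRACE logs each request together with its processing time. Treat the exact property and appender names as a sketch and verify them against your broker's bundled log4j.properties:
{code}
# config/log4j.properties on the broker (sketch; verify the category and
# appender names against your version before relying on them)
log4j.logger.kafka.request.logger=TRACE, requestAppender
log4j.additivity.kafka.request.logger=false
{code}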
Jenkins build is back to normal : Kafka-trunk #316
See https://builds.apache.org/job/Kafka-trunk/316/changes
Re: [VOTE] 0.8.2-beta Release Candidate 1
Joe,
Verified quickstart on both the src and binary release. They all look good. The javadoc doesn't seem to include those in clients. Could you add them?
Thanks,
Jun
On Tue, Oct 21, 2014 at 1:58 PM, Joe Stein joe.st...@stealth.ly wrote:
This is the first candidate for release of Apache Kafka 0.8.2-beta
Release Notes for the 0.8.2-beta release
https://people.apache.org/~joestein/kafka-0.8.2-beta-candidate1/RELEASE_NOTES.html
*** Please download, test and vote by Friday, October 24th, 2pm PT
Kafka's KEYS file containing PGP keys we use to sign the release:
https://svn.apache.org/repos/asf/kafka/KEYS
in addition to the md5, sha1 and sha2 (SHA256) checksum.
* Release artifacts to be voted upon (source and binary):
https://people.apache.org/~joestein/kafka-0.8.2-beta-candidate1/
* Maven artifacts to be voted upon prior to release:
https://repository.apache.org/content/groups/staging/
* scala-doc
https://people.apache.org/~joestein/kafka-0.8.2-beta-candidate1/scala-doc/
* java-doc
https://people.apache.org/~joestein/kafka-0.8.2-beta-candidate1/java-doc/
* The tag to be voted upon (off the 0.8.2 branch) is the 0.8.2-beta tag
https://git-wip-us.apache.org/repos/asf?p=kafka.git;a=tag;h=2b2c3da2c52bc62a89d60f85125d3723c8410fa0
/***
Joe Stein
Founder, Principal Consultant
Big Data Open Source Security LLC
http://www.stealth.ly
Twitter: @allthingshadoop http://www.twitter.com/allthingshadoop
/