[jira] [Commented] (KAFKA-391) Producer request and response classes should use maps

2014-10-23 Thread Honghai Chen (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14181170#comment-14181170
 ] 

Honghai Chen commented on KAFKA-391:


Why does this fix add this line of code?
Sometimes we get 2 responses for one request. Why do we fall into this situation,
and will there be duplicate data in Kafka?

https://git-wip-us.apache.org/repos/asf?p=kafka.git;a=commitdiff;h=b688c3ba045df340bc32caa40ba1909eddbcbec5
 
+if (response.status.size != producerRequest.data.size)
+  throw new KafkaException("Incomplete response (%s) for producer request (%s)"
+    .format(response, producerRequest))



 Producer request and response classes should use maps
 -

 Key: KAFKA-391
 URL: https://issues.apache.org/jira/browse/KAFKA-391
 Project: Kafka
  Issue Type: Bug
Reporter: Joel Koshy
Assignee: Joel Koshy
Priority: Blocker
  Labels: optimization
 Fix For: 0.8.0

 Attachments: KAFKA-391-draft-r1374069.patch, KAFKA-391-v2.patch, 
 KAFKA-391-v3.patch, KAFKA-391-v4.patch


 Producer response contains two arrays of error codes and offsets - the 
 ordering in these arrays correspond to the flattened ordering of the request 
 arrays.
 It would be better to switch to maps in the request and response as this 
 would make the code clearer and more efficient (right now, linear scans are 
 used in handling producer acks).
 We can probably do the same in the fetch request/response.
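
 A minimal sketch of the map-keyed response shape this issue proposes (the names
 TopicAndPartition and ProducerResponseStatus follow the eventual 0.8 API but are
 illustrative here, not a quote of the patch):
 {code}
 case class TopicAndPartition(topic: String, partition: Int)
 case class ProducerResponseStatus(error: Short, offset: Long)

 case class ProducerResponse(correlationId: Int,
                             status: Map[TopicAndPartition, ProducerResponseStatus]) {
   // O(1) lookup per partition instead of a linear scan over flattened arrays
   def statusFor(tp: TopicAndPartition): Option[ProducerResponseStatus] = status.get(tp)
 }
 {code}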



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Review Request 27071: Patch for KAFKA-1684

2014-10-23 Thread Ivan Lyutov

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/27071/
---

Review request for kafka.


Bugs: KAFKA-1684
https://issues.apache.org/jira/browse/KAFKA-1684


Repository: kafka


Description
---

working


working


working


working


working


almost done


almost done


done


one more thing


one more thing


one more thing


Diffs
-

  config/client.keystore PRE-CREATION 
  config/client.public-key PRE-CREATION 
  config/client.ssl.properties PRE-CREATION 
  config/log4j.properties 95022543debedf26b5da58cc019de5d5b809f97f 
  config/server.keystore PRE-CREATION 
  config/server.properties b0e4496a8ca736b6abe965a430e8ce87b0e8287f 
  config/server.public-key PRE-CREATION 
  config/server.ssl.properties PRE-CREATION 
  contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java 
1d0e0a917985736bef7af66c741e5807d8503121 
  core/src/main/scala/kafka/api/RequestKeys.scala 
c24c0345feedc7b9e2e9f40af11bfa1b8d328c43 
  core/src/main/scala/kafka/api/TopicMetadataForChannelRequest.scala 
PRE-CREATION 
  core/src/main/scala/kafka/api/UpdateMetadataRequest.scala 
530982e36b17934b8cc5fb668075a5342e142c59 
  core/src/main/scala/kafka/client/ClientUtils.scala 
ebba87f0566684c796c26cb76c64b4640a5ccfde 
  core/src/main/scala/kafka/cluster/Broker.scala 
0060add008bb3bc4b0092f2173c469fce0120be6 
  core/src/main/scala/kafka/common/BrokerChannelNotAvailableException.scala 
PRE-CREATION 
  core/src/main/scala/kafka/consumer/ConsumerConfig.scala 
9ebbee6c16dc83767297c729d2d74ebbd063a993 
  core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala 
b9e2bea7b442a19bcebd1b350d39541a8c9dd068 
  core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala 
f8c1b4e674f7515c377c6c30d212130f1ff022dd 
  core/src/main/scala/kafka/consumer/SimpleConsumer.scala 
d349a3000feb9ccd57d1f3cb163548d5bf432186 
  core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala 
fbc680fde21b02f11285a4f4b442987356abd17b 
  core/src/main/scala/kafka/controller/ControllerChannelManager.scala 
ecbfa0f328ba6a652a758ab20cacef324a8b2fb8 
  core/src/main/scala/kafka/controller/KafkaController.scala 
8ab4a1b8072c9dd187a9a6e94138b725d1f1b153 
  core/src/main/scala/kafka/controller/ReplicaStateMachine.scala 
3e87e1d36f87b7dd539a474609b1c95487a4c337 
  core/src/main/scala/kafka/javaapi/consumer/SimpleConsumer.scala 
0ab0195dc9f66f407061d0fac2549bd6291e41ff 
  core/src/main/scala/kafka/network/BlockingChannel.scala 
eb7bb14d94cb3648c06d4de36a3b34aacbde4556 
  core/src/main/scala/kafka/network/ChannelFactory.scala PRE-CREATION 
  core/src/main/scala/kafka/network/ChannelInfo.scala PRE-CREATION 
  core/src/main/scala/kafka/network/ChannelType.scala PRE-CREATION 
  core/src/main/scala/kafka/network/KafkaChannel.scala PRE-CREATION 
  core/src/main/scala/kafka/network/PlainSocketChannelFactory.scala 
PRE-CREATION 
  core/src/main/scala/kafka/network/SocketServer.scala 
cee76b323e5f3e4c783749ac9e78e1ef02897e3b 
  core/src/main/scala/kafka/network/ssl/KeyStores.scala PRE-CREATION 
  core/src/main/scala/kafka/network/ssl/SSLAuth.scala PRE-CREATION 
  core/src/main/scala/kafka/network/ssl/SSLChannelFactory.scala PRE-CREATION 
  core/src/main/scala/kafka/network/ssl/SSLConnectionConfig.scala PRE-CREATION 
  core/src/main/scala/kafka/network/ssl/SSLSocketChannel.scala PRE-CREATION 
  core/src/main/scala/kafka/network/ssl/store/JKSInitializer.scala PRE-CREATION 
  core/src/main/scala/kafka/producer/ProducerConfig.scala 
3cdf23dce3407f1770b9c6543e3a8ae8ab3ff255 
  core/src/main/scala/kafka/producer/SyncProducer.scala 
42c950375098b51f45c79c6a4a99a36f387bf02b 
  core/src/main/scala/kafka/producer/SyncProducerConfig.scala 
69b2d0c11bb1412ce76d566f285333c806be301a 
  core/src/main/scala/kafka/server/AbstractFetcherThread.scala 
2e9532e820b5b5c63dfd55f5454b32866d084a37 
  core/src/main/scala/kafka/server/KafkaApis.scala 
c584b559416b3ee4bcbec5966be4891e0a03eefb 
  core/src/main/scala/kafka/server/KafkaConfig.scala 
165c816a9f4c925f6e46560e7e2ff9cf7591946b 
  core/src/main/scala/kafka/server/KafkaHealthcheck.scala 
4acdd70fe9c1ee78d6510741006c2ece65450671 
  core/src/main/scala/kafka/server/KafkaServer.scala 
3e9e91f2b456bbdeb3055d571e18ffea8675b4bf 
  core/src/main/scala/kafka/server/MetadataCache.scala 
bf81a1ab88c14be8697b441eedbeb28fa0112643 
  core/src/main/scala/kafka/server/ReplicaFetcherThread.scala 
6879e730282185bda3d6bc3659cb15af0672cecf 
  core/src/main/scala/kafka/tools/ConsoleConsumer.scala 
323fc8566d974acc4e5c7d7c2a065794f3b5df4a 
  core/src/main/scala/kafka/tools/ConsoleProducer.scala 
8e9ba0b284671989f87d9c421bc98f5c4384c260 
  core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala 
d1e7c434e77859d746b8dc68dd5d5a3740425e79 
  core/src/main/scala/kafka/tools/GetOffsetShell.scala 
3d9293e4abbe3f4a4a2bc5833385747c604d5a95 
  

[jira] [Updated] (KAFKA-1684) Implement TLS/SSL authentication

2014-10-23 Thread Ivan Lyutov (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1684?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ivan Lyutov updated KAFKA-1684:
---
Attachment: KAFKA-1684.patch

 Implement TLS/SSL authentication
 

 Key: KAFKA-1684
 URL: https://issues.apache.org/jira/browse/KAFKA-1684
 Project: Kafka
  Issue Type: Sub-task
  Components: security
Affects Versions: 0.9.0
Reporter: Jay Kreps
Assignee: Ivan Lyutov
 Attachments: KAFKA-1684.patch


 Add an SSL port to the configuration and advertise this as part of the 
 metadata request.
 If the SSL port is configured the socket server will need to add a second 
 Acceptor thread to listen on it. Connections accepted on this port will need 
 to go through the SSL handshake prior to being registered with a Processor 
 for request processing.
 SSL requests and responses may need to be wrapped or unwrapped using the 
 SSLEngine that was initialized by the acceptor. This wrapping and unwrapping 
 is very similar to what will need to be done for SASL-based authentication 
 schemes. We should have a uniform interface that covers both of these and we 
 will need to store the instance in the session with the request. The socket 
 server will have to use this object when reading and writing requests. We 
 will need to take care with the FetchRequests as the current 
 FileChannel.transferTo mechanism will be incompatible with wrap/unwrap so we 
 can only use this optimization for unencrypted sockets that don't require 
 userspace translation (wrapping).
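
 A hypothetical sketch of that uniform wrap/unwrap interface; the trait and object
 names are illustrative, not the actual Kafka API:
 {code}
 import java.nio.ByteBuffer

 trait SecureTransport {
   def wrap(plain: ByteBuffer): ByteBuffer    // protect outgoing bytes (SSLEngine.wrap or SASL wrap)
   def unwrap(cipher: ByteBuffer): ByteBuffer // recover incoming bytes (SSLEngine.unwrap or SASL unwrap)
 }

 // An unencrypted connection would use an identity implementation; this is the only
 // case where the zero-copy FileChannel.transferTo fetch path can still be used.
 object PlaintextTransport extends SecureTransport {
   def wrap(plain: ByteBuffer): ByteBuffer = plain
   def unwrap(cipher: ByteBuffer): ByteBuffer = cipher
 }
 {code}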



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1684) Implement TLS/SSL authentication

2014-10-23 Thread Ivan Lyutov (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1684?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14181176#comment-14181176
 ] 

Ivan Lyutov commented on KAFKA-1684:


Created reviewboard https://reviews.apache.org/r/27071/diff/
 against branch apache/trunk

 Implement TLS/SSL authentication
 

 Key: KAFKA-1684
 URL: https://issues.apache.org/jira/browse/KAFKA-1684
 Project: Kafka
  Issue Type: Sub-task
  Components: security
Affects Versions: 0.9.0
Reporter: Jay Kreps
Assignee: Ivan Lyutov
 Attachments: KAFKA-1684.patch


 Add an SSL port to the configuration and advertise this as part of the 
 metadata request.
 If the SSL port is configured the socket server will need to add a second 
 Acceptor thread to listen on it. Connections accepted on this port will need 
 to go through the SSL handshake prior to being registered with a Processor 
 for request processing.
 SSL requests and responses may need to be wrapped or unwrapped using the 
 SSLEngine that was initialized by the acceptor. This wrapping and unwrapping 
 is very similar to what will need to be done for SASL-based authentication 
 schemes. We should have a uniform interface that covers both of these and we 
 will need to store the instance in the session with the request. The socket 
 server will have to use this object when reading and writing requests. We 
 will need to take care with the FetchRequests as the current 
 FileChannel.transferTo mechanism will be incompatible with wrap/unwrap so we 
 can only use this optimization for unencrypted sockets that don't require 
 userspace translation (wrapping).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-1684) Implement TLS/SSL authentication

2014-10-23 Thread Ivan Lyutov (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1684?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ivan Lyutov updated KAFKA-1684:
---
Status: Patch Available  (was: Open)

 Implement TLS/SSL authentication
 

 Key: KAFKA-1684
 URL: https://issues.apache.org/jira/browse/KAFKA-1684
 Project: Kafka
  Issue Type: Sub-task
  Components: security
Affects Versions: 0.9.0
Reporter: Jay Kreps
Assignee: Ivan Lyutov
 Attachments: KAFKA-1684.patch


 Add an SSL port to the configuration and advertise this as part of the 
 metadata request.
 If the SSL port is configured the socket server will need to add a second 
 Acceptor thread to listen on it. Connections accepted on this port will need 
 to go through the SSL handshake prior to being registered with a Processor 
 for request processing.
 SSL requests and responses may need to be wrapped or unwrapped using the 
 SSLEngine that was initialized by the acceptor. This wrapping and unwrapping 
 is very similar to what will need to be done for SASL-based authentication 
 schemes. We should have a uniform interface that covers both of these and we 
 will need to store the instance in the session with the request. The socket 
 server will have to use this object when reading and writing requests. We 
 will need to take care with the FetchRequests as the current 
 FileChannel.transferTo mechanism will be incompatible with wrap/unwrap so we 
 can only use this optimization for unencrypted sockets that don't require 
 userspace translation (wrapping).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1684) Implement TLS/SSL authentication

2014-10-23 Thread Ivan Lyutov (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1684?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14181180#comment-14181180
 ] 

Ivan Lyutov commented on KAFKA-1684:


The high-level description is available in docs/SECURITY.md file

 Implement TLS/SSL authentication
 

 Key: KAFKA-1684
 URL: https://issues.apache.org/jira/browse/KAFKA-1684
 Project: Kafka
  Issue Type: Sub-task
  Components: security
Affects Versions: 0.9.0
Reporter: Jay Kreps
Assignee: Ivan Lyutov
 Attachments: KAFKA-1684.patch


 Add an SSL port to the configuration and advertise this as part of the 
 metadata request.
 If the SSL port is configured the socket server will need to add a second 
 Acceptor thread to listen on it. Connections accepted on this port will need 
 to go through the SSL handshake prior to being registered with a Processor 
 for request processing.
 SSL requests and responses may need to be wrapped or unwrapped using the 
 SSLEngine that was initialized by the acceptor. This wrapping and unwrapping 
 is very similar to what will need to be done for SASL-based authentication 
 schemes. We should have a uniform interface that covers both of these and we 
 will need to store the instance in the session with the request. The socket 
 server will have to use this object when reading and writing requests. We 
 will need to take care with the FetchRequests as the current 
 FileChannel.transferTo mechanism will be incompatible with wrap/unwrap so we 
 can only use this optimization for unencrypted sockets that don't require 
 userspace translation (wrapping).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-1726) Wrong message format description

2014-10-23 Thread Oleg Golovin (JIRA)
Oleg Golovin created KAFKA-1726:
---

 Summary: Wrong message format description
 Key: KAFKA-1726
 URL: https://issues.apache.org/jira/browse/KAFKA-1726
 Project: Kafka
  Issue Type: Bug
  Components: website
Reporter: Oleg Golovin


Here [in this 
page|https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Enriched+Message+Metadata#KafkaEnrichedMessageMetadata-CurrentMessageFormat]
 you describe current Kafka message format:
{code}
MessageAndOffset = MessageSize Offset Message
  MessageSize = int32
  Offset = int64
 
  Message = Crc MagicByte Attributes KeyLength Key ValueLength Value
Crc = int32
MagicByte = int8
Attributes = int8
KeyLength = int32
Key = bytes
ValueLength = int32
Value = bytes
{code}

In reality _offset_ goes before _messageSize_.
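
For reference, the corrected layout implied by this report would read:
{code}
MessageAndOffset = Offset MessageSize Message
  Offset = int64
  MessageSize = int32
{code}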





--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-1727) Fix comment about message format

2014-10-23 Thread Muneyuki Noguchi (JIRA)
Muneyuki Noguchi created KAFKA-1727:
---

 Summary: Fix comment about message format
 Key: KAFKA-1727
 URL: https://issues.apache.org/jira/browse/KAFKA-1727
 Project: Kafka
  Issue Type: Bug
Reporter: Muneyuki Noguchi
Priority: Trivial


The comment in Message.scala says the value of magic identifier is 2,

 bq. 2. 1 byte magic identifier to allow format changes, value is 2 currently

but the actual current magic value is 0.

{code}
  /**
   * The current magic value
   */
  val CurrentMagicValue: Byte = 0
{code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1727) Fix comment about message format

2014-10-23 Thread Muneyuki Noguchi (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1727?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14181308#comment-14181308
 ] 

Muneyuki Noguchi commented on KAFKA-1727:
-

Created reviewboard https://reviews.apache.org/r/27075/diff/
 against branch origin/trunk

 Fix comment about message format
 

 Key: KAFKA-1727
 URL: https://issues.apache.org/jira/browse/KAFKA-1727
 Project: Kafka
  Issue Type: Bug
Reporter: Muneyuki Noguchi
Priority: Trivial
 Attachments: KAFKA-1727.patch


 The comment in Message.scala says the value of magic identifier is 2,
  bq. 2. 1 byte magic identifier to allow format changes, value is 2 
 currently
 but the actual current magic value is 0.
 {code}
   /**
    * The current magic value
    */
   val CurrentMagicValue: Byte = 0
 {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Review Request 27075: Patch for KAFKA-1727

2014-10-23 Thread Muneyuki Noguchi

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/27075/
---

Review request for kafka.


Bugs: KAFKA-1727
https://issues.apache.org/jira/browse/KAFKA-1727


Repository: kafka


Description
---

The value of magic identifier is 0 because CurrentMagicValue is 0.
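
A hedged sketch of what the corrected comment line in Message.scala might look like
after this change (the exact wording in the diff may differ):
{code}
 * 2. 1 byte "magic" identifier to allow format changes, value is 0 currently
{code}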


Diffs
-

  core/src/main/scala/kafka/message/Message.scala 
d2a7293c7be4022af30884330924791340acc5c1 

Diff: https://reviews.apache.org/r/27075/diff/


Testing
---


Thanks,

Muneyuki Noguchi



[jira] [Updated] (KAFKA-1727) Fix comment about message format

2014-10-23 Thread Muneyuki Noguchi (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1727?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Muneyuki Noguchi updated KAFKA-1727:

Attachment: KAFKA-1727.patch

 Fix comment about message format
 

 Key: KAFKA-1727
 URL: https://issues.apache.org/jira/browse/KAFKA-1727
 Project: Kafka
  Issue Type: Bug
Reporter: Muneyuki Noguchi
Priority: Trivial
 Attachments: KAFKA-1727.patch


 The comment in Message.scala says the value of magic identifier is 2,
  bq. 2. 1 byte magic identifier to allow format changes, value is 2 
 currently
 but the actual current magic value is 0.
 {code}
   /**
    * The current magic value
    */
   val CurrentMagicValue: Byte = 0
 {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1683) Implement a session concept in the socket server

2014-10-23 Thread Joe Stein (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1683?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14181323#comment-14181323
 ] 

Joe Stein commented on KAFKA-1683:
--

[~gwenshap] We need to be careful with authenticating only once for a long-lived
session: revoked credentials would not be picked up until a new session was created.
So we need some balance between always re-authenticating and keeping a session for
scale. A lot of this comes down to deployment requirements; many/most should be OK,
I think, with re-authenticating once every X timeframe or always (levers/knobs). Or
some other part of the code/system could constantly check for auth changes and
notify the server, expiring the session then (making it async in nature).

I really like it being in KafkaApis, since that is where all the good stuff is, and
this should mix in cleanly with that IMHO.

 Implement a session concept in the socket server
 --

 Key: KAFKA-1683
 URL: https://issues.apache.org/jira/browse/KAFKA-1683
 Project: Kafka
  Issue Type: Sub-task
  Components: security
Affects Versions: 0.9.0
Reporter: Jay Kreps
Assignee: Gwen Shapira

 To implement authentication we need a way to keep track of some things 
 between requests. The initial use for this would be remembering the 
 authenticated user/principal info, but likely more uses would come up (for 
 example we will also need to remember whether and which encryption or 
 integrity measures are in place on the socket so we can wrap and unwrap 
 writes and reads).
 I was thinking we could just add a Session object that might have a user 
 field. The session object would need to get added to RequestChannel.Request 
 so it is passed down to the API layer with each request.
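
 A minimal sketch of the Session idea described above, assuming illustrative field
 names (the actual patch may carry different or additional fields):
 {code}
 case class Session(principal: String, clientAddress: java.net.InetAddress)

 // RequestChannel.Request would then expose the session to the API layer, e.g.
 // case class Request(processor: Int, requestKey: Any, session: Session, ...)
 {code}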



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1718) Message Size Too Large error when only small messages produced with Snappy

2014-10-23 Thread Evan Huus (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1718?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14181348#comment-14181348
 ] 

Evan Huus commented on KAFKA-1718:
--

[~junrao] I already have wiki permissions, so I made the relevant change. While 
I'm in the neighbourhood, what is the expected value of the {{MagicByte}} 
field? The spec doesn't clarify, and my library has been leaving it at 0 
without problems thus far, but [~sriharsha] mentioned earlier that the value 
should be 2?

 Message Size Too Large error when only small messages produced with Snappy
 

 Key: KAFKA-1718
 URL: https://issues.apache.org/jira/browse/KAFKA-1718
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 0.8.1.1
Reporter: Evan Huus
Priority: Critical

 I'm the primary author of the Go bindings, and while I originally received 
 this as a bug against my bindings, I'm coming to the conclusion that it's a 
 bug in the broker somehow.
 Specifically, take a look at the last two kafka packets in the following 
 packet capture: https://dl.dropboxusercontent.com/u/171647/kafka.pcapng (you 
 will need a trunk build of Wireshark to fully decode the kafka part of the 
 packets).
 The produce request contains two partitions on one topic. Each partition has 
 one message set (sizes 977205 bytes and 967362 bytes respectively). Each 
 message set is a sequential collection of snappy-compressed messages, each 
 message of size 46899. When uncompressed, each message contains a message set 
 of 999600 bytes, containing a sequence of uncompressed 1024-byte messages.
 However, the broker responds to this with a MessageSizeTooLarge error, full 
 stacktrace from the broker logs being:
 kafka.common.MessageSizeTooLargeException: Message size is 1070127 bytes 
 which exceeds the maximum configured message size of 112.
   at kafka.log.Log$$anonfun$append$1.apply(Log.scala:267)
   at kafka.log.Log$$anonfun$append$1.apply(Log.scala:265)
   at scala.collection.Iterator$class.foreach(Iterator.scala:727)
   at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:32)
   at kafka.log.Log.append(Log.scala:265)
   at kafka.cluster.Partition.appendMessagesToLeader(Partition.scala:354)
   at 
 kafka.server.KafkaApis$$anonfun$appendToLocalLog$2.apply(KafkaApis.scala:376)
   at 
 kafka.server.KafkaApis$$anonfun$appendToLocalLog$2.apply(KafkaApis.scala:366)
   at 
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
   at 
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
   at 
 scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
   at 
 scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
   at 
 scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
   at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
   at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
   at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
   at scala.collection.AbstractTraversable.map(Traversable.scala:105)
   at kafka.server.KafkaApis.appendToLocalLog(KafkaApis.scala:366)
   at kafka.server.KafkaApis.handleProducerRequest(KafkaApis.scala:292)
   at kafka.server.KafkaApis.handle(KafkaApis.scala:185)
   at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:42)
   at java.lang.Thread.run(Thread.java:695)
 Since as far as I can tell none of the sizes in the actual produced packet 
 exceed the defined maximum, I can only assume that the broker is 
 miscalculating something somewhere and throwing the exception improperly.
 ---
 This issue can be reliably reproduced using an out-of-the-box binary download 
 of 0.8.1.1 and the following gist: 
 https://gist.github.com/eapache/ce0f15311c605a165ce7 (you will need to use 
 the `producer-ng` branch of the Sarama library).
 ---
 I am happy to provide any more information you might need, or to do relevant 
 experiments etc.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Documentation with patches

2014-10-23 Thread Joe Stein
I like how we have things in SVN.  My issue is having two patches from
contributors (one for tests + code and another for docs) that I am trying
to solve.

If we copy the entire SVN docs directory into git under /docs then
contributions can patch the docs in their git patch. Committers can do 1
commit.

When we release we just cp -r docs/* /svn/ && svn add * && svn co
release // or such.  The only trick is that we have to make sure for live
website fixes that we commit in two places (but only then instead of every
time).  I don't mind doing something more fancy and generate the docs from
some markdown but I am not sure it is necessary... we have a lot to get
done in the next few months with 0.9 and I don't want to add anything
unnecessary to that effort.

I do think though with all the changes coming we want code contributors to
keep the docs up to date and have doc changes + code + test all in one git
patch would be best for everyone (however we accomplish that) for reviewing
and such.

/***
 Joe Stein
 Founder, Principal Consultant
 Big Data Open Source Security LLC
 http://www.stealth.ly
 Twitter: @allthingshadoop http://www.twitter.com/allthingshadoop
/

On Wed, Oct 22, 2014 at 1:53 PM, Jay Kreps jay.kr...@gmail.com wrote:

 Currently we are handling the versioning problem by explicitly versioning
 docs that change over time (configuration, quickstart, design, etc). This
 is done by just creating a copy of these pages for each release in a
 subdirectory. So we can commit documentation changes at any time for the
 future release we just don't link up that release until it is out
 (theoretically you could get there by guessing the url, but that is okay).
 Although having multiple copies of certain pages, one for each release,
 seems odd, I think it is actually better because in practice we often end
 up editing old releases when we find problems in the older docs.

 -Jay

 On Wed, Oct 22, 2014 at 10:35 AM, Jarek Jarcec Cecho jar...@apache.org
 wrote:

  I would strongly support this idea. We have similar model in all other
  projects where I’m involved:
 
  The docs are part of the usual code base and we do require contributors
 to
  update them when they are adding a new feature. And then during release
  time we simply take snapshot of the docs and upload them to our public
  webpages. This enables us to have simple versioned docs on the website,
 so
  that users can easily find docs for their version and also the public
 site
  do not contain docs of unreleased features :) There is a lot of ways how
 to
  achieve that - in Sqoop 1 we used asciidoc to build the site, in Sqoop
  2/Flume we’re using sphinx, Oozie is using markdown wiki...
 
  Jarcec
 
   On Oct 22, 2014, at 10:27 AM, Jay Kreps jay.kr...@gmail.com wrote:
  
   Hey Joe,
  
   I'd love to encourage documentation contributions.
  
   I think we do have a way to contribute to docs. The current workflow
 for
   contributing is
   1. Checkout the docs
   2. Change docs
   3. Submit patch in normal way
   4. Committer reviews and applies
  
   For committers we have traditionally made the review step optional for
  docs.
  
   In reality this skips step 1.5 which is fiddling with apache for an
 hour
  to
   figure out how to get it to make apache includes work so you can see
 the
   docs. I actually think this is the bigger barrier to doc changes.
  
   One thing we could do is move the docs to one of the static site
  generators
   to do the includes (e.g. Jekyll) this might make setup slightly easier
   (although then you need to install Jekyll...).
  
   -Jay
  
   On Wed, Oct 22, 2014 at 9:55 AM, Joe Stein joe.st...@stealth.ly
 wrote:
  
   This comes up a lot but in reality not enough.  We don't have a great
  way
   for folks to modify the code and change (or add) to the
 documentation. I
   think the documentation is awesome and as we grow the code
 contributors
   that should continue with them too.
  
   One thought I had that would work is that we copy the SVN files into a
   /docs folder in git.  We can then take patches in git and then apply
  them
   to SVN when appropriate (like during a release or for immediate
 fixes).
   This way code changes in that patch can have documentation changes.
 The
   committers can manage what is changed where as appropriate either
 prior
  to
   a release or live updates to the website. Yes/No?
  
   Thanks!
  
   /***
   Joe Stein
   Founder, Principal Consultant
   Big Data Open Source Security LLC
   http://www.stealth.ly
   Twitter: @allthingshadoop http://www.twitter.com/allthingshadoop
   /
  
 
 



[jira] [Commented] (KAFKA-1726) Wrong message format description

2014-10-23 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1726?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14181459#comment-14181459
 ] 

Guozhang Wang commented on KAFKA-1726:
--

Thanks Oleg for pointing this out, I just fixed it.

 Wrong message format description
 

 Key: KAFKA-1726
 URL: https://issues.apache.org/jira/browse/KAFKA-1726
 Project: Kafka
  Issue Type: Bug
  Components: website
Reporter: Oleg Golovin

 Here [in this 
 page|https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Enriched+Message+Metadata#KafkaEnrichedMessageMetadata-CurrentMessageFormat]
  you describe current Kafka message format:
 {code}
 MessageAndOffset = MessageSize Offset Message
   MessageSize = int32
   Offset = int64
  
   Message = Crc MagicByte Attributes KeyLength Key ValueLength Value
 Crc = int32
 MagicByte = int8
 Attributes = int8
 KeyLength = int32
 Key = bytes
 ValueLength = int32
 Value = bytes
 {code}
 In reality _offset_ goes before _messageSize_.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Closed] (KAFKA-1726) Wrong message format description

2014-10-23 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1726?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang closed KAFKA-1726.

Assignee: Guozhang Wang

 Wrong message format description
 

 Key: KAFKA-1726
 URL: https://issues.apache.org/jira/browse/KAFKA-1726
 Project: Kafka
  Issue Type: Bug
  Components: website
Reporter: Oleg Golovin
Assignee: Guozhang Wang

 Here [in this 
 page|https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Enriched+Message+Metadata#KafkaEnrichedMessageMetadata-CurrentMessageFormat]
  you describe current Kafka message format:
 {code}
 MessageAndOffset = MessageSize Offset Message
   MessageSize = int32
   Offset = int64
  
   Message = Crc MagicByte Attributes KeyLength Key ValueLength Value
 Crc = int32
 MagicByte = int8
 Attributes = int8
 KeyLength = int32
 Key = bytes
 ValueLength = int32
 Value = bytes
 {code}
 In reality _offset_ goes before _messageSize_.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (KAFKA-1726) Wrong message format description

2014-10-23 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1726?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-1726.
--
Resolution: Fixed

 Wrong message format description
 

 Key: KAFKA-1726
 URL: https://issues.apache.org/jira/browse/KAFKA-1726
 Project: Kafka
  Issue Type: Bug
  Components: website
Reporter: Oleg Golovin

 Here [in this 
 page|https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Enriched+Message+Metadata#KafkaEnrichedMessageMetadata-CurrentMessageFormat]
  you describe current Kafka message format:
 {code}
 MessageAndOffset = MessageSize Offset Message
   MessageSize = int32
   Offset = int64
  
   Message = Crc MagicByte Attributes KeyLength Key ValueLength Value
 Crc = int32
 MagicByte = int8
 Attributes = int8
 KeyLength = int32
 Key = bytes
 ValueLength = int32
 Value = bytes
 {code}
 In reality _offset_ goes before _messageSize_.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1718) Message Size Too Large error when only small messages produced with Snappy

2014-10-23 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1718?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14181470#comment-14181470
 ] 

Jun Rao commented on KAFKA-1718:


Evan,

Thanks for updating the wiki. The MagicByte currently is expected to be only 0. 
We don't validate it on the broker at the moment though. However, we will be
validating it if we evolve the message format in the future.

 Message Size Too Large error when only small messages produced with Snappy
 

 Key: KAFKA-1718
 URL: https://issues.apache.org/jira/browse/KAFKA-1718
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 0.8.1.1
Reporter: Evan Huus
Priority: Critical

 I'm the primary author of the Go bindings, and while I originally received 
 this as a bug against my bindings, I'm coming to the conclusion that it's a 
 bug in the broker somehow.
 Specifically, take a look at the last two kafka packets in the following 
 packet capture: https://dl.dropboxusercontent.com/u/171647/kafka.pcapng (you 
 will need a trunk build of Wireshark to fully decode the kafka part of the 
 packets).
 The produce request contains two partitions on one topic. Each partition has 
 one message set (sizes 977205 bytes and 967362 bytes respectively). Each 
 message set is a sequential collection of snappy-compressed messages, each 
 message of size 46899. When uncompressed, each message contains a message set 
 of 999600 bytes, containing a sequence of uncompressed 1024-byte messages.
 However, the broker responds to this with a MessageSizeTooLarge error, full 
 stacktrace from the broker logs being:
 kafka.common.MessageSizeTooLargeException: Message size is 1070127 bytes 
 which exceeds the maximum configured message size of 112.
   at kafka.log.Log$$anonfun$append$1.apply(Log.scala:267)
   at kafka.log.Log$$anonfun$append$1.apply(Log.scala:265)
   at scala.collection.Iterator$class.foreach(Iterator.scala:727)
   at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:32)
   at kafka.log.Log.append(Log.scala:265)
   at kafka.cluster.Partition.appendMessagesToLeader(Partition.scala:354)
   at 
 kafka.server.KafkaApis$$anonfun$appendToLocalLog$2.apply(KafkaApis.scala:376)
   at 
 kafka.server.KafkaApis$$anonfun$appendToLocalLog$2.apply(KafkaApis.scala:366)
   at 
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
   at 
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
   at 
 scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
   at 
 scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
   at 
 scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
   at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
   at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
   at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
   at scala.collection.AbstractTraversable.map(Traversable.scala:105)
   at kafka.server.KafkaApis.appendToLocalLog(KafkaApis.scala:366)
   at kafka.server.KafkaApis.handleProducerRequest(KafkaApis.scala:292)
   at kafka.server.KafkaApis.handle(KafkaApis.scala:185)
   at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:42)
   at java.lang.Thread.run(Thread.java:695)
 Since as far as I can tell none of the sizes in the actual produced packet 
 exceed the defined maximum, I can only assume that the broker is 
 miscalculating something somewhere and throwing the exception improperly.
 ---
 This issue can be reliably reproduced using an out-of-the-box binary download 
 of 0.8.1.1 and the following gist: 
 https://gist.github.com/eapache/ce0f15311c605a165ce7 (you will need to use 
 the `producer-ng` branch of the Sarama library).
 ---
 I am happy to provide any more information you might need, or to do relevant 
 experiments etc.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [VOTE] 0.8.2-beta Release Candidate 1

2014-10-23 Thread Neha Narkhede
Joe,

I downloaded the source distro and ran into this error. Followed the steps
listed on the release process wiki to validate the release.

vagrant@precise64:~/kafka-0.8.2-beta-src$ gradle

FAILURE: Build failed with an exception.

* Where:
Script '/home/vagrant/kafka-0.8.2-beta-src/scala.gradle' line: 2

* What went wrong:
A problem occurred evaluating script.
Cause: Could not find property 'ext' on settings 'kafka-0.8.2-beta-src'.

* Try:
Run with --stacktrace option to get the stack trace. Run with --info or
--debug option to get more log output.

BUILD FAILED

Total time: 1.565 secs
vagrant@precise64:~/kafka-0.8.2-beta-src$ cat scala.gradle
if (!hasProperty('scalaVersion')) {
  ext.scalaVersion = '2.10.1'
}
ext.defaultScalaVersion = '2.10.1'
if (scalaVersion.startsWith('2.10')) {
ext.baseScalaVersion = '2.10'
} else if (scalaVersion.startsWith('2.11')) {
ext.baseScalaVersion = '2.11'
} else {
ext.baseScalaVersion = scalaVersion
}

On Tue, Oct 21, 2014 at 1:58 PM, Joe Stein joe.st...@stealth.ly wrote:

 This is the first candidate for release of Apache Kafka 0.8.2-beta

 Release Notes for the 0.8.2-beta release

 https://people.apache.org/~joestein/kafka-0.8.2-beta-candidate1/RELEASE_NOTES.html

 *** Please download, test and vote by Friday, October 24th, 2pm PT

 Kafka's KEYS file containing PGP keys we use to sign the release:
 https://svn.apache.org/repos/asf/kafka/KEYS in addition to the md5, sha1
 and sha2 (SHA256) checksum.

 * Release artifacts to be voted upon (source and binary):
 https://people.apache.org/~joestein/kafka-0.8.2-beta-candidate1/

 * Maven artifacts to be voted upon prior to release:
 https://repository.apache.org/content/groups/staging/

 * scala-doc
 https://people.apache.org/~joestein/kafka-0.8.2-beta-candidate1/scala-doc/

 * java-doc
 https://people.apache.org/~joestein/kafka-0.8.2-beta-candidate1/java-doc/

 * The tag to be voted upon (off the 0.8.2 branch) is the 0.8.2-beta tag

 https://git-wip-us.apache.org/repos/asf?p=kafka.git;a=tag;h=2b2c3da2c52bc62a89d60f85125d3723c8410fa0

 /***
  Joe Stein
  Founder, Principal Consultant
  Big Data Open Source Security LLC
  http://www.stealth.ly
  Twitter: @allthingshadoop http://www.twitter.com/allthingshadoop
 /



Re: [VOTE] 0.8.2-beta Release Candidate 1

2014-10-23 Thread Joe Stein
hmmm.

I just launched a bootstrap clean room (per release docs). Installed latest
gradle

sudo add-apt-repository ppa:cwchien/gradle
sudo apt-get update
sudo apt-get install gradle

wget
https://people.apache.org/~joestein/kafka-0.8.2-beta-candidate1/kafka-0.8.2-beta-src.tgz
tar -xvf kafka-0.8.2-beta-src.tgz
cd kafka-0.8.2-beta-src/
gradle

...

Building project 'core' with Scala version 2.10.1
:downloadWrapper

BUILD SUCCESSFUL

Total time: 22.066 secs

so it works, I use the source upload to build the binaries. odd.

*

Maybe the gradle version you have is too old or something? I am using

vagrant@precise64:~/kafka-0.8.2-beta-src$ gradle -version


Gradle 2.1


Build time:   2014-09-08 10:40:39 UTC
Build number: none
Revision: e6cf70745ac11fa943e19294d19a2c527a669a53

Groovy:   2.3.6
Ant:  Apache Ant(TM) version 1.9.3 compiled on December 23 2013
JVM:  1.6.0_45 (Sun Microsystems Inc. 20.45-b01)
OS:   Linux 3.2.0-23-generic amd64



/***
 Joe Stein
 Founder, Principal Consultant
 Big Data Open Source Security LLC
 http://www.stealth.ly
 Twitter: @allthingshadoop http://www.twitter.com/allthingshadoop
/

On Thu, Oct 23, 2014 at 12:49 PM, Neha Narkhede neha.narkh...@gmail.com
wrote:

 Joe,

 I downloaded the source distro and ran into this error. Followed the steps
 listed on the release process wiki to validate the release.

 vagrant@precise64:~/kafka-0.8.2-beta-src$ gradle

 FAILURE: Build failed with an exception.

 * Where:
 Script '/home/vagrant/kafka-0.8.2-beta-src/scala.gradle' line: 2

 * What went wrong:
 A problem occurred evaluating script.
 Cause: Could not find property 'ext' on settings 'kafka-0.8.2-beta-src'.

 * Try:
 Run with --stacktrace option to get the stack trace. Run with --info or
 --debug option to get more log output.

 BUILD FAILED

 Total time: 1.565 secs
 vagrant@precise64:~/kafka-0.8.2-beta-src$ cat scala.gradle
 if (!hasProperty('scalaVersion')) {
   ext.scalaVersion = '2.10.1'
 }
 ext.defaultScalaVersion = '2.10.1'
 if (scalaVersion.startsWith('2.10')) {
 ext.baseScalaVersion = '2.10'
 } else if (scalaVersion.startsWith('2.11')) {
 ext.baseScalaVersion = '2.11'
 } else {
 ext.baseScalaVersion = scalaVersion
 }

 On Tue, Oct 21, 2014 at 1:58 PM, Joe Stein joe.st...@stealth.ly wrote:

  This is the first candidate for release of Apache Kafka 0.8.2-beta
 
  Release Notes for the 0.8.2-beta release
 
 
 https://people.apache.org/~joestein/kafka-0.8.2-beta-candidate1/RELEASE_NOTES.html
 
  *** Please download, test and vote by Friday, October 24th, 2pm PT
 
  Kafka's KEYS file containing PGP keys we use to sign the release:
  https://svn.apache.org/repos/asf/kafka/KEYS in addition to the md5, sha1
  and sha2 (SHA256) checksum.
 
  * Release artifacts to be voted upon (source and binary):
  https://people.apache.org/~joestein/kafka-0.8.2-beta-candidate1/
 
  * Maven artifacts to be voted upon prior to release:
  https://repository.apache.org/content/groups/staging/
 
  * scala-doc
 
 https://people.apache.org/~joestein/kafka-0.8.2-beta-candidate1/scala-doc/
 
  * java-doc
 
 https://people.apache.org/~joestein/kafka-0.8.2-beta-candidate1/java-doc/
 
  * The tag to be voted upon (off the 0.8.2 branch) is the 0.8.2-beta tag
 
 
 https://git-wip-us.apache.org/repos/asf?p=kafka.git;a=tag;h=2b2c3da2c52bc62a89d60f85125d3723c8410fa0
 
  /***
   Joe Stein
   Founder, Principal Consultant
   Big Data Open Source Security LLC
   http://www.stealth.ly
   Twitter: @allthingshadoop http://www.twitter.com/allthingshadoop
  /
 



[jira] [Commented] (KAFKA-1683) Implement a session concept in the socket server

2014-10-23 Thread Jay Kreps (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1683?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14181569#comment-14181569
 ] 

Jay Kreps commented on KAFKA-1683:
--

I had imagined the auth happening in the network layer, though I agree that may 
be somewhat mixing concerns and your approach sounds like it could potentially be
better. There are a few details to work out, though. In the Kerberos case this
is straight-forward since the request is processed in the API layer anyway. But 
how will this work for TLS? I was imagining that TLS would use a separate 
acceptor in the network layer that after accepting a connection and completing 
the handshake would populate the appropriate session fields. However if the 
sessions are maintained in the API layer then somehow we will have to work out 
how the result of the authentication gets to the api layer. Another issue is 
how we remove sessions when the connection closes. We would need to garbage 
collect these as connections were closed, and, perhaps more importantly, we 
need to ensure that it isn't possible to trick your way into that hash table. 
I.e. if the notion of equality for SelectorKey is comparing the underlying file 
descriptor, and file descriptors are reused periodically there could be 
problems.

The implementation I was imagining was something like this. Currently we attach the
current request to each socket in the socket server. Instead of this we would attach
some object that could hold the session, the current request, and any other 
future things we need. For TLS the user information would be populated after 
the handshake but before the acceptor hands the connection over to the 
processor thread. For Kerberos the session would be populated in 
KafkaApis.handleAuthenticate (or whatever we call it). The only advantage to 
this approach that I see is that the session lifetime is easily tied to the 
connection (which is what we want) and it will be easy to validate this.
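
A hedged sketch of that per-connection attachment; the names are illustrative, and
Session here refers to the kind of object sketched under KAFKA-1683 above:
{code}
class ConnectionAttachment {
  // filled in by the TLS acceptor after the handshake, or by KafkaApis.handleAuthenticate for Kerberos
  var session: Option[Session] = None
  // the request currently being read or processed on this connection
  var currentRequest: Option[RequestChannel.Request] = None
}
{code}
Because the attachment hangs off the connection itself, it goes away when the
connection is closed, which ties the session lifetime to the connection as described.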

I was thinking of the session concept as being a somewhat generic feature of 
the network layer to maintain per-connection state so in that sense it isn't 
too terrible to have it be part of the network/API layer contract, though I 
agree in general that the simpler we can keep the network stuff the better (it 
would be really easy for more and more business logic to creep in once we are 
doing auth stuff).

 Implement a session concept in the socket server
 --

 Key: KAFKA-1683
 URL: https://issues.apache.org/jira/browse/KAFKA-1683
 Project: Kafka
  Issue Type: Sub-task
  Components: security
Affects Versions: 0.9.0
Reporter: Jay Kreps
Assignee: Gwen Shapira

 To implement authentication we need a way to keep track of some things 
 between requests. The initial use for this would be remembering the 
 authenticated user/principal info, but likely more uses would come up (for 
 example we will also need to remember whether and which encryption or 
 integrity measures are in place on the socket so we can wrap and unwrap 
 writes and reads).
 I was thinking we could just add a Session object that might have a user 
 field. The session object would need to get added to RequestChannel.Request 
 so it is passed down to the API layer with each request.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1641) Log cleaner exits if last cleaned offset is lower than earliest offset

2014-10-23 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1641?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14181600#comment-14181600
 ] 

Guozhang Wang commented on KAFKA-1641:
--

[~jjkoshy] Could you take another look?

 Log cleaner exits if last cleaned offset is lower than earliest offset
 --

 Key: KAFKA-1641
 URL: https://issues.apache.org/jira/browse/KAFKA-1641
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8.1.1
Reporter: Joel Koshy
Assignee: Guozhang Wang
 Attachments: KAFKA-1641.patch, KAFKA-1641_2014-10-09_13:04:15.patch


 Encountered this recently: the log cleaner exited a while ago (I think 
 because the topic had compressed messages). That issue was subsequently 
 addressed by having the producer only send uncompressed. However, on a 
 subsequent restart of the broker we see this:
 In this scenario I think it is reasonable to just emit a warning and have the 
 cleaner round up its first dirty offset to the base offset of the first 
 segment.
 {code}
 [kafka-server] [] [kafka-log-cleaner-thread-0], Error due to 
 java.lang.IllegalArgumentException: requirement failed: Last clean offset is 
 54770438 but segment base offset is 382844024 for log testtopic-0.
 at scala.Predef$.require(Predef.scala:145)
 at kafka.log.Cleaner.buildOffsetMap(LogCleaner.scala:491)
 at kafka.log.Cleaner.clean(LogCleaner.scala:288)
 at 
 kafka.log.LogCleaner$CleanerThread.cleanOrSleep(LogCleaner.scala:202)
 at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:187)
 at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
 {code}
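
 A hedged sketch of the adjustment described above (warn and round the first dirty
 offset up to the earliest segment's base offset); the actual patch may differ:
 {code}
 val logStartOffset = log.logSegments.head.baseOffset
 val firstDirtyOffset =
   if (lastCleanOffset < logStartOffset) {
     warn("Resetting first dirty offset to log start offset %d since checkpointed offset %d is invalid."
       .format(logStartOffset, lastCleanOffset))
     logStartOffset
   } else lastCleanOffset
 {code}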



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Documentation with patches

2014-10-23 Thread Jay Kreps
That makes sense. I agree that moving to another doc system isn't a high
priority (it isn't as much work as it sounds because the HTML can all
remain as is, just the includes would get converted). But actually I don't
think that having a patch for docs and a patch for the code is too big a
hurdle either. I think maybe we should just start asking for documentation
patches and describe that in the contributing section--likely most people
just don't think of it.

-Jay

On Thu, Oct 23, 2014 at 7:16 AM, Joe Stein joe.st...@stealth.ly wrote:

 I like how we have things in SVN.  My issue is having two patches from
 contributors (one for tests + code and another for docs) that I am trying
 to solve.

 If we copy the entire SVN docs directory into git under /docs then
 contributions can patch the docs in their git patch. Committers can do 1
 commit.

 When we release we just cp -r docs/* /svn/ && svn add * && svn co
 release // or such.  The only trick is that we have to make sure for live
 website fixes that we commit in two places (but only then instead of every
 time).  I don't mind doing something more fancy and generate the docs from
 some markdown but I am not sure it is necessary... we have a lot to get
 done in the next few months with 0.9 and I don't want to add anything
 unnecessary to that effort.

 I do think though with all the changes coming we want code contributors to
 keep the docs up to date and have doc changes + code + test all in one git
 patch would be best for everyone (however we accomplish that) for reviewing
 and such.

 /***
  Joe Stein
  Founder, Principal Consultant
  Big Data Open Source Security LLC
  http://www.stealth.ly
  Twitter: @allthingshadoop http://www.twitter.com/allthingshadoop
 /

 On Wed, Oct 22, 2014 at 1:53 PM, Jay Kreps jay.kr...@gmail.com wrote:

  Currently we are handling the versioning problem by explicitly versioning
  docs that change over time (configuration, quickstart, design, etc). This
  is done by just creating a copy of these pages for each release in a
  subdirectory. So we can commit documentation changes at any time for the
  future release we just don't link up that release until it is out
  (theoretically you could get there by guessing the url, but that is
 okay).
  Although having multiple copies of certain pages, one for each release,
  seems odd, I think it is actually better because in practice we often end
  up editing old releases when we find problems in the older docs.
 
  -Jay
 
  On Wed, Oct 22, 2014 at 10:35 AM, Jarek Jarcec Cecho jar...@apache.org
  wrote:
 
   I would strongly support this idea. We have similar model in all other
   projects where I’m involved:
  
   The docs are part of the usual code base and we do require contributors
  to
   update them when they are adding a new feature. And then during release
   time we simply take snapshot of the docs and upload them to our public
   webpages. This enables us to have simple versioned docs on the website,
  so
   that users can easily find docs for their version and also the public
  site
   do not contain docs of unreleased features :) There is a lot of ways
 how
  to
   achieve that - in Sqoop 1 we used asciidoc to build the site, in Sqoop
   2/Flume we’re using sphinx, Oozie is using markdown wiki...
  
   Jarcec
  
On Oct 22, 2014, at 10:27 AM, Jay Kreps jay.kr...@gmail.com wrote:
   
Hey Joe,
   
I'd love to encourage documentation contributions.
   
I think we do have a way to contribute to docs. The current workflow
  for
contributing is
1. Checkout the docs
2. Change docs
3. Submit patch in normal way
4. Committer reviews and applies
   
For committers we have traditionally made the review step optional
 for
   docs.
   
In reality this skips step 1.5 which is fiddling with apache for an
  hour
   to
figure out how to get it to make apache includes work so you can see
  the
docs. I actually think this is the bigger barrier to doc changes.
   
One thing we could do is move the docs to one of the static site
   generators
to do the includes (e.g. Jekyll) this might make setup slightly
 easier
(although then you need to install Jekyll...).
   
-Jay
   
On Wed, Oct 22, 2014 at 9:55 AM, Joe Stein joe.st...@stealth.ly
  wrote:
   
This comes up a lot but in reality not enough.  We don't have a
 great
   way
for folks to modify the code and change (or add) to the
  documentation. I
think the documentation is awesome and as we grow the code
  contributors
that should continue with them too.
   
One thought I had that would work is that we copy the SVN files
 into a
/docs folder in git.  We can then take patches in git and then apply
   them
to SVN when appropriate (like during a release or for immediate
  fixes).
This way code changes in that patch can have documentation changes.
  The

Re: Documentation with patches

2014-10-23 Thread Gwen Shapira
With two patches, we'll need a two-phase commit to maintain
consistency, and we all know what a PITA this can be.
(Sorry, couldn't resist the pun. Carry on.)

On Thu, Oct 23, 2014 at 10:32 AM, Jay Kreps jay.kr...@gmail.com wrote:
 That makes sense. I agree that moving to another doc system isn't a high
 priority (it isn't as much work as it sounds because the HTML can all
 remain as is, just the includes would get converted). But actually I don't
 think that having a patch for docs and a patch for the code is too big a
 hurdle either. I think maybe we should just start asking for documentation
 patches and describe that in the contributing section--likely most people
 just don't think of it.

 -Jay

 On Thu, Oct 23, 2014 at 7:16 AM, Joe Stein joe.st...@stealth.ly wrote:

 I like how we have things in SVN.  My issue is having two patches from
 contributors (one for tests + code and another for docs) that I am trying
 to solve.

 If we copy the entire SVN docs directory into git under /docs then
 contributions can patch the docs in their git patch. Committers can do 1
 commit.

 When we release we just cp -r docs/* /svn/ && svn add * && svn co
 release // or such.  The only trick is that we have to make sure for live
 website fixes that we commit in two places (but only then instead of every
 time).  I don't mind doing something more fancy and generate the docs from
 some markdown but I am not sure it is necessary... we have a lot to get
 done in the next few months with 0.9 and I don't want to add anything
 unnecessary to that effort.

 I do think though with all the changes coming we want code contributors to
 keep the docs up to date and have doc changes + code + test all in one git
 patch would be best for everyone (however we accomplish that) for reviewing
 and such.

 /***
  Joe Stein
  Founder, Principal Consultant
  Big Data Open Source Security LLC
  http://www.stealth.ly
  Twitter: @allthingshadoop http://www.twitter.com/allthingshadoop
 /

 On Wed, Oct 22, 2014 at 1:53 PM, Jay Kreps jay.kr...@gmail.com wrote:

  Currently we are handling the versioning problem by explicitly versioning
  docs that change over time (configuration, quickstart, design, etc). This
  is done by just creating a copy of these pages for each release in a
  subdirectory. So we can commit documentation changes at any time for the
  future release we just don't link up that release until it is out
  (theoretically you could get there by guessing the url, but that is
 okay).
  Although having multiple copies of certain pages, one for each release,
  seems odd, I think it is actually better because in practice we often end
  up editing old releases when we find problems in the older docs.
 
  -Jay
 
  On Wed, Oct 22, 2014 at 10:35 AM, Jarek Jarcec Cecho jar...@apache.org
  wrote:
 
   I would strongly support this idea. We have similar model in all other
   projects where I’m involved:
  
   The docs are part of the usual code base and we do require contributors
  to
   update them when they are adding a new feature. And then during release
   time we simply take snapshot of the docs and upload them to our public
   webpages. This enables us to have simple versioned docs on the website,
  so
   that users can easily find docs for their version and also the public
  site
   do not contain docs of unreleased features :) There is a lot of ways
 how
  to
   achieve that - in Sqoop 1 we used asciidoc to build the site, in Sqoop
   2/Flume we’re using sphinx, Oozie is using markdown wiki...
  
   Jarcec
  
On Oct 22, 2014, at 10:27 AM, Jay Kreps jay.kr...@gmail.com wrote:
   
Hey Joe,
   
I'd love to encourage documentation contributions.
   
I think we do have a way to contribute to docs. The current workflow
  for
contributing is
1. Checkout the docs
2. Change docs
3. Submit patch in normal way
4. Committer reviews and applies
   
For committers we have traditionally made the review step optional
 for
   docs.
   
In reality this skips step 1.5 which is fiddling with apache for an
  hour
   to
figure out how to get it to make apache includes work so you can see
  the
docs. I actually think this is the bigger barrier to doc changes.
   
One thing we could do is move the docs to one of the static site
   generators
to do the includes (e.g. Jekyll) this might make setup slightly
 easier
(although then you need to install Jekyll...).
   
-Jay
   
On Wed, Oct 22, 2014 at 9:55 AM, Joe Stein joe.st...@stealth.ly
  wrote:
   
This comes up a lot but in reality not enough.  We don't have a
 great
   way
for folks to modify the code and change (or add) to the
  documentation. I
think the documentation is awesome and as we grow the code
  contributors
that should continue with them too.
   
One thought I had that would work is that we copy the SVN files
 into a
/docs 

[jira] [Commented] (KAFKA-1683) Implement a session concept in the socket server

2014-10-23 Thread Gwen Shapira (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1683?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14181696#comment-14181696
 ] 

Gwen Shapira commented on KAFKA-1683:
-

Very good points [~jkreps] and [~joestein].

The way I read this - we should not rely on Session to maintain security. This
is too high a risk and very much re-inventing the wheel. We should use
Session to maintain generic user information, common to SASL, TLS and
delegation tokens, that the API handlers know how to use. This will basically
serve as the basis for the authorization layer.

The way GSS + JAAS work (and I'm pretty sure we want to use these for Kerberos)
- the client and server do a little authentication dance when the secured
connection is first established, and the client is left with input and output
streams that are secured. Everything sent and received on that socket
afterwards will be encrypted with the server key and include the client session
token. Decrypting the message and validating the client session token doesn't
require communicating with Kerberos, so while there is some overhead, it
doesn't involve the network.

If we use this token to populate our Session object with every request, we can
be certain that the information is relevant to the current connection and is
unexpired (it will be the client's responsibility to renew the token with KRB
occasionally, as the protocol dictates).

This will be easier to do if we use a separate port for Kerberos connections,
the way we do for TLS; otherwise we'll need another way to figure out whether
to perform the little song-and-dance when accepting the connection. But we can
figure out this part later.

Meanwhile, the Session object is populated by SocketServer when creating the
request and sent down to the API layer. Since I like having the requestKey in the
API, I'll add the new Session object as an addition, not a replacement.
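
To make that concrete, here is a minimal sketch of what I mean; the class and field names are only illustrative assumptions, not the actual patch:

{code}
// Hypothetical sketch: a Session carrying the authenticated identity, attached to
// each request in addition to (not instead of) the existing requestKey.
case class Session(principal: String, clientAddress: java.net.InetAddress)

case class Request(requestKey: Any,
                   session: Session,            // populated by SocketServer per connection
                   buffer: java.nio.ByteBuffer, // request payload handed to the API layer
                   startTimeMs: Long)
{code}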

... and that was a very roundabout way to go back to the original request, but 
I had to make sure this still makes sense to me :)

 Implement a session concept in the socket server
 --

 Key: KAFKA-1683
 URL: https://issues.apache.org/jira/browse/KAFKA-1683
 Project: Kafka
  Issue Type: Sub-task
  Components: security
Affects Versions: 0.9.0
Reporter: Jay Kreps
Assignee: Gwen Shapira

 To implement authentication we need a way to keep track of some things 
 between requests. The initial use for this would be remembering the 
 authenticated user/principle info, but likely more uses would come up (for 
 example we will also need to remember whether and which encryption or 
 integrity measures are in place on the socket so we can wrap and unwrap 
 writes and reads).
 I was thinking we could just add a Session object that might have a user 
 field. The session object would need to get added to RequestChannel.Request 
 so it is passed down to the API layer with each request.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 26994: Patch for KAFKA-1719

2014-10-23 Thread Jiangjie Qin


 On Oct. 22, 2014, 9:32 p.m., Neha Narkhede wrote:
  core/src/main/scala/kafka/tools/MirrorMaker.scala, line 271
  https://reviews.apache.org/r/26994/diff/1/?file=727975#file727975line271
 
  Is there any value in setting this to true? It seems that just checking 
  if it is false and exiting the process suffices. Setting to true something 
  that is called cleanShutdown, when in fact, it isn't a clean shutdown is 
  confusing to read.
  
  Also good to add a FATAL log entry as suggested by Guozhang as well.
 
 Guozhang Wang wrote:
 The boolean is used when the internal threads try to exit; when it is
 not set, the thread knows it is closing abnormally and hence goes on to
 handle it. I agree its name is a bit misleading, and we can probably just
 name it isShuttingDown.

I'm thinking maybe we can use a separate flag in each thread to indicate 
whether it exits normally or not. So in the catch clause we set that flag to 
indicate the thread is exiting abnormally. That might be clearer.


- Jiangjie


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/26994/#review57907
---


On Oct. 22, 2014, 10:04 p.m., Jiangjie Qin wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/26994/
 ---
 
 (Updated Oct. 22, 2014, 10:04 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1719
 https://issues.apache.org/jira/browse/KAFKA-1719
 
 
 Repository: kafka
 
 
 Description
 ---
 
 Addressed Guozhang's comments.
 
 
 Diffs
 -
 
   core/src/main/scala/kafka/tools/MirrorMaker.scala 
 b8698ee1469c8fbc92ccc176d916eb3e28b87867 
 
 Diff: https://reviews.apache.org/r/26994/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Jiangjie Qin
 




[jira] [Commented] (KAFKA-1501) transient unit tests failures due to port already in use

2014-10-23 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1501?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14181725#comment-14181725
 ] 

Guozhang Wang commented on KAFKA-1501:
--

I can reproduce the issue from time to time on my PC, and after digging around
for a while I think this is the cause:

http://www.serverframework.com/asynchronousevents/2011/01/time-wait-and-its-design-implications-for-protocols-and-scalable-servers.html

I am attaching a simple patch here; [~copester], could you help double-check
whether it fixes your issue?
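
For illustration, a minimal sketch of the kind of change this implies, assuming the fix is to enable SO_REUSEADDR before binding in Acceptor.openServerSocket (the attached patch may differ in detail):

{code}
import java.net.InetSocketAddress
import java.nio.channels.ServerSocketChannel

// Hypothetical sketch: allow rebinding a port whose previous socket is still in TIME_WAIT.
def openServerSocket(host: String, port: Int): ServerSocketChannel = {
  val serverChannel = ServerSocketChannel.open()
  serverChannel.socket.setReuseAddress(true) // SO_REUSEADDR
  serverChannel.socket.bind(new InetSocketAddress(host, port))
  serverChannel
}
{code}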

 transient unit tests failures due to port already in use
 

 Key: KAFKA-1501
 URL: https://issues.apache.org/jira/browse/KAFKA-1501
 Project: Kafka
  Issue Type: Improvement
  Components: core
Reporter: Jun Rao
  Labels: newbie

 Saw the following transient failures.
 kafka.api.ProducerFailureHandlingTest  testTooLargeRecordWithAckOne FAILED
 kafka.common.KafkaException: Socket server failed to bind to 
 localhost:59909: Address already in use.
 at kafka.network.Acceptor.openServerSocket(SocketServer.scala:195)
 at kafka.network.Acceptor.init(SocketServer.scala:141)
 at kafka.network.SocketServer.startup(SocketServer.scala:68)
 at kafka.server.KafkaServer.startup(KafkaServer.scala:95)
 at kafka.utils.TestUtils$.createServer(TestUtils.scala:123)
 at 
 kafka.api.ProducerFailureHandlingTest.setUp(ProducerFailureHandlingTest.scala:68)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1721) Snappy compressor is not thread safe

2014-10-23 Thread Bhavesh Mistry (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1721?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14181730#comment-14181730
 ] 

Bhavesh Mistry commented on KAFKA-1721:
---

[~ewencp],

Thanks for fixing this issue. The Snappy devs have released a new version with the fix:
https://oss.sonatype.org/content/repositories/releases/org/xerial/snappy/snappy-java/1.1.1.4/

Thanks,
Bhavesh

 Snappy compressor is not thread safe
 

 Key: KAFKA-1721
 URL: https://issues.apache.org/jira/browse/KAFKA-1721
 Project: Kafka
  Issue Type: Bug
  Components: compression
Reporter: Ewen Cheslack-Postava
Assignee: Ewen Cheslack-Postava

 From the mailing list, it can generate this exception:
 2014-10-20 18:55:21.841 [kafka-producer-network-thread] ERROR
 org.apache.kafka.clients.producer.internals.Sender - Uncaught error in
 kafka producer I/O thread:
 *java.lang.NullPointerException*
 at
 org.xerial.snappy.BufferRecycler.releaseInputBuffer(BufferRecycler.java:153)
 at org.xerial.snappy.SnappyOutputStream.close(SnappyOutputStream.java:317)
 at java.io.FilterOutputStream.close(FilterOutputStream.java:160)
 at org.apache.kafka.common.record.Compressor.close(Compressor.java:94)
 at
 org.apache.kafka.common.record.MemoryRecords.close(MemoryRecords.java:119)
 at
 org.apache.kafka.clients.producer.internals.RecordAccumulator.drain(RecordAccumulator.java:285)
 at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:162)
 at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:115)
 at java.lang.Thread.run(Thread.java:744)
 This appears to be an issue with the snappy-java library using ThreadLocal 
 for an internal buffer recycling object which results in that object being 
 shared unsafely across threads if one thread sends to multiple producers:
 {quote}
 I think the issue is that you're
 using all your producers across a thread pool and the snappy library
 uses ThreadLocal BufferRecyclers. When new Snappy streams are allocated,
 they may be allocated from the same thread (e.g. one of your MyProducer
 classes calls Producer.send() on multiple producers from the same
 thread) and therefore use the same BufferRecycler. Eventually you hit
 the code in the stacktrace, and if two producer send threads hit it
 concurrently they improperly share the unsynchronized BufferRecycler.
 This seems like a pain to fix -- it's really a deficiency of the snappy
 library and as far as I can see there's no external control over
 BufferRecycler in their API. One possibility is to record the thread ID
 when we generate a new stream in Compressor and use that to synchronize
 access to ensure no concurrent BufferRecycler access. That could be made
 specific to snappy so it wouldn't impact other codecs. Not exactly
 ideal, but it would work. Unfortunately I can't think of any way for you
 to protect against this in your own code since the problem arises in the
 producer send thread, which your code should never know about.
 Another option would be to setup your producers differently to avoid the
 possibility of unsynchronized access from multiple threads (i.e. don't
 use the same thread pool approach), but whether you can do that will
 depend on your use case.
 {quote}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-1501) transient unit tests failures due to port already in use

2014-10-23 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1501?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-1501:
-
Attachment: KAFKA-1501.patch

 transient unit tests failures due to port already in use
 

 Key: KAFKA-1501
 URL: https://issues.apache.org/jira/browse/KAFKA-1501
 Project: Kafka
  Issue Type: Improvement
  Components: core
Reporter: Jun Rao
  Labels: newbie
 Attachments: KAFKA-1501.patch


 Saw the following transient failures.
 kafka.api.ProducerFailureHandlingTest  testTooLargeRecordWithAckOne FAILED
 kafka.common.KafkaException: Socket server failed to bind to 
 localhost:59909: Address already in use.
 at kafka.network.Acceptor.openServerSocket(SocketServer.scala:195)
 at kafka.network.Acceptor.init(SocketServer.scala:141)
 at kafka.network.SocketServer.startup(SocketServer.scala:68)
 at kafka.server.KafkaServer.startup(KafkaServer.scala:95)
 at kafka.utils.TestUtils$.createServer(TestUtils.scala:123)
 at 
 kafka.api.ProducerFailureHandlingTest.setUp(ProducerFailureHandlingTest.scala:68)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-1501) transient unit tests failures due to port already in use

2014-10-23 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1501?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-1501:
-
Assignee: Guozhang Wang
  Status: Patch Available  (was: Open)

 transient unit tests failures due to port already in use
 

 Key: KAFKA-1501
 URL: https://issues.apache.org/jira/browse/KAFKA-1501
 Project: Kafka
  Issue Type: Improvement
  Components: core
Reporter: Jun Rao
Assignee: Guozhang Wang
  Labels: newbie
 Attachments: KAFKA-1501.patch


 Saw the following transient failures.
 kafka.api.ProducerFailureHandlingTest  testTooLargeRecordWithAckOne FAILED
 kafka.common.KafkaException: Socket server failed to bind to 
 localhost:59909: Address already in use.
 at kafka.network.Acceptor.openServerSocket(SocketServer.scala:195)
 at kafka.network.Acceptor.init(SocketServer.scala:141)
 at kafka.network.SocketServer.startup(SocketServer.scala:68)
 at kafka.server.KafkaServer.startup(KafkaServer.scala:95)
 at kafka.utils.TestUtils$.createServer(TestUtils.scala:123)
 at 
 kafka.api.ProducerFailureHandlingTest.setUp(ProducerFailureHandlingTest.scala:68)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1501) transient unit tests failures due to port already in use

2014-10-23 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1501?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14181734#comment-14181734
 ] 

Guozhang Wang commented on KAFKA-1501:
--

Created reviewboard https://reviews.apache.org/r/27101/diff/
 against branch origin/trunk

 transient unit tests failures due to port already in use
 

 Key: KAFKA-1501
 URL: https://issues.apache.org/jira/browse/KAFKA-1501
 Project: Kafka
  Issue Type: Improvement
  Components: core
Reporter: Jun Rao
  Labels: newbie
 Attachments: KAFKA-1501.patch


 Saw the following transient failures.
 kafka.api.ProducerFailureHandlingTest  testTooLargeRecordWithAckOne FAILED
 kafka.common.KafkaException: Socket server failed to bind to 
 localhost:59909: Address already in use.
 at kafka.network.Acceptor.openServerSocket(SocketServer.scala:195)
 at kafka.network.Acceptor.init(SocketServer.scala:141)
 at kafka.network.SocketServer.startup(SocketServer.scala:68)
 at kafka.server.KafkaServer.startup(KafkaServer.scala:95)
 at kafka.utils.TestUtils$.createServer(TestUtils.scala:123)
 at 
 kafka.api.ProducerFailureHandlingTest.setUp(ProducerFailureHandlingTest.scala:68)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 26994: Patch for KAFKA-1719

2014-10-23 Thread Joel Koshy


 On Oct. 21, 2014, 10:21 p.m., Guozhang Wang wrote:
  core/src/main/scala/kafka/tools/MirrorMaker.scala, line 323
  https://reviews.apache.org/r/26994/diff/1/?file=727975#file727975line323
 
  Is this change intended?
 
 Jiangjie Qin wrote:
 Yes, it is intended, so that we can make sure each data channel queue 
 will receive a shutdown message. Otherwise 2 messages could go to the same 
 data channel queue.

Is this true? Each put will go to the next queue. i.e., 
Utils.abs(counter.getAndIncrement()) % numConsumers
So suppose there are 10 producers and you put 10 shutdown messages. Each of 
those output queues will get exactly one shutdown message.
I have a separate comment on the existing code wrt naming which I will put in 
the RB directly.
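
To spell out why (a rough, self-contained Scala illustration; the names here are mine, not the MirrorMaker code): with no concurrent puts, n consecutive shutdown messages land on n distinct queues.

    import java.util.concurrent.atomic.AtomicInteger
    val counter = new AtomicInteger(0)
    val numQueues = 10
    // each put picks the next queue round-robin
    val targets = (1 to numQueues).map(_ => math.abs(counter.getAndIncrement()) % numQueues)
    // targets == Vector(0, 1, 2, ..., 9): exactly one shutdown message per queue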


- Joel


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/26994/#review57680
---


On Oct. 22, 2014, 10:04 p.m., Jiangjie Qin wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/26994/
 ---
 
 (Updated Oct. 22, 2014, 10:04 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1719
 https://issues.apache.org/jira/browse/KAFKA-1719
 
 
 Repository: kafka
 
 
 Description
 ---
 
 Addressed Guozhang's comments.
 
 
 Diffs
 -
 
   core/src/main/scala/kafka/tools/MirrorMaker.scala 
 b8698ee1469c8fbc92ccc176d916eb3e28b87867 
 
 Diff: https://reviews.apache.org/r/26994/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Jiangjie Qin
 




Re: Review Request 26994: Patch for KAFKA-1719

2014-10-23 Thread Joel Koshy

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/26994/#review58049
---



core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/26994/#comment98952

Pre-existing naming issue that would be good to fix in this.

At first glance, this is extremely confusing and looks wrong.

DataChannel is defined as DataChannel(capacity, numProducers, numConsumers)

It is instantiated on line 127 as
new DataChannel(capacity, numConsumers, numProducers)

i.e., it seems as though the arguments are switched.

The consumers/producers parameters of the DataChannel are to be interpreted 
as inputs/outputs and have nothing to do with numConsumers/numProducers in the 
mirror maker.

So, can you rename these fields in the DataChannel?

i.e., numConsumers -> numOutputs and numProducers -> numInputs?
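
Roughly something like the following (just a sketch of the shape; the exact names are up to you):

    // channel parameters describe the channel itself, not the mirror maker's thread counts
    class DataChannel(capacity: Int, numInputs: Int, numOutputs: Int)

    // the call site then reads naturally: consumer threads feed the channel,
    // producer threads drain it, e.g.
    // new DataChannel(capacity, numInputs = numConsumers, numOutputs = numProducers)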


- Joel Koshy


On Oct. 22, 2014, 10:04 p.m., Jiangjie Qin wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/26994/
 ---
 
 (Updated Oct. 22, 2014, 10:04 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1719
 https://issues.apache.org/jira/browse/KAFKA-1719
 
 
 Repository: kafka
 
 
 Description
 ---
 
 Addressed Guozhang's comments.
 
 
 Diffs
 -
 
   core/src/main/scala/kafka/tools/MirrorMaker.scala 
 b8698ee1469c8fbc92ccc176d916eb3e28b87867 
 
 Diff: https://reviews.apache.org/r/26994/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Jiangjie Qin
 




Review Request 27101: Fix KAFKA-1501 by enabling reuse address

2014-10-23 Thread Guozhang Wang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/27101/
---

Review request for kafka.


Bugs: KAFKA-1501
https://issues.apache.org/jira/browse/KAFKA-1501


Repository: kafka


Description
---

needs repeated unit tests to verify


Diffs
-

  core/src/main/scala/kafka/network/SocketServer.scala 
cee76b323e5f3e4c783749ac9e78e1ef02897e3b 

Diff: https://reviews.apache.org/r/27101/diff/


Testing
---


Thanks,

Guozhang Wang



Re: Documentation with patches

2014-10-23 Thread Sriram Subramanian
I actually think it is a pretty good idea to have the docs in git.
Summarizing the benefits -

1. If a contributor/committer makes any significant changes to how a piece of
functionality works, they could always update the docs in parallel, and
reviewers can enforce this if they find the change warrants a documentation
change. This will help us create a process around updating documentation,
which is hard today.
2. Creating a new version can be done when we cut a new branch. This seems
a lot easier than remembering to update the documentation for a new
version as an afterthought.
3. Easy to review the docs with the code change in mind. We forget the
exact changes over time and reviewers would need to review the code again
to do a good review of the docs.

+1 from me.

On 10/23/14 10:37 AM, Gwen Shapira gshap...@cloudera.com wrote:

With two patches, we'll need a two-phase commit to maintain
consistency, and we all know what a PITA this can be.
(Sorry, couldn't resist the pun. Carry on.)

On Thu, Oct 23, 2014 at 10:32 AM, Jay Kreps jay.kr...@gmail.com wrote:
 That makes sense. I agree that moving to another doc system isn't a high
 priority (it isn't as much work as it sounds because the HTML can all
 remain as is, just the includes would get converted). But actually I
don't
 think that having a patch for docs and a patch for the code is too big a
 hurdle either. I think maybe we should just start asking for
documentation
 patches and describe that in the contributing section--likely most
people
 just don't think of it.

 -Jay

 On Thu, Oct 23, 2014 at 7:16 AM, Joe Stein joe.st...@stealth.ly wrote:

 I like how we have things in SVN.  My issue is having two patches from
 contributors (one for tests + code and another for docs) that I am
trying
 to solve.

 If we copy the entire SVN docs directory into git under /docs then
 contributions can patch the docs in their git patch. Committers can do
1
 commit.

 When we release we just cp -r docs/* /svn/ && svn add * && svn co
 release //or such.  The only trick is that we have to make sure for
live
 website fixes that we commit in two places (but only then instead of
every
 time).  I don't mind doing something more fancy and generate the docs
from
 some markdown but I am not sure it is necessary... we have a lot to get
 done in the next few months with 0.9 and I don't want to add anything
 unnecessary to that effort.

 I do think though with all the changes coming we want code
contributors to
 keep the docs up to date and have doc changes + code + test all in one
git
 patch would be best for everyone (however we accomplish that) for
reviewing
 and such.

 /***
  Joe Stein
  Founder, Principal Consultant
  Big Data Open Source Security LLC
  http://www.stealth.ly
  Twitter: @allthingshadoop http://www.twitter.com/allthingshadoop
 /

 On Wed, Oct 22, 2014 at 1:53 PM, Jay Kreps jay.kr...@gmail.com wrote:

  Currently we are handling the versioning problem by explicitly
versioning
  docs that change over time (configuration, quickstart, design, etc).
This
  is done by just creating a copy of these pages for each release in a
  subdirectory. So we can commit documentation changes at any time for
the
  future release we just don't link up that release until it is out
  (theoretically you could get there by guessing the url, but that is
 okay).
  Although having multiple copies of certain pages, one for each
release,
  seems odd, I think it is actually better because in practice we
often end
  up editing old releases when we find problems in the older docs.
 
  -Jay
 
  On Wed, Oct 22, 2014 at 10:35 AM, Jarek Jarcec Cecho
jar...@apache.org
  wrote:
 
   I would strongly support this idea. We have similar model in all
other
   projects where I'm involved:
  
   The docs are part of the usual code base and we do require
contributors
  to
   update them when they are adding a new feature. And then during
release
   time we simply take snapshot of the docs and upload them to our
public
   webpages. This enables us to have simple versioned docs on the
website,
  so
   that users can easily find docs for their version and also the
public
  site
   do not contain docs of unreleased features :) There is a lot of
ways
 how
  to
   achieve that - in Sqoop 1 we used asciidoc to build the site, in
Sqoop
   2/Flume we're using sphinx, Oozie is using markdown wiki...
  
   Jarcec
  
On Oct 22, 2014, at 10:27 AM, Jay Kreps jay.kr...@gmail.com
wrote:
   
Hey Joe,
   
I'd love to encourage documentation contributions.
   
I think we do have a way to contribute to docs. The current
workflow
  for
contributing is
1. Checkout the docs
2. Change docs
3. Submit patch in normal way
4. Committer reviews and applies
   
For committers we have traditionally made the review step
optional
 for
   docs.
   
In reality this skips step 1.5 which is fiddling with apache for
an
  hour
   

Re: Review Request 26994: Patch for KAFKA-1719

2014-10-23 Thread Guozhang Wang


 On Oct. 22, 2014, 9:32 p.m., Neha Narkhede wrote:
  core/src/main/scala/kafka/tools/MirrorMaker.scala, line 271
  https://reviews.apache.org/r/26994/diff/1/?file=727975#file727975line271
 
  Is there any value in setting this to true? It seems that just checking 
  if it is false and exiting the process suffices. Setting to true something 
  that is called cleanShutdown, when in fact, it isn't a clean shutdown is 
  confusing to read.
  
  Also good to add a FATAL log entry as suggested by Guozhang as well.
 
 Guozhang Wang wrote:
 The boolean is used when the internal threads try to exit; when it is
 not set, the thread knows it is closing abnormally and hence goes on to
 handle it. I agree its name is a bit misleading, and we can probably just
 name it isShuttingDown.
 
 Jiangjie Qin wrote:
 I'm thinking maybe we can use a separate flag in each thread to indicate 
 whether it exits normally or not. So in the catch clause we set that flag to 
 indicate the thread is exiting abnormally. That might be clearer.

I personally think the flag-per-thread is overkill here: when each thread is
about to exit (i.e. in the finally block), all it needs to check is whether the
whole MM is currently shutting down or not; if it is, then the thread itself knows
it exited normally, otherwise log the FATAL and force-kill the MM.
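
Something along these lines (just a sketch of the shape I mean, with assumed names, not actual MirrorMaker.scala code):

    import java.util.concurrent.atomic.AtomicBoolean
    val isShuttingDown = new AtomicBoolean(false) // set to true only on a clean shutdown request

    def runLoop(body: => Unit): Unit =
      try body
      finally {
        if (!isShuttingDown.get()) {
          System.err.println("FATAL: thread exited abnormally, forcing mirror maker shutdown")
          Runtime.getRuntime.halt(-1)
        }
      }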


- Guozhang


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/26994/#review57907
---


On Oct. 22, 2014, 10:04 p.m., Jiangjie Qin wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/26994/
 ---
 
 (Updated Oct. 22, 2014, 10:04 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1719
 https://issues.apache.org/jira/browse/KAFKA-1719
 
 
 Repository: kafka
 
 
 Description
 ---
 
 Addressed Guozhang's comments.
 
 
 Diffs
 -
 
   core/src/main/scala/kafka/tools/MirrorMaker.scala 
 b8698ee1469c8fbc92ccc176d916eb3e28b87867 
 
 Diff: https://reviews.apache.org/r/26994/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Jiangjie Qin
 




Re: Review Request 26994: Patch for KAFKA-1719

2014-10-23 Thread Jiangjie Qin


 On Oct. 21, 2014, 10:21 p.m., Guozhang Wang wrote:
  core/src/main/scala/kafka/tools/MirrorMaker.scala, line 323
  https://reviews.apache.org/r/26994/diff/1/?file=727975#file727975line323
 
  Is this change intended?
 
 Jiangjie Qin wrote:
 Yes, it is intended, so that we can make sure each data channel queue 
 will receive a shutdown message. Otherwise 2 messages could go to the same 
 data channel queue.
 
 Joel Koshy wrote:
 Is this true? Each put will go to the next queue. i.e., 
 Utils.abs(counter.getAndIncrement()) % numConsumers
 So suppose there are 10 producers and you put 10 shutdown messages. Each 
 of those output queues will get exactly one shutdown message.
 I have a separate comment on the existing code wrt naming which I will 
 put in the RB directly.

Hi Joel, yes, I think you are right, since there will be no other puts during
the shutting-down phase, though it is kind of weird that calling shutdown for
one thread ends up shutting down the other one...


- Jiangjie


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/26994/#review57680
---


On Oct. 22, 2014, 10:04 p.m., Jiangjie Qin wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/26994/
 ---
 
 (Updated Oct. 22, 2014, 10:04 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1719
 https://issues.apache.org/jira/browse/KAFKA-1719
 
 
 Repository: kafka
 
 
 Description
 ---
 
 Addressed Guozhang's comments.
 
 
 Diffs
 -
 
   core/src/main/scala/kafka/tools/MirrorMaker.scala 
 b8698ee1469c8fbc92ccc176d916eb3e28b87867 
 
 Diff: https://reviews.apache.org/r/26994/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Jiangjie Qin
 




Re: Documentation with patches

2014-10-23 Thread Gwen Shapira
+1

On Thu, Oct 23, 2014 at 11:45 AM, Sriram Subramanian
srsubraman...@linkedin.com.invalid wrote:
 I actually think it is a pretty good idea to have the docs in git.
 Summarizing the benefits -

 1. If a contributor/committer makes any significant changes to how a piece of
 functionality works, they could always update the docs in parallel, and
 reviewers can enforce this if they find the change warrants a documentation
 change. This will help us create a process around updating documentation,
 which is hard today.
 2. Creating a new version can be done when we cut a new branch. This seems
 a lot easier than remembering to update the documentation for a new
 version as an afterthought.
 3. Easy to review the docs with the code change in mind. We forget the
 exact changes over time and reviewers would need to review the code again
 to do a good review of the docs.

 +1 from me.

 On 10/23/14 10:37 AM, Gwen Shapira gshap...@cloudera.com wrote:

With two patches, we'll need a two-phase commit to maintain
consistency, and we all know what a PITA this can be.
(Sorry, couldn't resist the pun. Carry on.)

On Thu, Oct 23, 2014 at 10:32 AM, Jay Kreps jay.kr...@gmail.com wrote:
 That makes sense. I agree that moving to another doc system isn't a high
 priority (it isn't as much work as it sounds because the HTML can all
 remain as is, just the includes would get converted). But actually I
don't
 think that having a patch for docs and a patch for the code is too big a
 hurdle either. I think maybe we should just start asking for
documentation
 patches and describe that in the contributing section--likely most
people
 just don't think of it.

 -Jay

 On Thu, Oct 23, 2014 at 7:16 AM, Joe Stein joe.st...@stealth.ly wrote:

 I like how we have things in SVN.  My issue is having two patches from
 contributors (one for tests + code and another for docs) that I am
trying
 to solve.

 If we copy the entire SVN docs directory into git under /docs then
 contributions can patch the docs in their git patch. Committers can do
1
 commit.

 When we release we just cp -r docs/* /svn/ && svn add * && svn co
 release //or such.  The only trick is that we have to make sure for
live
 website fixes that we commit in two places (but only then instead of
every
 time).  I don't mind doing something more fancy and generate the docs
from
 some markdown but I am not sure it is necessary... we have a lot to get
 done in the next few months with 0.9 and I don't want to add anything
 unnecessary to that effort.

 I do think though with all the changes coming we want code
contributors to
 keep the docs up to date and have doc changes + code + test all in one
git
 patch would be best for everyone (however we accomplish that) for
reviewing
 and such.

 /***
  Joe Stein
  Founder, Principal Consultant
  Big Data Open Source Security LLC
  http://www.stealth.ly
  Twitter: @allthingshadoop http://www.twitter.com/allthingshadoop
 /

 On Wed, Oct 22, 2014 at 1:53 PM, Jay Kreps jay.kr...@gmail.com wrote:

  Currently we are handling the versioning problem by explicitly
versioning
  docs that change over time (configuration, quickstart, design, etc).
This
  is done by just creating a copy of these pages for each release in a
  subdirectory. So we can commit documentation changes at any time for
the
  future release we just don't link up that release until it is out
  (theoretically you could get there by guessing the url, but that is
 okay).
  Although having multiple copies of certain pages, one for each
release,
  seems odd, I think it is actually better because in practice we
often end
  up editing old releases when we find problems in the older docs.
 
  -Jay
 
  On Wed, Oct 22, 2014 at 10:35 AM, Jarek Jarcec Cecho
jar...@apache.org
  wrote:
 
   I would strongly support this idea. We have similar model in all
other
   projects where I'm involved:
  
   The docs are part of the usual code base and we do require
contributors
  to
   update them when they are adding a new feature. And then during
release
   time we simply take snapshot of the docs and upload them to our
public
   webpages. This enables us to have simple versioned docs on the
website,
  so
   that users can easily find docs for their version and also the
public
  site
   do not contain docs of unreleased features :) There is a lot of
ways
 how
  to
   achieve that - in Sqoop 1 we used asciidoc to build the site, in
Sqoop
   2/Flume we're using sphinx, Oozie is using markdown wiki...
  
   Jarcec
  
On Oct 22, 2014, at 10:27 AM, Jay Kreps jay.kr...@gmail.com
wrote:
   
Hey Joe,
   
I'd love to encourage documentation contributions.
   
I think we do have a way to contribute to docs. The current
workflow
  for
contributing is
1. Checkout the docs
2. Change docs
3. Submit patch in normal way
4. Committer reviews and applies
   
For committers we have traditionally made the review 

[jira] [Commented] (KAFKA-1501) transient unit tests failures due to port already in use

2014-10-23 Thread Jay Kreps (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1501?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14181795#comment-14181795
 ] 

Jay Kreps commented on KAFKA-1501:
--

Nice!

 transient unit tests failures due to port already in use
 

 Key: KAFKA-1501
 URL: https://issues.apache.org/jira/browse/KAFKA-1501
 Project: Kafka
  Issue Type: Improvement
  Components: core
Reporter: Jun Rao
Assignee: Guozhang Wang
  Labels: newbie
 Attachments: KAFKA-1501.patch


 Saw the following transient failures.
 kafka.api.ProducerFailureHandlingTest  testTooLargeRecordWithAckOne FAILED
 kafka.common.KafkaException: Socket server failed to bind to 
 localhost:59909: Address already in use.
 at kafka.network.Acceptor.openServerSocket(SocketServer.scala:195)
 at kafka.network.Acceptor.init(SocketServer.scala:141)
 at kafka.network.SocketServer.startup(SocketServer.scala:68)
 at kafka.server.KafkaServer.startup(KafkaServer.scala:95)
 at kafka.utils.TestUtils$.createServer(TestUtils.scala:123)
 at 
 kafka.api.ProducerFailureHandlingTest.setUp(ProducerFailureHandlingTest.scala:68)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1501) transient unit tests failures due to port already in use

2014-10-23 Thread Chris Cope (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1501?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14181928#comment-14181928
 ] 

Chris Cope commented on KAFKA-1501:
---

Awesome! I should know shortly,  spinning off 100 jobs via phone...

 transient unit tests failures due to port already in use
 

 Key: KAFKA-1501
 URL: https://issues.apache.org/jira/browse/KAFKA-1501
 Project: Kafka
  Issue Type: Improvement
  Components: core
Reporter: Jun Rao
Assignee: Guozhang Wang
  Labels: newbie
 Attachments: KAFKA-1501.patch


 Saw the following transient failures.
 kafka.api.ProducerFailureHandlingTest  testTooLargeRecordWithAckOne FAILED
 kafka.common.KafkaException: Socket server failed to bind to 
 localhost:59909: Address already in use.
 at kafka.network.Acceptor.openServerSocket(SocketServer.scala:195)
 at kafka.network.Acceptor.init(SocketServer.scala:141)
 at kafka.network.SocketServer.startup(SocketServer.scala:68)
 at kafka.server.KafkaServer.startup(KafkaServer.scala:95)
 at kafka.utils.TestUtils$.createServer(TestUtils.scala:123)
 at 
 kafka.api.ProducerFailureHandlingTest.setUp(ProducerFailureHandlingTest.scala:68)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1684) Implement TLS/SSL authentication

2014-10-23 Thread Gwen Shapira (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1684?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14181947#comment-14181947
 ] 

Gwen Shapira commented on KAFKA-1684:
-

It's a large patch, so I didn't do a full review yet. But I wanted to mention
that I love the ChannelFactory implementation. Very nice, clean and reusable.

IMO, this should be a dependency for KAFKA-1686, so we can reuse a lot of the
work done here.
I think it will be cleaner and easier to simply reuse this code and have
SASL/GSSAPI work as just another channel than trying to cram SASL & Kerberos
onto the same SocketServer as the existing connections (I've seen this done in
HBase, and there is a lot of (isSecure, isWrap, etc.) involved).

I'm using this implementation as a reference for how the Session object can
integrate with a secured nio channel (see KAFKA-1683).

It looks like SSLSocketChannel has a local SSLEngine that I can access to get
the client identity. For consistency, we may want all channels (SASL, TLS,
Kafka) to expose a getIdentity method that SocketServer can use to populate the
Session object.
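
Something like the following is what I have in mind (just a sketch; the trait and method names are my assumptions, not anything in the patch):

{code}
// Hypothetical sketch: every channel type reports the identity it authenticated,
// so SocketServer can populate Session the same way for TLS, SASL and plaintext.
trait AuthenticatedChannel {
  def getIdentity: Option[String] // e.g. TLS peer principal, SASL authorization id; None for plaintext
}
{code}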


 Implement TLS/SSL authentication
 

 Key: KAFKA-1684
 URL: https://issues.apache.org/jira/browse/KAFKA-1684
 Project: Kafka
  Issue Type: Sub-task
  Components: security
Affects Versions: 0.9.0
Reporter: Jay Kreps
Assignee: Ivan Lyutov
 Attachments: KAFKA-1684.patch


 Add an SSL port to the configuration and advertise this as part of the 
 metadata request.
 If the SSL port is configured the socket server will need to add a second 
 Acceptor thread to listen on it. Connections accepted on this port will need 
 to go through the SSL handshake prior to being registered with a Processor 
 for request processing.
 SSL requests and responses may need to be wrapped or unwrapped using the 
 SSLEngine that was initialized by the acceptor. This wrapping and unwrapping 
 is very similar to what will need to be done for SASL-based authentication 
 schemes. We should have a uniform interface that covers both of these and we 
 will need to store the instance in the session with the request. The socket 
 server will have to use this object when reading and writing requests. We 
 will need to take care with the FetchRequests as the current 
 FileChannel.transferTo mechanism will be incompatible with wrap/unwrap so we 
 can only use this optimization for unencrypted sockets that don't require 
 userspace translation (wrapping).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1688) Add authorization interface and naive implementation

2014-10-23 Thread Gwen Shapira (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1688?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14181997#comment-14181997
 ] 

Gwen Shapira commented on KAFKA-1688:
-

The Wiki specifies:
PermissionManager.isPermitted(Subject subject, InetAddress ip, Permissions 
permission, String resource)

TLS doesn't seem to have a Subject, but both TLS and SASL have a Principal. I
assume we can go with Principal instead.
(I ran into this while trying to figure out what goes into Session. We can always
change it later, but I thought I'd make a note.)
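
For illustration, the interface with Principal substituted (a sketch following the wiki signature above; the Permission type here is just a stand-in):

{code}
import java.net.InetAddress
import java.security.Principal

sealed trait Permission
case object Read extends Permission
case object Write extends Permission

trait PermissionManager {
  // same shape as the wiki signature, with Principal in place of Subject
  def isPermitted(principal: Principal, ip: InetAddress, permission: Permission, resource: String): Boolean
}
{code}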

 Add authorization interface and naive implementation
 

 Key: KAFKA-1688
 URL: https://issues.apache.org/jira/browse/KAFKA-1688
 Project: Kafka
  Issue Type: Sub-task
  Components: security
Reporter: Jay Kreps
Assignee: Sriharsha Chintalapani

 Add a PermissionManager interface as described here:
 https://cwiki.apache.org/confluence/display/KAFKA/Security
 (possibly there is a better name?)
 Implement calls to the PermissionsManager in KafkaApis for the main requests 
 (FetchRequest, ProduceRequest, etc). We will need to add a new error code and 
 exception to the protocol to indicate permission denied.
 Add a server configuration to give the class you want to instantiate that 
 implements that interface. That class can define its own configuration 
 properties from the main config file.
 Provide a simple implementation of this interface which just takes a user and 
 ip whitelist and permits those in either of the whitelists to do anything, 
 and denies all others.
 Rather than writing an integration test for this class we can probably just 
 use this class for the TLS and SASL authentication testing.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 26373: Patch for KAFKA-1647

2014-10-23 Thread Jiangjie Qin


 On Oct. 23, 2014, 12:42 a.m., Jun Rao wrote:
  core/src/main/scala/kafka/server/ReplicaManager.scala, lines 531-538
  https://reviews.apache.org/r/26373/diff/4/?file=728653#file728653line531
 
  I am not sure that I understand how Option.flatten works. Would it be
  clearer if we first filter out partitions with no live leader and then
  generate the TopicAndPartition to BrokerAndInitialOffset map?

This change actually should be reverted, as Joel suggested, because we no longer
add the partition without a leader to partitionsToMakeFollower; instead, we only
create a local replica for it. This section should be exactly the same as the
original code.
I put flatten here just to remove the None option from a set...
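
For reference, this is what flatten does on a collection of Options (standard Scala library behavior):

    Set(Some(1), None, Some(2)).flatten  // == Set(1, 2); the None is dropped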


- Jiangjie


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/26373/#review57947
---


On Oct. 22, 2014, 6:08 a.m., Jiangjie Qin wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/26373/
 ---
 
 (Updated Oct. 22, 2014, 6:08 a.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1647
 https://issues.apache.org/jira/browse/KAFKA-1647
 
 
 Repository: kafka
 
 
 Description
 ---
 
 Addressed Joel's comments.
 
 
 the version 2 code seems to be submitted by mistake... This should be the 
 code for review that addressed Joel's comments.
 
 
 Addressed Jun's comments. Will do tests to verify if it works.
 
 
 Diffs
 -
 
   core/src/main/scala/kafka/server/ReplicaManager.scala 
 78b7514cc109547c562e635824684fad581af653 
 
 Diff: https://reviews.apache.org/r/26373/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Jiangjie Qin
 




Re: Review Request 26885: Patch for KAFKA-1642

2014-10-23 Thread Jun Rao

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/26885/#review57513
---


Thanks for the patch. Looks good to me. Some minor comments below.


clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
https://reviews.apache.org/r/26885/#comment98242

It's probably better to put them in two lines.



clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
https://reviews.apache.org/r/26885/#comment98234

It seems that expired can just be timeLeftMs <= 0?



clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
https://reviews.apache.org/r/26885/#comment98236

Are these comments redundant now given the new comments above?



clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java
https://reviews.apache.org/r/26885/#comment98388

It seems that in this case, the nextReadyCheckDelayMs should be the 
remaining linger time for tp1, which is lingerMs/2. Should we just assert that?


- Jun Rao


On Oct. 21, 2014, 12:34 a.m., Ewen Cheslack-Postava wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/26885/
 ---
 
 (Updated Oct. 21, 2014, 12:34 a.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1642
 https://issues.apache.org/jira/browse/KAFKA-1642
 
 
 Repository: kafka
 
 
 Description
 ---
 
 Fixes two issues with the computation of ready nodes and poll timeouts in
 Sender/RecordAccumulator:
 
 1. The timeout was computed incorrectly because it took into account all 
 nodes,
 even if they had data to send such that their timeout would be 0. However, 
 nodes
 were then filtered based on whether it was possible to send (i.e. their
 connection was still good) which could result in nothing to send and a 0
 timeout, resulting in busy looping. Instead, the timeout needs to be computed
 only using data that cannot be immediately sent, i.e. where the timeout will 
 be
 greater than 0. This timeout is only used if, after filtering by whether
 connections are ready for sending, there is no data to be sent. Other events 
 can
 wake the thread up earlier, e.g. a client reconnects and becomes ready again.
 
 2. One of the conditions indicating whether data is sendable is whether a
 timeout has expired -- either the linger time or the retry backoff. This
 condition wasn't accounting for both cases properly, always using the linger
 time. This means the retry backoff was probably not being respected.
 
 KAFKA-1642 Compute correct poll timeout when all nodes have sendable data but 
 none can send data because they are in a connection backoff period.
 
 
 Diffs
 -
 
   clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java 
 d304660f29246e9600efe3ddb28cfcc2b074bed3 
   clients/src/main/java/org/apache/kafka/clients/KafkaClient.java 
 29658d4a15f112dc0af5ce517eaab93e6f00134b 
   clients/src/main/java/org/apache/kafka/clients/NetworkClient.java 
 eea270abb16f40c9f3b47c4ea96be412fb4fdc8b 
   
 clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
  c5d470011d334318d5ee801021aadd0c000974a6 
   
 clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 
 8ebe7ed82c9384b71ce0cc3ddbef2c2325363ab9 
   clients/src/test/java/org/apache/kafka/clients/MockClient.java 
 aae8d4a1e98279470587d397cc779a9baf6fee6c 
   
 clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java
  0762b35abba0551f23047348c5893bb8c9acff14 
 
 Diff: https://reviews.apache.org/r/26885/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Ewen Cheslack-Postava
 




[jira] [Commented] (KAFKA-1683) Implement a session concept in the socket server

2014-10-23 Thread Jay Kreps (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1683?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14182024#comment-14182024
 ] 

Jay Kreps commented on KAFKA-1683:
--

Here is what I was imagining - not sure if this is what you are proposing or
not. It might be worthwhile to chat quickly to hash it out.

1. We add Session to the request passed from the socket server to the api 
layer. Session.authenticatedUser is the user that we will use in the 
authorization layer to check permissions. Session.securityCodec is the codec 
used to wrap and unwrap messages sent by the socket layer. If the securityCodec 
is null, no wrapping or unwrapping is done. We will need an SSL and SASL 
implementation of the security codec. The SSL version will wrap the SSLEngine 
instance and the SASL version will wrap the SaslServer instance, both of which 
provide a wrap and unwrap version.

2. Currently in the socketserver the only state we maintain per connection is 
the current request we are reading. This is attached to the channel. We would 
replace this with some variable which could hold both the in-progress request 
and the session.

3. We would add a new TlsAcceptor that would listen on the TLS port and handle 
the handshake for new TLS connections. When the handshake is complete the 
TlsAcceptor would grab the user information and populate that field in the 
session along with the securityCodec.

4. A similar thing would happen in KafkaApis.handleAuthenticate for Kerberos. 
This api would take the next step in the SASL challenge/response cycle and if 
it is complete it would populate the authenticatedUser and securityCodec fields 
in the session.

5. All authorization checks would be done using Session.authenticatedUser.

6. When the socket server is writing data it will always first run it through 
securityCodec.wrap() prior to sending it to make sure it is encrypted. When it 
is reading data it will read a complete size delimited message and then do 
securityCodec.unwrap() prior to handing that buffer off to the api layer. If 
securityCodec is null it will skip this step.

A couple of assumptions here that I'm not sure were explicit:
a. Gwen, as you point out, the notion of user/subject/principal isn't totally
clear. Can this just be a string we extract rather than a java
Principal/Subject?
b. Is the protocol for encrypted or otherwise integrity-checked packets a 4
byte plaintext size followed by an N byte packet? Or is the size also
encrypted? If the latter, the unwrapping will have to happen in tandem with the
reading, which is slightly more complex but perhaps cleaner.
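
To make (1) and (6) concrete, a minimal sketch of the codec abstraction described above (the names are placeholders; the SSL and SASL variants would delegate to SSLEngine.wrap/unwrap and SaslServer.wrap/unwrap respectively):

{code}
import java.nio.ByteBuffer

// Hypothetical sketch of the per-connection security state carried in the session.
trait SecurityCodec {
  def wrap(plain: ByteBuffer): ByteBuffer  // applied by the socket server before every write
  def unwrap(wire: ByteBuffer): ByteBuffer // applied after a complete size-delimited frame is read
}

case class Session(authenticatedUser: String, securityCodec: Option[SecurityCodec])
{code}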

 Implement a session concept in the socket server
 --

 Key: KAFKA-1683
 URL: https://issues.apache.org/jira/browse/KAFKA-1683
 Project: Kafka
  Issue Type: Sub-task
  Components: security
Affects Versions: 0.9.0
Reporter: Jay Kreps
Assignee: Gwen Shapira

 To implement authentication we need a way to keep track of some things 
 between requests. The initial use for this would be remembering the 
 authenticated user/principle info, but likely more uses would come up (for 
 example we will also need to remember whether and which encryption or 
 integrity measures are in place on the socket so we can wrap and unwrap 
 writes and reads).
 I was thinking we could just add a Session object that might have a user 
 field. The session object would need to get added to RequestChannel.Request 
 so it is passed down to the API layer with each request.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [VOTE] 0.8.2-beta Release Candidate 1

2014-10-23 Thread Ewen Cheslack-Postava
Yeah, the version on Precise is really old, 1.0, and on Trusty is 1.4.
Using the PPA version worked fine for me on Trusty.

-Ewen

On Thu, Oct 23, 2014, at 10:02 AM, Joe Stein wrote:
 hmmm.
 
 I just launched a bootstrap clean room (per release docs). Installed
 latest
 gradle
 
 sudo add-apt-repository ppa:cwchien/gradle
 sudo apt-get update
 sudo apt-get install gradle
 
 wget
 https://people.apache.org/~joestein/kafka-0.8.2-beta-candidate1/kafka-0.8.2-beta-src.tgz
 tar -xvf kafka-0.8.2-beta-src.tgz
 cd kafka-0.8.2-beta-src/
 gradle
 
 ...
 
 Building project 'core' with Scala version 2.10.1
 :downloadWrapper
 
 BUILD SUCCESSFUL
 
 Total time: 22.066 secs
 
 so it works, I use the source upload to build the binaries. odd.
 
 *
 
 Maybe the gradle version you have is too old or something? I am using
 
 vagrant@precise64:~/kafka-0.8.2-beta-src$ gradle -version
 
 
 Gradle 2.1
 
 
 Build time:   2014-09-08 10:40:39 UTC
 Build number: none
 Revision: e6cf70745ac11fa943e19294d19a2c527a669a53
 
 Groovy:   2.3.6
 Ant:  Apache Ant(TM) version 1.9.3 compiled on December 23 2013
 JVM:  1.6.0_45 (Sun Microsystems Inc. 20.45-b01)
 OS:   Linux 3.2.0-23-generic amd64
 
 
 
 /***
  Joe Stein
  Founder, Principal Consultant
  Big Data Open Source Security LLC
  http://www.stealth.ly
  Twitter: @allthingshadoop http://www.twitter.com/allthingshadoop
 /
 
 On Thu, Oct 23, 2014 at 12:49 PM, Neha Narkhede neha.narkh...@gmail.com
 wrote:
 
  Joe,
 
  I downloaded the source distro and ran into this error. Followed the steps
  listed on the release process wiki to validate the release.
 
  vagrant@precise64:~/kafka-0.8.2-beta-src$ gradle
 
  FAILURE: Build failed with an exception.
 
  * Where:
  Script '/home/vagrant/kafka-0.8.2-beta-src/scala.gradle' line: 2
 
  * What went wrong:
  A problem occurred evaluating script.
  Cause: Could not find property 'ext' on settings 'kafka-0.8.2-beta-src'.
 
  * Try:
  Run with --stacktrace option to get the stack trace. Run with --info or
  --debug option to get more log output.
 
  BUILD FAILED
 
  Total time: 1.565 secs
  vagrant@precise64:~/kafka-0.8.2-beta-src$ cat scala.gradle
  if (!hasProperty('scalaVersion')) {
ext.scalaVersion = '2.10.1'
  }
  ext.defaultScalaVersion = '2.10.1'
  if (scalaVersion.startsWith('2.10')) {
  ext.baseScalaVersion = '2.10'
  } else if (scalaVersion.startsWith('2.11')) {
  ext.baseScalaVersion = '2.11'
  } else {
  ext.baseScalaVersion = scalaVersion
  }
 
  On Tue, Oct 21, 2014 at 1:58 PM, Joe Stein joe.st...@stealth.ly wrote:
 
   This is the first candidate for release of Apache Kafka 0.8.2-beta
  
   Release Notes for the 0.8.2-beta release
  
  
  https://people.apache.org/~joestein/kafka-0.8.2-beta-candidate1/RELEASE_NOTES.html
  
   *** Please download, test and vote by Friday, October 24th, 2pm PT
  
   Kafka's KEYS file containing PGP keys we use to sign the release:
   https://svn.apache.org/repos/asf/kafka/KEYS in addition to the md5, sha1
   and sha2 (SHA256) checksum.
  
   * Release artifacts to be voted upon (source and binary):
   https://people.apache.org/~joestein/kafka-0.8.2-beta-candidate1/
  
   * Maven artifacts to be voted upon prior to release:
   https://repository.apache.org/content/groups/staging/
  
   * scala-doc
  
  https://people.apache.org/~joestein/kafka-0.8.2-beta-candidate1/scala-doc/
  
   * java-doc
  
  https://people.apache.org/~joestein/kafka-0.8.2-beta-candidate1/java-doc/
  
   * The tag to be voted upon (off the 0.8.2 branch) is the 0.8.2-beta tag
  
  
  https://git-wip-us.apache.org/repos/asf?p=kafka.git;a=tag;h=2b2c3da2c52bc62a89d60f85125d3723c8410fa0
  
   /***
Joe Stein
Founder, Principal Consultant
Big Data Open Source Security LLC
http://www.stealth.ly
Twitter: @allthingshadoop http://www.twitter.com/allthingshadoop
   /
  
 


[jira] [Resolved] (KAFKA-1727) Fix comment about message format

2014-10-23 Thread Jun Rao (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1727?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao resolved KAFKA-1727.

   Resolution: Fixed
Fix Version/s: 0.8.3
 Assignee: Muneyuki Noguchi

Thanks for the patch. +1 and committed to trunk.

 Fix comment about message format
 

 Key: KAFKA-1727
 URL: https://issues.apache.org/jira/browse/KAFKA-1727
 Project: Kafka
  Issue Type: Bug
Reporter: Muneyuki Noguchi
Assignee: Muneyuki Noguchi
Priority: Trivial
 Fix For: 0.8.3

 Attachments: KAFKA-1727.patch


 The comment in Message.scala says the value of magic identifier is 2,
  bq. 2. 1 byte magic identifier to allow format changes, value is 2 
 currently
 but the actual current magic value is 0.
 {code}
   /**
* The current magic value
*/
   val CurrentMagicValue: Byte = 0
 {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-391) Producer request and response classes should use maps

2014-10-23 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14182057#comment-14182057
 ] 

Jun Rao commented on KAFKA-391:
---

The client should always receive one response per request. Are you seeing
otherwise?

 Producer request and response classes should use maps
 -

 Key: KAFKA-391
 URL: https://issues.apache.org/jira/browse/KAFKA-391
 Project: Kafka
  Issue Type: Bug
Reporter: Joel Koshy
Assignee: Joel Koshy
Priority: Blocker
  Labels: optimization
 Fix For: 0.8.0

 Attachments: KAFKA-391-draft-r1374069.patch, KAFKA-391-v2.patch, 
 KAFKA-391-v3.patch, KAFKA-391-v4.patch


 Producer response contains two arrays of error codes and offsets - the 
 ordering in these arrays correspond to the flattened ordering of the request 
 arrays.
 It would be better to switch to maps in the request and response as this 
 would make the code clearer and more efficient (right now, linear scans are 
 used in handling producer acks).
 We can probably do the same in the fetch request/response.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1716) hang during shutdown of ZookeeperConsumerConnector

2014-10-23 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1716?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14182070#comment-14182070
 ] 

Jun Rao commented on KAFKA-1716:


Chris,

Did you change fetch.wait.max.ms in the consumer config? Thanks,

 hang during shutdown of ZookeeperConsumerConnector
 --

 Key: KAFKA-1716
 URL: https://issues.apache.org/jira/browse/KAFKA-1716
 Project: Kafka
  Issue Type: Bug
  Components: consumer
Affects Versions: 0.8.1.1
Reporter: Sean Fay
Assignee: Neha Narkhede

 It appears to be possible for {{ZookeeperConsumerConnector.shutdown()}} to 
 wedge in the case that some consumer fetcher threads receive messages during 
 the shutdown process.
 Shutdown thread:
 {code}-- Parking to wait for: 
 java/util/concurrent/CountDownLatch$Sync@0x2aaaf3ef06d0
 at jrockit/vm/Locks.park0(J)V(Native Method)
 at jrockit/vm/Locks.park(Locks.java:2230)
 at sun/misc/Unsafe.park(ZJ)V(Native Method)
 at java/util/concurrent/locks/LockSupport.park(LockSupport.java:156)
 at 
 java/util/concurrent/locks/AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:811)
 at 
 java/util/concurrent/locks/AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:969)
 at 
 java/util/concurrent/locks/AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1281)
 at java/util/concurrent/CountDownLatch.await(CountDownLatch.java:207)
 at kafka/utils/ShutdownableThread.shutdown(ShutdownableThread.scala:36)
 at 
 kafka/server/AbstractFetcherThread.shutdown(AbstractFetcherThread.scala:71)
 at 
 kafka/server/AbstractFetcherManager$$anonfun$closeAllFetchers$2.apply(AbstractFetcherManager.scala:121)
 at 
 kafka/server/AbstractFetcherManager$$anonfun$closeAllFetchers$2.apply(AbstractFetcherManager.scala:120)
 at 
 scala/collection/TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
 at 
 scala/collection/mutable/HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
 at 
 scala/collection/mutable/HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
 at 
 scala/collection/mutable/HashTable$class.foreachEntry(HashTable.scala:226)
 at scala/collection/mutable/HashMap.foreachEntry(HashMap.scala:39)
 at scala/collection/mutable/HashMap.foreach(HashMap.scala:98)
 at 
 scala/collection/TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
 at 
 kafka/server/AbstractFetcherManager.closeAllFetchers(AbstractFetcherManager.scala:120)
 ^-- Holding lock: java/lang/Object@0x2aaaebcc7318[thin lock]
 at 
 kafka/consumer/ConsumerFetcherManager.stopConnections(ConsumerFetcherManager.scala:148)
 at 
 kafka/consumer/ZookeeperConsumerConnector.liftedTree1$1(ZookeeperConsumerConnector.scala:171)
 at 
 kafka/consumer/ZookeeperConsumerConnector.shutdown(ZookeeperConsumerConnector.scala:167){code}
 ConsumerFetcherThread:
 {code}-- Parking to wait for: 
 java/util/concurrent/locks/AbstractQueuedSynchronizer$ConditionObject@0x2aaaebcc7568
 at jrockit/vm/Locks.park0(J)V(Native Method)
 at jrockit/vm/Locks.park(Locks.java:2230)
 at sun/misc/Unsafe.park(ZJ)V(Native Method)
 at java/util/concurrent/locks/LockSupport.park(LockSupport.java:156)
 at 
 java/util/concurrent/locks/AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
 at 
 java/util/concurrent/LinkedBlockingQueue.put(LinkedBlockingQueue.java:306)
 at kafka/consumer/PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:60)
 at 
 kafka/consumer/ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:49)
 at 
 kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:130)
 at 
 kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:111)
 at scala/collection/immutable/HashMap$HashMap1.foreach(HashMap.scala:224)
 at 
 scala/collection/immutable/HashMap$HashTrieMap.foreach(HashMap.scala:403)
 at 
 kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1.apply$mcV$sp(AbstractFetcherThread.scala:111)
 at 
 kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:111)
 at 
 kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:111)
 at kafka/utils/Utils$.inLock(Utils.scala:538)
 at 
 kafka/server/AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:110)
 at 
 kafka/server/AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88)
 at kafka/utils/ShutdownableThread.run(ShutdownableThread.scala:51)
 at 

Re: Review Request 26390: Fix KAFKA-1641

2014-10-23 Thread Joel Koshy

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/26390/#review58117
---

Ship it!


- Joel Koshy


On Oct. 9, 2014, 8:04 p.m., Guozhang Wang wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/26390/
 ---
 
 (Updated Oct. 9, 2014, 8:04 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1641
 https://issues.apache.org/jira/browse/KAFKA-1641
 
 
 Repository: kafka
 
 
 Description
 ---
 
 Reset cleaning start offset upon abnormal log truncation
 
 
 Diffs
 -
 
   core/src/main/scala/kafka/log/LogCleanerManager.scala 
 e8ced6a5922508ea3274905be7c3d6e728f320ac 
 
 Diff: https://reviews.apache.org/r/26390/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Guozhang Wang
 




Re: Review Request 26390: Fix KAFKA-1641

2014-10-23 Thread Joel Koshy

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/26390/#review58144
---



core/src/main/scala/kafka/log/LogCleanerManager.scala
https://reviews.apache.org/r/26390/#comment99062

Actually, because of the '+', the format specifiers are unmatched.

I will fix this on check-in.
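
For illustration, a minimal hypothetical example (in Java, with made-up message text; the actual code lives in LogCleanerManager.scala) of how a stray '+' leaves a format specifier unmatched -- the fragment after the '+' is never passed to format(), so its specifier is never bound:

{code}
public class FormatSpecifierMismatch {
    public static void main(String[] args) {
        long firstDirty = 54770438L;
        long logStart = 382844024L;
        // BUG (hypothetical message text): the '+' splits the message, so only the
        // first fragment reaches format(); the %d in the second fragment is never
        // substituted and prints literally.
        String broken = String.format("Resetting first dirty offset to %d ", logStart)
                + "since the last clean offset %d is out of range.";
        // FIX: keep one format string and supply an argument for every specifier.
        String fixed = String.format(
                "Resetting first dirty offset to %d since the last clean offset %d is out of range.",
                logStart, firstDirty);
        System.out.println(broken);
        System.out.println(fixed);
    }
}
{code}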


- Joel Koshy


On Oct. 9, 2014, 8:04 p.m., Guozhang Wang wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/26390/
 ---
 
 (Updated Oct. 9, 2014, 8:04 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1641
 https://issues.apache.org/jira/browse/KAFKA-1641
 
 
 Repository: kafka
 
 
 Description
 ---
 
 Reset cleaning start offset upon abnormal log truncation
 
 
 Diffs
 -
 
   core/src/main/scala/kafka/log/LogCleanerManager.scala 
 e8ced6a5922508ea3274905be7c3d6e728f320ac 
 
 Diff: https://reviews.apache.org/r/26390/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Guozhang Wang
 




[jira] [Commented] (KAFKA-1695) Authenticate connection to Zookeeper

2014-10-23 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1695?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14182090#comment-14182090
 ] 

Jun Rao commented on KAFKA-1695:


Option 2 is probably too big a change. Let's see if we can have the change 
incorporated into ZKClient first. I can ping Stefan too if needed.

 Authenticate connection to Zookeeper
 

 Key: KAFKA-1695
 URL: https://issues.apache.org/jira/browse/KAFKA-1695
 Project: Kafka
  Issue Type: Sub-task
  Components: security
Reporter: Jay Kreps
Assignee: Gwen Shapira

 We need to make it possible to secure the Zookeeper cluster Kafka is using. 
 This would make use of the normal authentication ZooKeeper provides. 
 ZooKeeper supports a variety of authentication mechanisms so we will need to 
 figure out what has to be passed in to the zookeeper client.
 The intention is that when the current round of client work is done it should 
 be possible to run without clients needing access to Zookeeper so all we need 
 here is to make it so that only the Kafka cluster is able to read and write 
 to the Kafka znodes  (we shouldn't need to set any kind of acl on a per-znode 
 basis).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-1641) Log cleaner exits if last cleaned offset is lower than earliest offset

2014-10-23 Thread Joel Koshy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1641?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joel Koshy updated KAFKA-1641:
--
Resolution: Fixed
Status: Resolved  (was: Patch Available)

Thanks for the patch.

Pushed to trunk (after fixing the log message issue I noted on the RB)

 Log cleaner exits if last cleaned offset is lower than earliest offset
 --

 Key: KAFKA-1641
 URL: https://issues.apache.org/jira/browse/KAFKA-1641
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8.1.1
Reporter: Joel Koshy
Assignee: Guozhang Wang
 Attachments: KAFKA-1641.patch, KAFKA-1641_2014-10-09_13:04:15.patch


 Encountered this recently: the log cleaner exited a while ago (I think 
 because the topic had compressed messages). That issue was subsequently 
 addressed by having the producer only send uncompressed. However, on a 
 subsequent restart of the broker we see the error in the stack trace below.
 In this scenario I think it is reasonable to just emit a warning and have the 
 cleaner round up its first dirty offset to the base offset of the first 
 segment.
 {code}
 [kafka-server] [] [kafka-log-cleaner-thread-0], Error due to 
 java.lang.IllegalArgumentException: requirement failed: Last clean offset is 
 54770438 but segment base offset is 382844024 for log testtopic-0.
 at scala.Predef$.require(Predef.scala:145)
 at kafka.log.Cleaner.buildOffsetMap(LogCleaner.scala:491)
 at kafka.log.Cleaner.clean(LogCleaner.scala:288)
 at 
 kafka.log.LogCleaner$CleanerThread.cleanOrSleep(LogCleaner.scala:202)
 at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:187)
 at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
 {code}
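
 For reference, a hedged sketch (in Java with illustrative names; the real fix is in
 core/src/main/scala/kafka/log/LogCleanerManager.scala) of the round-up behavior described above:

{code}
public class DirtyOffsetRoundUp {
    /**
     * Hypothetical illustration: if the checkpointed "last clean" offset falls below
     * the base offset of the first remaining segment (e.g. after truncation or
     * retention), warn and resume cleaning from that base offset instead of failing.
     */
    static long firstDirtyOffset(long checkpointedOffset, long firstSegmentBaseOffset) {
        if (checkpointedOffset < firstSegmentBaseOffset) {
            System.out.printf(
                "WARN: last clean offset %d is below segment base offset %d; rounding up%n",
                checkpointedOffset, firstSegmentBaseOffset);
            return firstSegmentBaseOffset;
        }
        return checkpointedOffset;
    }

    public static void main(String[] args) {
        System.out.println(firstDirtyOffset(54770438L, 382844024L)); // 382844024
    }
}
{code}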



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-391) Producer request and response classes should use maps

2014-10-23 Thread Honghai Chen (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14182097#comment-14182097
 ] 

Honghai Chen commented on KAFKA-391:


Yes, after adding more information to the error message, we sometimes see two 
responses for one request.

 Producer request and response classes should use maps
 -

 Key: KAFKA-391
 URL: https://issues.apache.org/jira/browse/KAFKA-391
 Project: Kafka
  Issue Type: Bug
Reporter: Joel Koshy
Assignee: Joel Koshy
Priority: Blocker
  Labels: optimization
 Fix For: 0.8.0

 Attachments: KAFKA-391-draft-r1374069.patch, KAFKA-391-v2.patch, 
 KAFKA-391-v3.patch, KAFKA-391-v4.patch


 Producer response contains two arrays of error codes and offsets - the 
 ordering in these arrays correspond to the flattened ordering of the request 
 arrays.
 It would be better to switch to maps in the request and response as this 
 would make the code clearer and more efficient (right now, linear scans are 
 used in handling producer acks).
 We can probably do the same in the fetch request/response.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-2) a restful producer API

2014-10-23 Thread Anand Iyer (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14182111#comment-14182111
 ] 

Anand Iyer commented on KAFKA-2:


Since the Kafka APIs are undergoing changes in upcoming versions, this JIRA 
becomes more important. Kafka customers are asking for it, and it would help 
drive increased adoption of Kafka.

 a restful producer API
 --

 Key: KAFKA-2
 URL: https://issues.apache.org/jira/browse/KAFKA-2
 Project: Kafka
  Issue Type: Improvement
Priority: Minor

 If Kafka server supports a restful producer API, we can use Kafka in any 
 programming language without implementing the wire protocol in each language.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1471) Add Producer Unit Tests for LZ4 and LZ4HC compression

2014-10-23 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1471?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14182117#comment-14182117
 ] 

Jun Rao commented on KAFKA-1471:


Ewen,

It seems that the necessary changes are already included in KAFKA-1493. Could 
we just resolve this JIRA?

 Add Producer Unit Tests for LZ4 and LZ4HC compression
 -

 Key: KAFKA-1471
 URL: https://issues.apache.org/jira/browse/KAFKA-1471
 Project: Kafka
  Issue Type: Sub-task
Reporter: James Oliver
Assignee: Ewen Cheslack-Postava
 Fix For: 0.8.3

 Attachments: KAFKA-1471.patch, KAFKA-1471.patch, KAFKA-1471.patch, 
 KAFKA-1471_2014-10-12_16:02:19.patch






--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1695) Authenticate connection to Zookeeper

2014-10-23 Thread Gwen Shapira (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1695?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14182123#comment-14182123
 ] 

Gwen Shapira commented on KAFKA-1695:
-

Agree! I do not want to change everything that uses ZKClient.

If you contact Stefan to see if we can get the ACL patch in, that would be 
fantastic. I'm willing to help with the pull request if needed.

 Authenticate connection to Zookeeper
 

 Key: KAFKA-1695
 URL: https://issues.apache.org/jira/browse/KAFKA-1695
 Project: Kafka
  Issue Type: Sub-task
  Components: security
Reporter: Jay Kreps
Assignee: Gwen Shapira

 We need to make it possible to secure the Zookeeper cluster Kafka is using. 
 This would make use of the normal authentication ZooKeeper provides. 
 ZooKeeper supports a variety of authentication mechanisms so we will need to 
 figure out what has to be passed in to the zookeeper client.
 The intention is that when the current round of client work is done it should 
 be possible to run without clients needing access to Zookeeper so all we need 
 here is to make it so that only the Kafka cluster is able to read and write 
 to the Kafka znodes  (we shouldn't need to set any kind of acl on a per-znode 
 basis).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1471) Add Producer Unit Tests for LZ4 and LZ4HC compression

2014-10-23 Thread Ewen Cheslack-Postava (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1471?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14182126#comment-14182126
 ] 

Ewen Cheslack-Postava commented on KAFKA-1471:
--

This was already applied, just never marked resolved.

 Add Producer Unit Tests for LZ4 and LZ4HC compression
 -

 Key: KAFKA-1471
 URL: https://issues.apache.org/jira/browse/KAFKA-1471
 Project: Kafka
  Issue Type: Sub-task
Reporter: James Oliver
Assignee: Ewen Cheslack-Postava
 Fix For: 0.8.2, 0.8.3

 Attachments: KAFKA-1471.patch, KAFKA-1471.patch, KAFKA-1471.patch, 
 KAFKA-1471_2014-10-12_16:02:19.patch






--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-1471) Add Producer Unit Tests for LZ4 and LZ4HC compression

2014-10-23 Thread Ewen Cheslack-Postava (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1471?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ewen Cheslack-Postava updated KAFKA-1471:
-
   Resolution: Fixed
Fix Version/s: 0.8.2
   Status: Resolved  (was: Patch Available)

 Add Producer Unit Tests for LZ4 and LZ4HC compression
 -

 Key: KAFKA-1471
 URL: https://issues.apache.org/jira/browse/KAFKA-1471
 Project: Kafka
  Issue Type: Sub-task
Reporter: James Oliver
Assignee: Ewen Cheslack-Postava
 Fix For: 0.8.2, 0.8.3

 Attachments: KAFKA-1471.patch, KAFKA-1471.patch, KAFKA-1471.patch, 
 KAFKA-1471_2014-10-12_16:02:19.patch






--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1721) Snappy compressor is not thread safe

2014-10-23 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1721?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14182127#comment-14182127
 ] 

Jun Rao commented on KAFKA-1721:


Bhavesh,

Do you want to submit a patch to upgrade the snappy jar in Kafka? Thanks,

 Snappy compressor is not thread safe
 

 Key: KAFKA-1721
 URL: https://issues.apache.org/jira/browse/KAFKA-1721
 Project: Kafka
  Issue Type: Bug
  Components: compression
Reporter: Ewen Cheslack-Postava
Assignee: Ewen Cheslack-Postava

 From the mailing list, it can generate this exception:
 2014-10-20 18:55:21.841 [kafka-producer-network-thread] ERROR
 org.apache.kafka.clients.producer.internals.Sender - Uncaught error in
 kafka producer I/O thread:
 *java.lang.NullPointerException*
 at
 org.xerial.snappy.BufferRecycler.releaseInputBuffer(BufferRecycler.java:153)
 at org.xerial.snappy.SnappyOutputStream.close(SnappyOutputStream.java:317)
 at java.io.FilterOutputStream.close(FilterOutputStream.java:160)
 at org.apache.kafka.common.record.Compressor.close(Compressor.java:94)
 at
 org.apache.kafka.common.record.MemoryRecords.close(MemoryRecords.java:119)
 at
 org.apache.kafka.clients.producer.internals.RecordAccumulator.drain(RecordAccumulator.java:285)
 at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:162)
 at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:115)
 at java.lang.Thread.run(Thread.java:744)
 This appears to be an issue with the snappy-java library using ThreadLocal 
 for an internal buffer recycling object which results in that object being 
 shared unsafely across threads if one thread sends to multiple producers:
 {quote}
 I think the issue is that you're
 using all your producers across a thread pool and the snappy library
 uses ThreadLocal BufferRecyclers. When new Snappy streams are allocated,
 they may be allocated from the same thread (e.g. one of your MyProducer
 classes calls Producer.send() on multiple producers from the same
 thread) and therefore use the same BufferRecycler. Eventually you hit
 the code in the stacktrace, and if two producer send threads hit it
 concurrently they improperly share the unsynchronized BufferRecycler.
 This seems like a pain to fix -- it's really a deficiency of the snappy
 library and as far as I can see there's no external control over
 BufferRecycler in their API. One possibility is to record the thread ID
 when we generate a new stream in Compressor and use that to synchronize
 access to ensure no concurrent BufferRecycler access. That could be made
 specific to snappy so it wouldn't impact other codecs. Not exactly
 ideal, but it would work. Unfortunately I can't think of any way for you
 to protect against this in your own code since the problem arises in the
 producer send thread, which your code should never know about.
 Another option would be to setup your producers differently to avoid the
 possibility of unsynchronized access from multiple threads (i.e. don't
 use the same thread pool approach), but whether you can do that will
 depend on your use case.
 {quote}
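
 As an illustration of the mitigation idea in the quote above, here is a hedged sketch
 (hypothetical class and names, not the actual Compressor code): streams allocated from
 the same thread share one lock, so the BufferRecycler they also share is never touched
 concurrently.

{code}
import org.xerial.snappy.SnappyOutputStream;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hedged sketch: serialize access per *allocating* thread, because snappy-java's
// ThreadLocal BufferRecycler is shared by all streams created on that thread.
public class ThreadKeyedSnappyStream extends OutputStream {
    private static final ConcurrentMap<Long, Object> LOCKS = new ConcurrentHashMap<Long, Object>();

    private final Object lock;                 // captured at creation time
    private final SnappyOutputStream delegate;

    public ThreadKeyedSnappyStream(OutputStream out) throws IOException {
        long creatorId = Thread.currentThread().getId();
        LOCKS.putIfAbsent(creatorId, new Object());
        this.lock = LOCKS.get(creatorId);
        this.delegate = new SnappyOutputStream(out);
    }

    @Override public void write(int b) throws IOException {
        synchronized (lock) { delegate.write(b); }
    }

    @Override public void write(byte[] b, int off, int len) throws IOException {
        synchronized (lock) { delegate.write(b, off, len); }
    }

    @Override public void close() throws IOException {
        // close() is where BufferRecycler.releaseInputBuffer() throws the NPE above
        // when two sender threads share a recycler without synchronization.
        synchronized (lock) { delegate.close(); }
    }

    public static void main(String[] args) throws IOException {
        OutputStream s = new ThreadKeyedSnappyStream(new ByteArrayOutputStream());
        s.write("hello".getBytes());
        s.close();
    }
}
{code}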



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1721) Snappy compressor is not thread safe

2014-10-23 Thread Ewen Cheslack-Postava (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1721?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14182130#comment-14182130
 ] 

Ewen Cheslack-Postava commented on KAFKA-1721:
--

I have the trivial patch, but the upstream jar seems to be broken (see the 
earlier GitHub issue). I'll follow up on this once that issue is resolved.

 Snappy compressor is not thread safe
 

 Key: KAFKA-1721
 URL: https://issues.apache.org/jira/browse/KAFKA-1721
 Project: Kafka
  Issue Type: Bug
  Components: compression
Reporter: Ewen Cheslack-Postava
Assignee: Ewen Cheslack-Postava

 From the mailing list, it can generate this exception:
 2014-10-20 18:55:21.841 [kafka-producer-network-thread] ERROR
 org.apache.kafka.clients.producer.internals.Sender - Uncaught error in
 kafka producer I/O thread:
 *java.lang.NullPointerException*
 at
 org.xerial.snappy.BufferRecycler.releaseInputBuffer(BufferRecycler.java:153)
 at org.xerial.snappy.SnappyOutputStream.close(SnappyOutputStream.java:317)
 at java.io.FilterOutputStream.close(FilterOutputStream.java:160)
 at org.apache.kafka.common.record.Compressor.close(Compressor.java:94)
 at
 org.apache.kafka.common.record.MemoryRecords.close(MemoryRecords.java:119)
 at
 org.apache.kafka.clients.producer.internals.RecordAccumulator.drain(RecordAccumulator.java:285)
 at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:162)
 at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:115)
 at java.lang.Thread.run(Thread.java:744)
 This appears to be an issue with the snappy-java library using ThreadLocal 
 for an internal buffer recycling object which results in that object being 
 shared unsafely across threads if one thread sends to multiple producers:
 {quote}
 I think the issue is that you're
 using all your producers across a thread pool and the snappy library
 uses ThreadLocal BufferRecyclers. When new Snappy streams are allocated,
 they may be allocated from the same thread (e.g. one of your MyProducer
 classes calls Producer.send() on multiple producers from the same
 thread) and therefore use the same BufferRecycler. Eventually you hit
 the code in the stacktrace, and if two producer send threads hit it
 concurrently they improperly share the unsynchronized BufferRecycler.
 This seems like a pain to fix -- it's really a deficiency of the snappy
 library and as far as I can see there's no external control over
 BufferRecycler in their API. One possibility is to record the thread ID
 when we generate a new stream in Compressor and use that to synchronize
 access to ensure no concurrent BufferRecycler access. That could be made
 specific to snappy so it wouldn't impact other codecs. Not exactly
 ideal, but it would work. Unfortunately I can't think of any way for you
 to protect against this in your own code since the problem arises in the
 producer send thread, which your code should never know about.
 Another option would be to setup your producers differently to avoid the
 possibility of unsynchronized access from multiple threads (i.e. don't
 use the same thread pool approach), but whether you can do that will
 depend on your use case.
 {quote}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1675) bootstrapping tidy-up

2014-10-23 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1675?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14182138#comment-14182138
 ] 

Jun Rao commented on KAFKA-1675:


Thanks for the patch. Looks good. It doesn't seem to apply any more. Could you 
rebase?

 bootstrapping tidy-up
 -

 Key: KAFKA-1675
 URL: https://issues.apache.org/jira/browse/KAFKA-1675
 Project: Kafka
  Issue Type: Bug
Reporter: Szczepan Faber
Assignee: Ivan Lyutov
 Fix For: 0.8.3

 Attachments: KAFKA-1675.patch


 I'd like to suggest following changes:
 1. remove the 'gradlew' and 'gradlew.bat' scripts from the source tree. Those 
 scripts don't work, e.g. they fail with exception when invoked. I just got a 
 user report where those scripts were invoked by the user and it led to an 
 exception that was not easy to grasp. Bootstrapping step will generate those 
 files anyway.
 2. move the 'gradleVersion' extra property from the 'build.gradle' into 
 'gradle.properties'. Otherwise it is hard to automate the bootstrapping 
 process - in order to find out the gradle version, I need to evaluate the 
 build script, and for that I need gradle with correct version (kind of a 
 vicious circle). Project properties declared in the gradle.properties file 
 can be accessed exactly the same as the 'ext' properties, for example: 
 'project.gradleVersion'.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (KAFKA-1700) examples directory - README and shell scripts are out of date

2014-10-23 Thread Jun Rao (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1700?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao resolved KAFKA-1700.

Resolution: Fixed
  Assignee: Geoffrey Anderson

Thanks for the patch. +1 and committed to trunk.

 examples directory - README and shell scripts are out of date
 -

 Key: KAFKA-1700
 URL: https://issues.apache.org/jira/browse/KAFKA-1700
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8.1
Reporter: Geoffrey Anderson
Assignee: Geoffrey Anderson
Priority: Minor
  Labels: newbie
 Fix For: 0.8.3

 Attachments: KAFKA-1700.patch


 sbt build files were removed during resolution of KAFKA-1254, so the README 
 under the examples directory should no longer make reference to sbt.
 Also, the paths added to CLASSPATH variable in the example shell script are 
 no longer correct.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 26885: Patch for KAFKA-1642

2014-10-23 Thread Ewen Cheslack-Postava


 On Oct. 23, 2014, 9:43 p.m., Jun Rao wrote:
  clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java,
   lines 197-199
  https://reviews.apache.org/r/26885/diff/2/?file=726776#file726776line197
 
  It seems that in this case, the nextReadyCheckDelayMs should be the 
  remaining linger time for tp1, which is lingerMs/2. Should we just assert 
  that?

tp1 and tp2 have the same leader, node1. The test adds enough data to make tp2 
sendable, so in the ideal case only tp3 would be used to determine timeout, 
which should require lingerMs more time. However, the test checks for <= 
lingerMs because a single scan through the topic partitions means that we can 
incorporate the lingerMs/2 timeout from tp1 even though we determine later that 
we really want to ignore it (and I actually saw this happen when I initially 
wrote the test to check for the exact value). I think the tradeoff of sometimes 
waking up a bit earlier than needed is probably worthwhile since it keeps the 
implementation simpler and cheaper.
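
To make the single-scan behavior concrete, here is a small self-contained sketch (hypothetical names, not the actual RecordAccumulator code) showing how one pass can return tp1's lingerMs/2 even though its leader is already sendable via tp2, which is why the assertion is an upper bound:

{code}
import java.util.Arrays;
import java.util.List;

public class ReadyCheckDelaySketch {
    static class Batch {
        final String tp;
        final long waitedMs;   // how long this batch has waited so far
        final long lingerMs;
        Batch(String tp, long waitedMs, long lingerMs) {
            this.tp = tp; this.waitedMs = waitedMs; this.lingerMs = lingerMs;
        }
    }

    public static void main(String[] args) {
        long lingerMs = 100;
        // tp1 has waited lingerMs/2 on node1; tp2 (also on node1) is already full and
        // therefore sendable, so it contributes no wait; tp3 on node2 was just written.
        List<Batch> batches = Arrays.asList(
                new Batch("tp1", lingerMs / 2, lingerMs),
                new Batch("tp3", 0, lingerMs));
        long nextReadyCheckDelayMs = Long.MAX_VALUE;
        for (Batch b : batches) {
            long remaining = Math.max(b.lingerMs - b.waitedMs, 0);
            // A single pass keeps the smallest remaining wait, even if the batch's
            // leader later turns out to be sendable anyway (tp1's case here).
            if (remaining > 0)
                nextReadyCheckDelayMs = Math.min(nextReadyCheckDelayMs, remaining);
        }
        // Prints 50 (= lingerMs/2), which is why the test asserts <= lingerMs
        // rather than == lingerMs.
        System.out.println("nextReadyCheckDelayMs = " + nextReadyCheckDelayMs);
    }
}
{code}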


- Ewen


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/26885/#review57513
---


On Oct. 21, 2014, 12:34 a.m., Ewen Cheslack-Postava wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/26885/
 ---
 
 (Updated Oct. 21, 2014, 12:34 a.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1642
 https://issues.apache.org/jira/browse/KAFKA-1642
 
 
 Repository: kafka
 
 
 Description
 ---
 
 Fixes two issues with the computation of ready nodes and poll timeouts in
 Sender/RecordAccumulator:
 
 1. The timeout was computed incorrectly because it took into account all 
 nodes,
 even if they had data to send such that their timeout would be 0. However, 
 nodes
 were then filtered based on whether it was possible to send (i.e. their
 connection was still good) which could result in nothing to send and a 0
 timeout, resulting in busy looping. Instead, the timeout needs to be computed
 only using data that cannot be immediately sent, i.e. where the timeout will 
 be
 greater than 0. This timeout is only used if, after filtering by whether
 connections are ready for sending, there is no data to be sent. Other events 
 can
 wake the thread up earlier, e.g. a client reconnects and becomes ready again.
 
 2. One of the conditions indicating whether data is sendable is whether a
 timeout has expired -- either the linger time or the retry backoff. This
 condition wasn't accounting for both cases properly, always using the linger
 time. This means the retry backoff was probably not being respected.
 
 KAFKA-1642 Compute correct poll timeout when all nodes have sendable data but 
 none can send data because they are in a connection backoff period.
 
 
 Diffs
 -
 
   clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java 
 d304660f29246e9600efe3ddb28cfcc2b074bed3 
   clients/src/main/java/org/apache/kafka/clients/KafkaClient.java 
 29658d4a15f112dc0af5ce517eaab93e6f00134b 
   clients/src/main/java/org/apache/kafka/clients/NetworkClient.java 
 eea270abb16f40c9f3b47c4ea96be412fb4fdc8b 
   
 clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
  c5d470011d334318d5ee801021aadd0c000974a6 
   
 clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 
 8ebe7ed82c9384b71ce0cc3ddbef2c2325363ab9 
   clients/src/test/java/org/apache/kafka/clients/MockClient.java 
 aae8d4a1e98279470587d397cc779a9baf6fee6c 
   
 clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java
  0762b35abba0551f23047348c5893bb8c9acff14 
 
 Diff: https://reviews.apache.org/r/26885/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Ewen Cheslack-Postava
 




Re: Review Request 26885: Patch for KAFKA-1642

2014-10-23 Thread Ewen Cheslack-Postava

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/26885/
---

(Updated Oct. 23, 2014, 11:19 p.m.)


Review request for kafka.


Bugs: KAFKA-1642
https://issues.apache.org/jira/browse/KAFKA-1642


Repository: kafka


Description (updated)
---

Fixes two issues with the computation of ready nodes and poll timeouts in
Sender/RecordAccumulator:

1. The timeout was computed incorrectly because it took into account all nodes,
even if they had data to send such that their timeout would be 0. However, nodes
were then filtered based on whether it was possible to send (i.e. their
connection was still good) which could result in nothing to send and a 0
timeout, resulting in busy looping. Instead, the timeout needs to be computed
only using data that cannot be immediately sent, i.e. where the timeout will be
greater than 0. This timeout is only used if, after filtering by whether
connections are ready for sending, there is no data to be sent. Other events can
wake the thread up earlier, e.g. a client reconnects and becomes ready again.

2. One of the conditions indicating whether data is sendable is whether a
timeout has expired -- either the linger time or the retry backoff. This
condition wasn't accounting for both cases properly, always using the linger
time. This means the retry backoff was probably not being respected.

KAFKA-1642 Compute correct poll timeout when all nodes have sendable data but 
none can send data because they are in a connection backoff period.


Addressing Jun's comments.
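
For point 2 above, a hedged one-screen sketch (illustrative names, not the actual RecordAccumulator.ready() code) of choosing between the retry backoff and the linger time when deciding whether a batch's wait has expired:

{code}
public class SendableCheckSketch {
    /**
     * Hypothetical illustration of the corrected condition: a batch that is being
     * retried must wait out retryBackoffMs, while a fresh batch only waits lingerMs.
     */
    static boolean sendable(boolean inRetry, long waitedMs, long lingerMs,
                            long retryBackoffMs, boolean full, boolean closed) {
        long timeToWaitMs = inRetry ? retryBackoffMs : lingerMs;
        boolean expired = waitedMs >= timeToWaitMs;
        return full || expired || closed;
    }

    public static void main(String[] args) {
        // A retried batch that has only waited the linger time is NOT yet sendable
        // once the retry backoff is taken into account.
        System.out.println(sendable(true, 100, 100, 300, false, false));  // false
        System.out.println(sendable(false, 100, 100, 300, false, false)); // true
    }
}
{code}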


Diffs (updated)
-

  clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java 
d304660f29246e9600efe3ddb28cfcc2b074bed3 
  clients/src/main/java/org/apache/kafka/clients/KafkaClient.java 
29658d4a15f112dc0af5ce517eaab93e6f00134b 
  clients/src/main/java/org/apache/kafka/clients/NetworkClient.java 
eea270abb16f40c9f3b47c4ea96be412fb4fdc8b 
  
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
 c5d470011d334318d5ee801021aadd0c000974a6 
  clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 
8ebe7ed82c9384b71ce0cc3ddbef2c2325363ab9 
  clients/src/test/java/org/apache/kafka/clients/MockClient.java 
aae8d4a1e98279470587d397cc779a9baf6fee6c 
  
clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java
 0762b35abba0551f23047348c5893bb8c9acff14 

Diff: https://reviews.apache.org/r/26885/diff/


Testing
---


Thanks,

Ewen Cheslack-Postava



Re: Review Request 26994: Patch for KAFKA-1719

2014-10-23 Thread Jiangjie Qin

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/26994/
---

(Updated Oct. 23, 2014, 11:20 p.m.)


Review request for kafka.


Bugs: KAFKA-1719
https://issues.apache.org/jira/browse/KAFKA-1719


Repository: kafka


Description (updated)
---

Addressed Guozhang's comments.


Addressed Neha and Guozhang's comments.


Diffs (updated)
-

  core/src/main/scala/kafka/tools/MirrorMaker.scala 
b8698ee1469c8fbc92ccc176d916eb3e28b87867 

Diff: https://reviews.apache.org/r/26994/diff/


Testing
---


Thanks,

Jiangjie Qin



[jira] [Updated] (KAFKA-1642) [Java New Producer Kafka Trunk] CPU Usage Spike to 100% when network connection is lost

2014-10-23 Thread Ewen Cheslack-Postava (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1642?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ewen Cheslack-Postava updated KAFKA-1642:
-
Attachment: KAFKA-1642_2014-10-23_16:19:41.patch

 [Java New Producer Kafka Trunk] CPU Usage Spike to 100% when network 
 connection is lost
 ---

 Key: KAFKA-1642
 URL: https://issues.apache.org/jira/browse/KAFKA-1642
 Project: Kafka
  Issue Type: Bug
  Components: producer 
Affects Versions: 0.8.2
Reporter: Bhavesh Mistry
Assignee: Ewen Cheslack-Postava
 Attachments: KAFKA-1642.patch, KAFKA-1642_2014-10-20_17:33:57.patch, 
 KAFKA-1642_2014-10-23_16:19:41.patch


 I see my CPU spike to 100% when the network connection is lost for a while. It 
 seems the network IO threads are very busy logging the following error message. Is 
 this expected behavior?
 2014-09-17 14:06:16.830 [kafka-producer-network-thread] ERROR 
 org.apache.kafka.clients.producer.internals.Sender - Uncaught error in kafka 
 producer I/O thread: 
 java.lang.IllegalStateException: No entry found for node -2
 at 
 org.apache.kafka.clients.ClusterConnectionStates.nodeState(ClusterConnectionStates.java:110)
 at 
 org.apache.kafka.clients.ClusterConnectionStates.disconnected(ClusterConnectionStates.java:99)
 at 
 org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:394)
 at 
 org.apache.kafka.clients.NetworkClient.maybeUpdateMetadata(NetworkClient.java:380)
 at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:174)
 at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:175)
 at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:115)
 at java.lang.Thread.run(Thread.java:744)
 Thanks,
 Bhavesh



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-1719) Make mirror maker exit when one consumer/producer thread exits.

2014-10-23 Thread Jiangjie Qin (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1719?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiangjie Qin updated KAFKA-1719:

Attachment: KAFKA-1719_2014-10-23_16:20:22.patch

 Make mirror maker exit when one consumer/producer thread exits.
 ---

 Key: KAFKA-1719
 URL: https://issues.apache.org/jira/browse/KAFKA-1719
 Project: Kafka
  Issue Type: Improvement
Reporter: Jiangjie Qin
Assignee: Jiangjie Qin
 Attachments: KAFKA-1719.patch, KAFKA-1719_2014-10-22_15:04:32.patch, 
 KAFKA-1719_2014-10-23_16:20:22.patch


 When one of the consumer/producer threads exits, the entire mirror maker will 
 be blocked. In this case, it is better to make it exit. It seems a single 
 ZookeeperConsumerConnector is sufficient for the mirror maker; we probably don't 
 need a list of connectors.
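
 A hedged sketch (illustrative, and in Java rather than the Scala MirrorMaker code) of the
 exit-on-thread-death behavior described above, using an uncaught-exception handler that
 logs fatally and terminates the process:

{code}
public class ExitOnThreadDeathSketch {
    public static void main(String[] args) throws InterruptedException {
        Thread consumerThread = new Thread(new Runnable() {
            public void run() {
                throw new IllegalStateException("simulated consumer failure");
            }
        }, "mirrormaker-consumer-0");

        // If any consumer/producer thread dies, bring the whole mirror maker down
        // instead of leaving it silently blocked.
        consumerThread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
            public void uncaughtException(Thread t, Throwable e) {
                System.err.println("FATAL: " + t.getName() + " died: " + e);
                Runtime.getRuntime().halt(1);
            }
        });

        consumerThread.start();
        consumerThread.join();
    }
}
{code}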



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1642) [Java New Producer Kafka Trunk] CPU Usage Spike to 100% when network connection is lost

2014-10-23 Thread Ewen Cheslack-Postava (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14182165#comment-14182165
 ] 

Ewen Cheslack-Postava commented on KAFKA-1642:
--

Updated reviewboard https://reviews.apache.org/r/26885/diff/
 against branch origin/trunk

 [Java New Producer Kafka Trunk] CPU Usage Spike to 100% when network 
 connection is lost
 ---

 Key: KAFKA-1642
 URL: https://issues.apache.org/jira/browse/KAFKA-1642
 Project: Kafka
  Issue Type: Bug
  Components: producer 
Affects Versions: 0.8.2
Reporter: Bhavesh Mistry
Assignee: Ewen Cheslack-Postava
 Attachments: KAFKA-1642.patch, KAFKA-1642_2014-10-20_17:33:57.patch, 
 KAFKA-1642_2014-10-23_16:19:41.patch


 I see my CPU spike to 100% when the network connection is lost for a while. It 
 seems the network IO threads are very busy logging the following error message. Is 
 this expected behavior?
 2014-09-17 14:06:16.830 [kafka-producer-network-thread] ERROR 
 org.apache.kafka.clients.producer.internals.Sender - Uncaught error in kafka 
 producer I/O thread: 
 java.lang.IllegalStateException: No entry found for node -2
 at 
 org.apache.kafka.clients.ClusterConnectionStates.nodeState(ClusterConnectionStates.java:110)
 at 
 org.apache.kafka.clients.ClusterConnectionStates.disconnected(ClusterConnectionStates.java:99)
 at 
 org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:394)
 at 
 org.apache.kafka.clients.NetworkClient.maybeUpdateMetadata(NetworkClient.java:380)
 at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:174)
 at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:175)
 at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:115)
 at java.lang.Thread.run(Thread.java:744)
 Thanks,
 Bhavesh



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1719) Make mirror maker exit when one consumer/producer thread exits.

2014-10-23 Thread Jiangjie Qin (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1719?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14182166#comment-14182166
 ] 

Jiangjie Qin commented on KAFKA-1719:
-

Updated reviewboard https://reviews.apache.org/r/26994/diff/
 against branch origin/trunk

 Make mirror maker exit when one consumer/producer thread exits.
 ---

 Key: KAFKA-1719
 URL: https://issues.apache.org/jira/browse/KAFKA-1719
 Project: Kafka
  Issue Type: Improvement
Reporter: Jiangjie Qin
Assignee: Jiangjie Qin
 Attachments: KAFKA-1719.patch, KAFKA-1719_2014-10-22_15:04:32.patch, 
 KAFKA-1719_2014-10-23_16:20:22.patch


 When one of the consumer/producer threads exits, the entire mirror maker will 
 be blocked. In this case, it is better to make it exit. It seems a single 
 ZookeeperConsumerConnector is sufficient for the mirror maker; we probably don't 
 need a list of connectors.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 26994: Patch for KAFKA-1719

2014-10-23 Thread Guozhang Wang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/26994/#review58168
---

Ship it!


LGTM, one minor thing upon check-in.


core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/26994/#comment99094

    fatal("Consumer thread failure due to ", t)


- Guozhang Wang


On Oct. 23, 2014, 11:20 p.m., Jiangjie Qin wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/26994/
 ---
 
 (Updated Oct. 23, 2014, 11:20 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1719
 https://issues.apache.org/jira/browse/KAFKA-1719
 
 
 Repository: kafka
 
 
 Description
 ---
 
 Addressed Guozhang's comments.
 
 
Addressed Neha and Guozhang's comments.
 
 
 Diffs
 -
 
   core/src/main/scala/kafka/tools/MirrorMaker.scala 
 b8698ee1469c8fbc92ccc176d916eb3e28b87867 
 
 Diff: https://reviews.apache.org/r/26994/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Jiangjie Qin
 




[jira] [Commented] (KAFKA-391) Producer request and response classes should use maps

2014-10-23 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14182197#comment-14182197
 ] 

Jun Rao commented on KAFKA-391:
---

Which version of Kafka are you using? Is that easily reproducible?

 Producer request and response classes should use maps
 -

 Key: KAFKA-391
 URL: https://issues.apache.org/jira/browse/KAFKA-391
 Project: Kafka
  Issue Type: Bug
Reporter: Joel Koshy
Assignee: Joel Koshy
Priority: Blocker
  Labels: optimization
 Fix For: 0.8.0

 Attachments: KAFKA-391-draft-r1374069.patch, KAFKA-391-v2.patch, 
 KAFKA-391-v3.patch, KAFKA-391-v4.patch


 Producer response contains two arrays of error codes and offsets - the 
 ordering in these arrays correspond to the flattened ordering of the request 
 arrays.
 It would be better to switch to maps in the request and response as this 
 would make the code clearer and more efficient (right now, linear scans are 
 used in handling producer acks).
 We can probably do the same in the fetch request/response.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1710) [New Java Producer Potential Deadlock] Producer Deadlock when all messages is being sent to single partition

2014-10-23 Thread Bhavesh Mistry (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1710?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14182198#comment-14182198
 ] 

Bhavesh Mistry commented on KAFKA-1710:
---

[~jkreps],

Sorry to bug you again. Did you get a chance to review the above performance 
numbers and the cost of synchronization per thread when the partition is not set 
versus when it is set to a single partition?

Thanks,

Bhavesh 

 [New Java Producer Potential Deadlock] Producer Deadlock when all messages is 
 being sent to single partition
 

 Key: KAFKA-1710
 URL: https://issues.apache.org/jira/browse/KAFKA-1710
 Project: Kafka
  Issue Type: Bug
  Components: producer 
 Environment: Development
Reporter: Bhavesh Mistry
Assignee: Ewen Cheslack-Postava
Priority: Critical
  Labels: performance
 Attachments: Screen Shot 2014-10-13 at 10.19.04 AM.png, Screen Shot 
 2014-10-15 at 9.09.06 PM.png, Screen Shot 2014-10-15 at 9.14.15 PM.png, 
 TestNetworkDownProducer.java, th1.dump, th10.dump, th11.dump, th12.dump, 
 th13.dump, th14.dump, th15.dump, th2.dump, th3.dump, th4.dump, th5.dump, 
 th6.dump, th7.dump, th8.dump, th9.dump


 Hi Kafka Dev Team,
 When I run the test sending messages to a single partition for 3 minutes or 
 so, I encounter a deadlock (please see the attached screenshot) and thread 
 contention in YourKit profiling.  
 Use Case:
 1)  Aggregating messages into the same partition for metric counting. 
 2)  Replicating the old producer behavior of sticking to a partition for 3 minutes.
 Here is output:
 Frozen threads found (potential deadlock)
  
 It seems that the following threads have not changed their stack for more 
 than 10 seconds.
 These threads are possibly (but not necessarily!) in a deadlock or hung.
  
 pool-1-thread-128 --- Frozen for at least 2m
 org.apache.kafka.clients.producer.internals.RecordAccumulator.append(TopicPartition,
  byte[], byte[], CompressionType, Callback) RecordAccumulator.java:139
 org.apache.kafka.clients.producer.KafkaProducer.send(ProducerRecord, 
 Callback) KafkaProducer.java:237
 org.kafka.test.TestNetworkDownProducer$MyProducer.run() 
 TestNetworkDownProducer.java:84
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor$Worker) 
 ThreadPoolExecutor.java:1145
 java.util.concurrent.ThreadPoolExecutor$Worker.run() 
 ThreadPoolExecutor.java:615
 java.lang.Thread.run() Thread.java:744
 pool-1-thread-159 --- Frozen for at least 2m 1 sec
 org.apache.kafka.clients.producer.internals.RecordAccumulator.append(TopicPartition,
  byte[], byte[], CompressionType, Callback) RecordAccumulator.java:139
 org.apache.kafka.clients.producer.KafkaProducer.send(ProducerRecord, 
 Callback) KafkaProducer.java:237
 org.kafka.test.TestNetworkDownProducer$MyProducer.run() 
 TestNetworkDownProducer.java:84
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor$Worker) 
 ThreadPoolExecutor.java:1145
 java.util.concurrent.ThreadPoolExecutor$Worker.run() 
 ThreadPoolExecutor.java:615
 java.lang.Thread.run() Thread.java:744
 pool-1-thread-55 --- Frozen for at least 2m
 org.apache.kafka.clients.producer.internals.RecordAccumulator.append(TopicPartition,
  byte[], byte[], CompressionType, Callback) RecordAccumulator.java:139
 org.apache.kafka.clients.producer.KafkaProducer.send(ProducerRecord, 
 Callback) KafkaProducer.java:237
 org.kafka.test.TestNetworkDownProducer$MyProducer.run() 
 TestNetworkDownProducer.java:84
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor$Worker) 
 ThreadPoolExecutor.java:1145
 java.util.concurrent.ThreadPoolExecutor$Worker.run() 
 ThreadPoolExecutor.java:615
 java.lang.Thread.run() Thread.java:744
 Thanks,
 Bhavesh 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 26994: Patch for KAFKA-1719

2014-10-23 Thread Neha Narkhede

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/26994/#review58171
---

Ship it!


Besides the minor stylistic comment, the rest looks good.


core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/26994/#comment99103

Minor stylistic comment-

if(!isShuttingdown.get())



core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/26994/#comment99104

ditto here


- Neha Narkhede


On Oct. 23, 2014, 11:20 p.m., Jiangjie Qin wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/26994/
 ---
 
 (Updated Oct. 23, 2014, 11:20 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1719
 https://issues.apache.org/jira/browse/KAFKA-1719
 
 
 Repository: kafka
 
 
 Description
 ---
 
 Addressed Guozhang's comments.
 
 
Addressed Neha and Guozhang's comments.
 
 
 Diffs
 -
 
   core/src/main/scala/kafka/tools/MirrorMaker.scala 
 b8698ee1469c8fbc92ccc176d916eb3e28b87867 
 
 Diff: https://reviews.apache.org/r/26994/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Jiangjie Qin
 




[jira] [Commented] (KAFKA-1716) hang during shutdown of ZookeeperConsumerConnector

2014-10-23 Thread Chris Richardson (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1716?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14182203#comment-14182203
 ] 

Chris Richardson commented on KAFKA-1716:
-

No. I am using the default.

 hang during shutdown of ZookeeperConsumerConnector
 --

 Key: KAFKA-1716
 URL: https://issues.apache.org/jira/browse/KAFKA-1716
 Project: Kafka
  Issue Type: Bug
  Components: consumer
Affects Versions: 0.8.1.1
Reporter: Sean Fay
Assignee: Neha Narkhede

 It appears to be possible for {{ZookeeperConsumerConnector.shutdown()}} to 
 wedge in the case that some consumer fetcher threads receive messages during 
 the shutdown process.
 Shutdown thread:
 {code}-- Parking to wait for: 
 java/util/concurrent/CountDownLatch$Sync@0x2aaaf3ef06d0
 at jrockit/vm/Locks.park0(J)V(Native Method)
 at jrockit/vm/Locks.park(Locks.java:2230)
 at sun/misc/Unsafe.park(ZJ)V(Native Method)
 at java/util/concurrent/locks/LockSupport.park(LockSupport.java:156)
 at 
 java/util/concurrent/locks/AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:811)
 at 
 java/util/concurrent/locks/AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:969)
 at 
 java/util/concurrent/locks/AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1281)
 at java/util/concurrent/CountDownLatch.await(CountDownLatch.java:207)
 at kafka/utils/ShutdownableThread.shutdown(ShutdownableThread.scala:36)
 at 
 kafka/server/AbstractFetcherThread.shutdown(AbstractFetcherThread.scala:71)
 at 
 kafka/server/AbstractFetcherManager$$anonfun$closeAllFetchers$2.apply(AbstractFetcherManager.scala:121)
 at 
 kafka/server/AbstractFetcherManager$$anonfun$closeAllFetchers$2.apply(AbstractFetcherManager.scala:120)
 at 
 scala/collection/TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
 at 
 scala/collection/mutable/HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
 at 
 scala/collection/mutable/HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
 at 
 scala/collection/mutable/HashTable$class.foreachEntry(HashTable.scala:226)
 at scala/collection/mutable/HashMap.foreachEntry(HashMap.scala:39)
 at scala/collection/mutable/HashMap.foreach(HashMap.scala:98)
 at 
 scala/collection/TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
 at 
 kafka/server/AbstractFetcherManager.closeAllFetchers(AbstractFetcherManager.scala:120)
 ^-- Holding lock: java/lang/Object@0x2aaaebcc7318[thin lock]
 at 
 kafka/consumer/ConsumerFetcherManager.stopConnections(ConsumerFetcherManager.scala:148)
 at 
 kafka/consumer/ZookeeperConsumerConnector.liftedTree1$1(ZookeeperConsumerConnector.scala:171)
 at 
 kafka/consumer/ZookeeperConsumerConnector.shutdown(ZookeeperConsumerConnector.scala:167){code}
 ConsumerFetcherThread:
 {code}-- Parking to wait for: 
 java/util/concurrent/locks/AbstractQueuedSynchronizer$ConditionObject@0x2aaaebcc7568
 at jrockit/vm/Locks.park0(J)V(Native Method)
 at jrockit/vm/Locks.park(Locks.java:2230)
 at sun/misc/Unsafe.park(ZJ)V(Native Method)
 at java/util/concurrent/locks/LockSupport.park(LockSupport.java:156)
 at 
 java/util/concurrent/locks/AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
 at 
 java/util/concurrent/LinkedBlockingQueue.put(LinkedBlockingQueue.java:306)
 at kafka/consumer/PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:60)
 at 
 kafka/consumer/ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:49)
 at 
 kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:130)
 at 
 kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:111)
 at scala/collection/immutable/HashMap$HashMap1.foreach(HashMap.scala:224)
 at 
 scala/collection/immutable/HashMap$HashTrieMap.foreach(HashMap.scala:403)
 at 
 kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1.apply$mcV$sp(AbstractFetcherThread.scala:111)
 at 
 kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:111)
 at 
 kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:111)
 at kafka/utils/Utils$.inLock(Utils.scala:538)
 at 
 kafka/server/AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:110)
 at 
 kafka/server/AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88)
 at kafka/utils/ShutdownableThread.run(ShutdownableThread.scala:51)
 at jrockit/vm/RNI.c2java(J)V(Native 

[jira] [Commented] (KAFKA-1710) [New Java Producer Potential Deadlock] Producer Deadlock when all messages is being sent to single partition

2014-10-23 Thread Jay Kreps (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1710?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14182219#comment-14182219
 ] 

Jay Kreps commented on KAFKA-1710:
--

Well so we need to do a very quick lock around the insert into the queue to 
maintain thread safety. With multiple threads competing on a single partition 
this will definitely be slower than if you have multiple partitions. But 
although it is slower it shouldn't be slow.

The data you have gives 2666 bytes/us, isn't that pretty good? That is 
2.6GB/second, no?
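
For context, a minimal sketch (hypothetical names, not the actual accumulator code) of the per-partition locking described above: the lock is held only for the enqueue, but when every thread targets the same partition they all contend on that one deque's lock.

{code}
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class PerPartitionAppendSketch {
    private final ConcurrentMap<Integer, Deque<byte[]>> batches =
            new ConcurrentHashMap<Integer, Deque<byte[]>>();

    void append(int partition, byte[] record) {
        batches.putIfAbsent(partition, new ArrayDeque<byte[]>());
        Deque<byte[]> dq = batches.get(partition);
        synchronized (dq) {
            // Quick lock around the insert; contended only when many threads
            // share a single target partition.
            dq.addLast(record);
        }
    }
}
{code}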

 [New Java Producer Potential Deadlock] Producer Deadlock when all messages is 
 being sent to single partition
 

 Key: KAFKA-1710
 URL: https://issues.apache.org/jira/browse/KAFKA-1710
 Project: Kafka
  Issue Type: Bug
  Components: producer 
 Environment: Development
Reporter: Bhavesh Mistry
Assignee: Ewen Cheslack-Postava
Priority: Critical
  Labels: performance
 Attachments: Screen Shot 2014-10-13 at 10.19.04 AM.png, Screen Shot 
 2014-10-15 at 9.09.06 PM.png, Screen Shot 2014-10-15 at 9.14.15 PM.png, 
 TestNetworkDownProducer.java, th1.dump, th10.dump, th11.dump, th12.dump, 
 th13.dump, th14.dump, th15.dump, th2.dump, th3.dump, th4.dump, th5.dump, 
 th6.dump, th7.dump, th8.dump, th9.dump


 Hi Kafka Dev Team,
 When I run the test sending messages to a single partition for 3 minutes or 
 so, I encounter a deadlock (please see the attached screenshot) and thread 
 contention in YourKit profiling.  
 Use Case:
 1)  Aggregating messages into the same partition for metric counting. 
 2)  Replicating the old producer behavior of sticking to a partition for 3 minutes.
 Here is output:
 Frozen threads found (potential deadlock)
  
 It seems that the following threads have not changed their stack for more 
 than 10 seconds.
 These threads are possibly (but not necessarily!) in a deadlock or hung.
  
 pool-1-thread-128 --- Frozen for at least 2m
 org.apache.kafka.clients.producer.internals.RecordAccumulator.append(TopicPartition,
  byte[], byte[], CompressionType, Callback) RecordAccumulator.java:139
 org.apache.kafka.clients.producer.KafkaProducer.send(ProducerRecord, 
 Callback) KafkaProducer.java:237
 org.kafka.test.TestNetworkDownProducer$MyProducer.run() 
 TestNetworkDownProducer.java:84
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor$Worker) 
 ThreadPoolExecutor.java:1145
 java.util.concurrent.ThreadPoolExecutor$Worker.run() 
 ThreadPoolExecutor.java:615
 java.lang.Thread.run() Thread.java:744
 pool-1-thread-159 --- Frozen for at least 2m 1 sec
 org.apache.kafka.clients.producer.internals.RecordAccumulator.append(TopicPartition,
  byte[], byte[], CompressionType, Callback) RecordAccumulator.java:139
 org.apache.kafka.clients.producer.KafkaProducer.send(ProducerRecord, 
 Callback) KafkaProducer.java:237
 org.kafka.test.TestNetworkDownProducer$MyProducer.run() 
 TestNetworkDownProducer.java:84
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor$Worker) 
 ThreadPoolExecutor.java:1145
 java.util.concurrent.ThreadPoolExecutor$Worker.run() 
 ThreadPoolExecutor.java:615
 java.lang.Thread.run() Thread.java:744
 pool-1-thread-55 --- Frozen for at least 2m
 org.apache.kafka.clients.producer.internals.RecordAccumulator.append(TopicPartition,
  byte[], byte[], CompressionType, Callback) RecordAccumulator.java:139
 org.apache.kafka.clients.producer.KafkaProducer.send(ProducerRecord, 
 Callback) KafkaProducer.java:237
 org.kafka.test.TestNetworkDownProducer$MyProducer.run() 
 TestNetworkDownProducer.java:84
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor$Worker) 
 ThreadPoolExecutor.java:1145
 java.util.concurrent.ThreadPoolExecutor$Worker.run() 
 ThreadPoolExecutor.java:615
 java.lang.Thread.run() Thread.java:744
 Thanks,
 Bhavesh 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: ConsumerFetcherThread deadlock?

2014-10-23 Thread Guozhang Wang
Jack,

The fetchers are blocked on the queue since it is full, is your consumer
iterator stopped and hence not getting more data from it?
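
For reference, a minimal sketch of keeping the high-level consumer iterator draining so the fetcher queue does not fill up (the topic, group, and ZooKeeper address are hypothetical):

{code}
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

public class DrainingConsumerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("zookeeper.connect", "localhost:2181"); // hypothetical
        props.put("group.id", "example-group");           // hypothetical
        ConsumerConnector connector =
                Consumer.createJavaConsumerConnector(new ConsumerConfig(props));

        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put("example-topic", 1);             // hypothetical topic
        Map<String, List<KafkaStream<byte[], byte[]>>> streams =
                connector.createMessageStreams(topicCountMap);

        // Keep iterating; if this loop stalls, the stream's queue fills up and the
        // ConsumerFetcherThreads block in PartitionTopicInfo.enqueue() as shown above.
        ConsumerIterator<byte[], byte[]> it = streams.get("example-topic").get(0).iterator();
        while (it.hasNext()) {
            process(it.next().message());
        }
    }

    private static void process(byte[] payload) {
        // application logic goes here
    }
}
{code}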

Guozhang

On Thu, Oct 23, 2014 at 3:53 PM, Jack Foy j...@whitepages.com wrote:

 Hi all,

 We run kafka 0.8.1.1. We’re tracking down a problem where consumer groups
 stop pulling from their respective partitions a few minutes or hours into
 execution. It looks like all ConsumerFetcherThreads associated with that
 consumer are blocking while waiting to write data to a LinkedBlockingQueue.
 They are waiting on ConditionObjects with different object IDs, and those
 object IDs do not occur elsewhere within our snapshot of thread data. It
 appears that those threads never make progress once they enter this waiting
 state.

 KAFKA-937 looks like a very similar symptom:
 https://issues.apache.org/jira/browse/KAFKA-937 According to Jun Rao’s
 comments on that issue, a ConsumerFetcherThread should never be blocked. Is
 that still the case?

 Here’s the thread dump for the relevant threads. I can provide more
 information if needed.

 ConsumerFetcherThread-sumo-relay_kafka0.util.pages-1414089954648-5244fae6-0-0
 prio=10 tid=0x7f1954a9c800 nid=0xbf0 waiting on condition
 [0x7f19339f8000]
java.lang.Thread.State: WAITING (parking)
 at sun.misc.Unsafe.park(Native Method)
 - parking to wait for  0x8ac24dd8 (a
 java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
 at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
 at
 java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
 at
 java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:349)
 at
 kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:60)
 at
 kafka.consumer.ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:49)
 at
 kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:130)
 at
 kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:111)
 at scala.collection.immutable.Map$Map3.foreach(Map.scala:154)
 at
 kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply$mcV$sp(AbstractFetcherThread.scala:111)
 at
 kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:111)
 at
 kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:111)
 at kafka.utils.Utils$.inLock(Utils.scala:538)
 at
 kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:110)
 at
 kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88)
 at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)

 ConsumerFetcherThread-sumo-relay_kafka0.util.pages-1414089954648-5244fae6-0-1
 prio=10 tid=0x7f1955657000 nid=0xbf3 waiting on condition
 [0x7f19321e]
java.lang.Thread.State: WAITING (parking)
 at sun.misc.Unsafe.park(Native Method)
 - parking to wait for  0x8ad280e8 (a
 java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
 at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
 at
 java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
 at
 java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:349)
 at
 kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:60)
 at
 kafka.consumer.ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:49)
 at
 kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:130)
 at
 kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:111)
 at scala.collection.immutable.Map$Map3.foreach(Map.scala:154)
 at
 kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply$mcV$sp(AbstractFetcherThread.scala:111)
 at
 kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:111)
 at
 kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:111)
 at kafka.utils.Utils$.inLock(Utils.scala:538)
 at
 kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:110)
 at
 kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88)
 at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)

 ConsumerFetcherThread-sumo-relay_kafka0.util.pages-1414089954648-5244fae6-0-2
 prio=10 tid=0x7f1954001000 nid=0xbf1 waiting on condition
 [0x7f19326e5000]
java.lang.Thread.State: WAITING (parking)
 at sun.misc.Unsafe.park(Native Method)

[jira] [Resolved] (KAFKA-1711) WARN Property topic is not valid when running console producer

2014-10-23 Thread Jun Rao (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1711?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao resolved KAFKA-1711.

   Resolution: Fixed
Fix Version/s: 0.8.3
 Assignee: Joe Crobak

Thanks for the patch. Committed to trunk.

 WARN Property topic is not valid when running console producer
 --

 Key: KAFKA-1711
 URL: https://issues.apache.org/jira/browse/KAFKA-1711
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 0.8.2
Reporter: Jun Rao
Assignee: Joe Crobak
  Labels: newbie
 Fix For: 0.8.3

 Attachments: KAFKA-1711.patch


 bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test 
 [2014-10-17 09:54:23,984] WARN Property topic is not valid 
 (kafka.utils.VerifiableProperties)
 It would be good if we can get rid of the warning.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: ConsumerFetcherThread deadlock?

2014-10-23 Thread Neha Narkhede
This can also happen if at least one of the consumer threads (not the
fetcher threads) dies. You can inspect the thread dump to see if all your
consumer threads are alive.
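
For anyone hitting the same symptom, below is a minimal sketch (hypothetical
topic, group and ZooKeeper settings) of the consumer-side loop in question. The
fetcher threads only make progress while a loop like this keeps draining its
KafkaStream, since the stream is backed by the same bounded queue the fetcher
writes into.

{code}
// Minimal sketch, assuming the 0.8.x high-level consumer and made-up names.
// The fetcher thread fills a bounded chunk queue; this loop is what drains it.
// If this thread dies or stalls, the queue fills up and the fetcher parks in
// LinkedBlockingQueue.put(), exactly as in the dumps above.
import java.util.Properties
import kafka.consumer.{Consumer, ConsumerConfig}

object DrainingConsumer extends App {
  val props = new Properties()
  props.put("zookeeper.connect", "localhost:2181")   // assumed ZK address
  props.put("group.id", "example-group")             // assumed group id
  val connector = Consumer.create(new ConsumerConfig(props))
  val stream = connector.createMessageStreams(Map("example-topic" -> 1))("example-topic").head

  val worker = new Thread(new Runnable {
    def run(): Unit =
      try {
        for (msg <- stream)                          // blocks on the same queue the fetcher fills
          println(new String(msg.message()))         // stand-in for real processing
      } catch {
        case t: Throwable =>
          // If this thread dies silently, fetchers back up once the queue is full.
          t.printStackTrace()
      }
  })
  worker.start()
}
{code}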

On Thu, Oct 23, 2014 at 5:05 PM, Guozhang Wang wangg...@gmail.com wrote:

 Jack,

 The fetchers are blocked on the queue since it is full; is your consumer
 iterator stopped and hence not getting more data from it?

 Guozhang

 On Thu, Oct 23, 2014 at 3:53 PM, Jack Foy j...@whitepages.com wrote:

  Hi all,
 
  We run kafka 0.8.1.1. We’re tracking down a problem where consumer groups
  stop pulling from their respective partitions a few minutes or hours into
  execution. It looks like all ConsumerFetcherThreads associated with that
  consumer are blocking while waiting to write data to a
 LinkedBlockingQueue.
  They are waiting on ConditionObjects with different object IDs, and those
  object IDs do not occur elsewhere within our snapshot of thread data. It
  appears that those threads never make progress once they enter this
 waiting
  state.
 
  KAFKA-937 looks like a very similar symptom:
  https://issues.apache.org/jira/browse/KAFKA-937 According to Jun Rao’s
  comments on that issue, a ConsumerFetcherThread should never be blocked.
 Is
  that still the case?
 
  Here’s the thread dump for the relevant threads. I can provide more
  information if needed.
 
 
 ConsumerFetcherThread-sumo-relay_kafka0.util.pages-1414089954648-5244fae6-0-0
  prio=10 tid=0x7f1954a9c800 nid=0xbf0 waiting on condition
  [0x7f19339f8000]
 java.lang.Thread.State: WAITING (parking)
  at sun.misc.Unsafe.park(Native Method)
  - parking to wait for  0x8ac24dd8 (a
  java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
  at
 
 java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
  at
 
 java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:349)
  at
  kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:60)
  at
 
 kafka.consumer.ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:49)
  at
 
 kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:130)
  at
 
 kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:111)
  at scala.collection.immutable.Map$Map3.foreach(Map.scala:154)
  at
 
 kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply$mcV$sp(AbstractFetcherThread.scala:111)
  at
 
 kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:111)
  at
 
 kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:111)
  at kafka.utils.Utils$.inLock(Utils.scala:538)
  at
 
 kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:110)
  at
  kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88)
  at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
 
 
 ConsumerFetcherThread-sumo-relay_kafka0.util.pages-1414089954648-5244fae6-0-1
  prio=10 tid=0x7f1955657000 nid=0xbf3 waiting on condition
  [0x7f19321e]
 java.lang.Thread.State: WAITING (parking)
  at sun.misc.Unsafe.park(Native Method)
  - parking to wait for  0x8ad280e8 (a
  java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
  at
 
 java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
  at
 
 java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:349)
  at
  kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:60)
  at
 
 kafka.consumer.ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:49)
  at
 
 kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:130)
  at
 
 kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:111)
  at scala.collection.immutable.Map$Map3.foreach(Map.scala:154)
  at
 
 kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply$mcV$sp(AbstractFetcherThread.scala:111)
  at
 
 kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:111)
  at
 
 kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:111)
  at kafka.utils.Utils$.inLock(Utils.scala:538)
  at
 
 kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:110)
  at
  

Re: Bad link on the document

2014-10-23 Thread Jun Rao
Sorry for the delay. Fixed the link.

Thanks,

Jun

On Tue, Oct 14, 2014 at 8:18 PM, Azuryy Yu azury...@gmail.com wrote:

 Hi,
 Usage information on the hadoop consumer can be found here
 https://github.com/linkedin/camus/tree/camus-kafka-0.8/.

 The link is broken; could someone kindly fix it? Thanks.



[jira] [Commented] (KAFKA-1501) transient unit tests failures due to port already in use

2014-10-23 Thread Chris Cope (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1501?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14182233#comment-14182233
 ] 

Chris Cope commented on KAFKA-1501:
---

Unfortunately, same issues: 9/100 tests had a bunch of failures.
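
For reference, the test harness picks broker ports dynamically rather than
hard-coding them; kafka.utils.TestUtils.choosePort works roughly like the
sketch below, and the window between closing the probe socket and the broker
binding the port is one way these transient failures can still happen.

{code}
// Sketch (not the actual TestUtils code) of dynamic port selection for test brokers:
// bind to port 0 so the kernel assigns a free ephemeral port, then read it back.
// Another process can still grab the port between close() and the broker's bind,
// which is the kind of race that shows up as "Address already in use".
import java.net.ServerSocket

def choosePort(): Int = {
  val socket = new ServerSocket(0)
  try socket.getLocalPort
  finally socket.close()
}

println("test broker port: " + choosePort())
{code}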

 transient unit tests failures due to port already in use
 

 Key: KAFKA-1501
 URL: https://issues.apache.org/jira/browse/KAFKA-1501
 Project: Kafka
  Issue Type: Improvement
  Components: core
Reporter: Jun Rao
Assignee: Guozhang Wang
  Labels: newbie
 Attachments: KAFKA-1501.patch


 Saw the following transient failures.
 kafka.api.ProducerFailureHandlingTest  testTooLargeRecordWithAckOne FAILED
 kafka.common.KafkaException: Socket server failed to bind to 
 localhost:59909: Address already in use.
 at kafka.network.Acceptor.openServerSocket(SocketServer.scala:195)
 at kafka.network.Acceptor.init(SocketServer.scala:141)
 at kafka.network.SocketServer.startup(SocketServer.scala:68)
 at kafka.server.KafkaServer.startup(KafkaServer.scala:95)
 at kafka.utils.TestUtils$.createServer(TestUtils.scala:123)
 at 
 kafka.api.ProducerFailureHandlingTest.setUp(ProducerFailureHandlingTest.scala:68)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-391) Producer request and response classes should use maps

2014-10-23 Thread Joel Koshy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14182240#comment-14182240
 ] 

Joel Koshy commented on KAFKA-391:
--

You mean two responses to the caller of send? I don't see why those two lines 
would cause two responses. Can you explain further and provide steps to 
reproduce if there really is an issue? Do you see errors/warns in the logs?
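
For context, here is a minimal sketch (not the actual SyncProducer code) of the
check under discussion: a single response comes back for each request, and the
exception fires only when that one response carries a different number of
per-partition statuses than the request had partitions.

{code}
// Minimal sketch of the consistency check being discussed; names are illustrative.
def verifyResponse(requestPartitionCount: Int, responseStatusCount: Int): Unit = {
  if (responseStatusCount != requestPartitionCount)
    throw new kafka.common.KafkaException(
      "Incomplete response (" + responseStatusCount +
        " statuses) for producer request (" + requestPartitionCount + " partitions)")
}
{code}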

 Producer request and response classes should use maps
 -

 Key: KAFKA-391
 URL: https://issues.apache.org/jira/browse/KAFKA-391
 Project: Kafka
  Issue Type: Bug
Reporter: Joel Koshy
Assignee: Joel Koshy
Priority: Blocker
  Labels: optimization
 Fix For: 0.8.0

 Attachments: KAFKA-391-draft-r1374069.patch, KAFKA-391-v2.patch, 
 KAFKA-391-v3.patch, KAFKA-391-v4.patch


 Producer response contains two arrays of error codes and offsets - the 
 ordering in these arrays correspond to the flattened ordering of the request 
 arrays.
 It would be better to switch to maps in the request and response as this 
 would make the code clearer and more efficient (right now, linear scans are 
 used in handling producer acks).
 We can probably do the same in the fetch request/response.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: ConsumerFetcherThread deadlock?

2014-10-23 Thread Jack Foy
On Oct 23, 2014, at 5:09 PM, Neha Narkhede neha.narkh...@gmail.com wrote:
 This can also happen if at least one of the consumer threads (not the
 fetcher threads) dies. You can inspect the thread dump to see if all your
 consumer threads are alive.

Thanks very much — that’s probably what is happening, and we’ll investigate 
further.

-- 
Jack Foy j...@whitepages.com





Re: Review Request 26994: Patch for KAFKA-1719

2014-10-23 Thread Joel Koshy

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/26994/#review58185
---

Ship it!


Looks good - can you upload an updated RB incorporating the minor comments?


core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/26994/#comment99129

exited


- Joel Koshy


On Oct. 23, 2014, 11:20 p.m., Jiangjie Qin wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/26994/
 ---
 
 (Updated Oct. 23, 2014, 11:20 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1719
 https://issues.apache.org/jira/browse/KAFKA-1719
 
 
 Repository: kafka
 
 
 Description
 ---
 
 Addressed Guozhang's comments.
 
 
 Addressed Neha's and Guozhang's comments.
 
 
 Diffs
 -
 
   core/src/main/scala/kafka/tools/MirrorMaker.scala 
 b8698ee1469c8fbc92ccc176d916eb3e28b87867 
 
 Diff: https://reviews.apache.org/r/26994/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Jiangjie Qin
 




[jira] [Commented] (KAFKA-1667) topic-level configuration not validated

2014-10-23 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1667?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14182253#comment-14182253
 ] 

Jun Rao commented on KAFKA-1667:


The server side already has a dependency on clients. So, we can start using 
ConfigDef on the broker side as well.
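
A minimal sketch (made-up defaults, ranges and doc strings) of what
ConfigDef-based validation of the topic-level overrides from this ticket could
look like; parse() would reject the out-of-range values shown below with a
ConfigException instead of silently storing them.

{code}
// Minimal sketch (hypothetical defaults and docs) of validating topic-level
// overrides with ConfigDef from the clients jar.
import java.util.Properties
import org.apache.kafka.common.config.ConfigDef
import org.apache.kafka.common.config.ConfigDef.{Importance, Range, Type}

val topicConfigDef = new ConfigDef()
  .define("segment.bytes", Type.INT, 1073741824,
    Range.atLeast(1), Importance.MEDIUM, "Segment file size for this topic")
  .define("retention.ms", Type.LONG, 604800000L,
    Range.atLeast(1), Importance.MEDIUM, "Retention time in ms for this topic")
  .define("min.cleanable.dirty.ratio", Type.DOUBLE, 0.5,
    Range.between(0.0, 1.0), Importance.MEDIUM, "Minimum dirty ratio before cleaning")

val overrides = new Properties()
overrides.put("min.cleanable.dirty.ratio", "-30.2")  // the invalid value from the report

// Throws org.apache.kafka.common.config.ConfigException instead of accepting -30.2.
val validated = topicConfigDef.parse(overrides)
{code}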

  topic-level configuration not validated
 

 Key: KAFKA-1667
 URL: https://issues.apache.org/jira/browse/KAFKA-1667
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8.1.1
Reporter: Ryan Berdeen
  Labels: newbie

 I was able to set the configuration for a topic to these invalid values:
 {code}
 Topic:topic-config-test  PartitionCount:1ReplicationFactor:2 
 Configs:min.cleanable.dirty.ratio=-30.2,segment.bytes=-1,retention.ms=-12,cleanup.policy=lol
 {code}
 It seems that the values are saved as long as they are the correct type, but 
 are not validated like the corresponding broker-level properties.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1716) hang during shutdown of ZookeeperConsumerConnector

2014-10-23 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1716?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14182258#comment-14182258
 ] 

Jun Rao commented on KAFKA-1716:


Hmm, then it's not clear why SimpleConsumer is blocked on reading from the 
socket. Could you turn on the request log on the broker and see whether the fetch 
request just takes a long time or never completes?
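
For reference, the request log is controlled by the kafka.request.logger logger
in config/log4j.properties; raising it from WARN to TRACE makes the broker log
each completed request with its timing breakdown, so a fetch that never
completes simply never gets a completion line. A sketch, assuming the stock
requestAppender shipped in that file:

{code}
# config/log4j.properties -- sketch, assuming the stock requestAppender is defined there
log4j.logger.kafka.request.logger=TRACE, requestAppender
log4j.additivity.kafka.request.logger=false
{code}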

 hang during shutdown of ZookeeperConsumerConnector
 --

 Key: KAFKA-1716
 URL: https://issues.apache.org/jira/browse/KAFKA-1716
 Project: Kafka
  Issue Type: Bug
  Components: consumer
Affects Versions: 0.8.1.1
Reporter: Sean Fay
Assignee: Neha Narkhede

 It appears to be possible for {{ZookeeperConsumerConnector.shutdown()}} to 
 wedge in the case that some consumer fetcher threads receive messages during 
 the shutdown process.
 Shutdown thread:
 {code}-- Parking to wait for: 
 java/util/concurrent/CountDownLatch$Sync@0x2aaaf3ef06d0
 at jrockit/vm/Locks.park0(J)V(Native Method)
 at jrockit/vm/Locks.park(Locks.java:2230)
 at sun/misc/Unsafe.park(ZJ)V(Native Method)
 at java/util/concurrent/locks/LockSupport.park(LockSupport.java:156)
 at 
 java/util/concurrent/locks/AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:811)
 at 
 java/util/concurrent/locks/AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:969)
 at 
 java/util/concurrent/locks/AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1281)
 at java/util/concurrent/CountDownLatch.await(CountDownLatch.java:207)
 at kafka/utils/ShutdownableThread.shutdown(ShutdownableThread.scala:36)
 at 
 kafka/server/AbstractFetcherThread.shutdown(AbstractFetcherThread.scala:71)
 at 
 kafka/server/AbstractFetcherManager$$anonfun$closeAllFetchers$2.apply(AbstractFetcherManager.scala:121)
 at 
 kafka/server/AbstractFetcherManager$$anonfun$closeAllFetchers$2.apply(AbstractFetcherManager.scala:120)
 at 
 scala/collection/TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
 at 
 scala/collection/mutable/HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
 at 
 scala/collection/mutable/HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
 at 
 scala/collection/mutable/HashTable$class.foreachEntry(HashTable.scala:226)
 at scala/collection/mutable/HashMap.foreachEntry(HashMap.scala:39)
 at scala/collection/mutable/HashMap.foreach(HashMap.scala:98)
 at 
 scala/collection/TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
 at 
 kafka/server/AbstractFetcherManager.closeAllFetchers(AbstractFetcherManager.scala:120)
 ^-- Holding lock: java/lang/Object@0x2aaaebcc7318[thin lock]
 at 
 kafka/consumer/ConsumerFetcherManager.stopConnections(ConsumerFetcherManager.scala:148)
 at 
 kafka/consumer/ZookeeperConsumerConnector.liftedTree1$1(ZookeeperConsumerConnector.scala:171)
 at 
 kafka/consumer/ZookeeperConsumerConnector.shutdown(ZookeeperConsumerConnector.scala:167){code}
 ConsumerFetcherThread:
 {code}-- Parking to wait for: 
 java/util/concurrent/locks/AbstractQueuedSynchronizer$ConditionObject@0x2aaaebcc7568
 at jrockit/vm/Locks.park0(J)V(Native Method)
 at jrockit/vm/Locks.park(Locks.java:2230)
 at sun/misc/Unsafe.park(ZJ)V(Native Method)
 at java/util/concurrent/locks/LockSupport.park(LockSupport.java:156)
 at 
 java/util/concurrent/locks/AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
 at 
 java/util/concurrent/LinkedBlockingQueue.put(LinkedBlockingQueue.java:306)
 at kafka/consumer/PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:60)
 at 
 kafka/consumer/ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:49)
 at 
 kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:130)
 at 
 kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:111)
 at scala/collection/immutable/HashMap$HashMap1.foreach(HashMap.scala:224)
 at 
 scala/collection/immutable/HashMap$HashTrieMap.foreach(HashMap.scala:403)
 at 
 kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1.apply$mcV$sp(AbstractFetcherThread.scala:111)
 at 
 kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:111)
 at 
 kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:111)
 at kafka/utils/Utils$.inLock(Utils.scala:538)
 at 
 kafka/server/AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:110)
 at 
 

Re: Review Request 27060: Patch for KAFKA-1725

2014-10-23 Thread Neha Narkhede

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/27060/#review58189
---

Ship it!


Ship It!

- Neha Narkhede


On Oct. 22, 2014, 11:52 p.m., Ewen Cheslack-Postava wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/27060/
 ---
 
 (Updated Oct. 22, 2014, 11:52 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1725
 https://issues.apache.org/jira/browse/KAFKA-1725
 
 
 Repository: kafka
 
 
 Description
 ---
 
 KAFKA-1725: Clean up system test output: fix a typo in a system test case file, 
 fix incorrectly named system test configuration files, and skip trying to 
 generate metrics graphs when no data is available.
 
 
 Diffs
 -
 
   
 system_test/mirror_maker_testsuite/testcase_15001/testcase_5001_properties.json
   
   
 system_test/mirror_maker_testsuite/testcase_15002/testcase_5002_properties.json
   
   
 system_test/mirror_maker_testsuite/testcase_15003/testcase_5003_properties.json
   
   
 system_test/mirror_maker_testsuite/testcase_15004/testcase_5004_properties.json
   
   
 system_test/mirror_maker_testsuite/testcase_15005/testcase_5005_properties.json
   
   
 system_test/mirror_maker_testsuite/testcase_15006/testcase_5006_properties.json
   
   
 system_test/replication_testsuite/testcase_0001/testcase_0001_properties.json 
 308f1937bbdc0fdcebdb8e9bc39e643c3f0c18be 
   
 system_test/replication_testsuite/testcase_10101/testcase_0101_properties.json
   
   
 system_test/replication_testsuite/testcase_10102/testcase_0102_properties.json
   
   
 system_test/replication_testsuite/testcase_10103/testcase_0103_properties.json
   
   
 system_test/replication_testsuite/testcase_10104/testcase_0104_properties.json
   
   
 system_test/replication_testsuite/testcase_10105/testcase_0105_properties.json
   
   
 system_test/replication_testsuite/testcase_10106/testcase_0106_properties.json
   
   
 system_test/replication_testsuite/testcase_10107/testcase_0107_properties.json
   
   
 system_test/replication_testsuite/testcase_10108/testcase_0108_properties.json
   
   
 system_test/replication_testsuite/testcase_10109/testcase_0109_properties.json
   
   
 system_test/replication_testsuite/testcase_10110/testcase_0110_properties.json
   
   
 system_test/replication_testsuite/testcase_10131/testcase_0131_properties.json
   
   
 system_test/replication_testsuite/testcase_10132/testcase_0132_properties.json
   
   
 system_test/replication_testsuite/testcase_10133/testcase_0133_properties.json
   
   
 system_test/replication_testsuite/testcase_10134/testcase_0134_properties.json
   
   system_test/utils/metrics.py d98d3cdeab00be9ddf4b7032a68da3886e4850c7 
 
 Diff: https://reviews.apache.org/r/27060/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Ewen Cheslack-Postava
 




[jira] [Commented] (KAFKA-391) Producer request and response classes should use maps

2014-10-23 Thread Joel Koshy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14182276#comment-14182276
 ] 

Joel Koshy commented on KAFKA-391:
--

Yeah, I see your point. That's interesting - I actually don't remember why that 
was added, but it appears there must have been a legitimate reason (since you 
ran into it :) ).

Since you are able to reproduce it, can you print the full original 
request and response? They should be in the exception that is thrown.

Also, what Kafka version is your broker running, and what version is the 
producer? Are they the same?


 Producer request and response classes should use maps
 -

 Key: KAFKA-391
 URL: https://issues.apache.org/jira/browse/KAFKA-391
 Project: Kafka
  Issue Type: Bug
Reporter: Joel Koshy
Assignee: Joel Koshy
Priority: Blocker
  Labels: optimization
 Fix For: 0.8.0

 Attachments: KAFKA-391-draft-r1374069.patch, KAFKA-391-v2.patch, 
 KAFKA-391-v3.patch, KAFKA-391-v4.patch


 Producer response contains two arrays of error codes and offsets - the 
 ordering in these arrays correspond to the flattened ordering of the request 
 arrays.
 It would be better to switch to maps in the request and response as this 
 would make the code clearer and more efficient (right now, linear scans are 
 used in handling producer acks).
 We can probably do the same in the fetch request/response.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-1721) Snappy compressor is not thread safe

2014-10-23 Thread Ewen Cheslack-Postava (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1721?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ewen Cheslack-Postava updated KAFKA-1721:
-
Attachment: KAFKA-1721.patch

 Snappy compressor is not thread safe
 

 Key: KAFKA-1721
 URL: https://issues.apache.org/jira/browse/KAFKA-1721
 Project: Kafka
  Issue Type: Bug
  Components: compression
Reporter: Ewen Cheslack-Postava
Assignee: Ewen Cheslack-Postava
 Attachments: KAFKA-1721.patch


 From the mailing list, it can generate this exception:
 2014-10-20 18:55:21.841 [kafka-producer-network-thread] ERROR
 org.apache.kafka.clients.producer.internals.Sender - Uncaught error in
 kafka producer I/O thread:
 *java.lang.NullPointerException*
 at
 org.xerial.snappy.BufferRecycler.releaseInputBuffer(BufferRecycler.java:153)
 at org.xerial.snappy.SnappyOutputStream.close(SnappyOutputStream.java:317)
 at java.io.FilterOutputStream.close(FilterOutputStream.java:160)
 at org.apache.kafka.common.record.Compressor.close(Compressor.java:94)
 at
 org.apache.kafka.common.record.MemoryRecords.close(MemoryRecords.java:119)
 at
 org.apache.kafka.clients.producer.internals.RecordAccumulator.drain(RecordAccumulator.java:285)
 at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:162)
 at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:115)
 at java.lang.Thread.run(Thread.java:744)
 This appears to be an issue with the snappy-java library using ThreadLocal 
 for an internal buffer recycling object which results in that object being 
 shared unsafely across threads if one thread sends to multiple producers:
 {quote}
 I think the issue is that you're
 using all your producers across a thread pool and the snappy library
 uses ThreadLocal BufferRecyclers. When new Snappy streams are allocated,
 they may be allocated from the same thread (e.g. one of your MyProducer
 classes calls Producer.send() on multiple producers from the same
 thread) and therefore use the same BufferRecycler. Eventually you hit
 the code in the stacktrace, and if two producer send threads hit it
 concurrently they improperly share the unsynchronized BufferRecycler.
 This seems like a pain to fix -- it's really a deficiency of the snappy
 library and as far as I can see there's no external control over
 BufferRecycler in their API. One possibility is to record the thread ID
 when we generate a new stream in Compressor and use that to synchronize
 access to ensure no concurrent BufferRecycler access. That could be made
 specific to snappy so it wouldn't impact other codecs. Not exactly
 ideal, but it would work. Unfortunately I can't think of any way for you
 to protect against this in your own code since the problem arises in the
 producer send thread, which your code should never know about.
 Another option would be to setup your producers differently to avoid the
 possibility of unsynchronized access from multiple threads (i.e. don't
 use the same thread pool approach), but whether you can do that will
 depend on your use case.
 {quote}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1721) Snappy compressor is not thread safe

2014-10-23 Thread Ewen Cheslack-Postava (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1721?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14182280#comment-14182280
 ] 

Ewen Cheslack-Postava commented on KAFKA-1721:
--

Created reviewboard https://reviews.apache.org/r/27124/diff/
 against branch origin/trunk

 Snappy compressor is not thread safe
 

 Key: KAFKA-1721
 URL: https://issues.apache.org/jira/browse/KAFKA-1721
 Project: Kafka
  Issue Type: Bug
  Components: compression
Reporter: Ewen Cheslack-Postava
Assignee: Ewen Cheslack-Postava
 Attachments: KAFKA-1721.patch


 From the mailing list, it can generate this exception:
 2014-10-20 18:55:21.841 [kafka-producer-network-thread] ERROR
 org.apache.kafka.clients.producer.internals.Sender - Uncaught error in
 kafka producer I/O thread:
 *java.lang.NullPointerException*
 at
 org.xerial.snappy.BufferRecycler.releaseInputBuffer(BufferRecycler.java:153)
 at org.xerial.snappy.SnappyOutputStream.close(SnappyOutputStream.java:317)
 at java.io.FilterOutputStream.close(FilterOutputStream.java:160)
 at org.apache.kafka.common.record.Compressor.close(Compressor.java:94)
 at
 org.apache.kafka.common.record.MemoryRecords.close(MemoryRecords.java:119)
 at
 org.apache.kafka.clients.producer.internals.RecordAccumulator.drain(RecordAccumulator.java:285)
 at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:162)
 at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:115)
 at java.lang.Thread.run(Thread.java:744)
 This appears to be an issue with the snappy-java library using ThreadLocal 
 for an internal buffer recycling object which results in that object being 
 shared unsafely across threads if one thread sends to multiple producers:
 {quote}
 I think the issue is that you're
 using all your producers across a thread pool and the snappy library
 uses ThreadLocal BufferRecyclers. When new Snappy streams are allocated,
 they may be allocated from the same thread (e.g. one of your MyProducer
 classes calls Producer.send() on multiple producers from the same
 thread) and therefore use the same BufferRecycler. Eventually you hit
 the code in the stacktrace, and if two producer send threads hit it
 concurrently they improperly share the unsynchronized BufferRecycler.
 This seems like a pain to fix -- it's really a deficiency of the snappy
 library and as far as I can see there's no external control over
 BufferRecycler in their API. One possibility is to record the thread ID
 when we generate a new stream in Compressor and use that to synchronize
 access to ensure no concurrent BufferRecycler access. That could be made
 specific to snappy so it wouldn't impact other codecs. Not exactly
 ideal, but it would work. Unfortunately I can't think of any way for you
 to protect against this in your own code since the problem arises in the
 producer send thread, which your code should never know about.
 Another option would be to setup your producers differently to avoid the
 possibility of unsynchronized access from multiple threads (i.e. don't
 use the same thread pool approach), but whether you can do that will
 depend on your use case.
 {quote}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-1721) Snappy compressor is not thread safe

2014-10-23 Thread Ewen Cheslack-Postava (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1721?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ewen Cheslack-Postava updated KAFKA-1721:
-
Status: Patch Available  (was: Open)

 Snappy compressor is not thread safe
 

 Key: KAFKA-1721
 URL: https://issues.apache.org/jira/browse/KAFKA-1721
 Project: Kafka
  Issue Type: Bug
  Components: compression
Reporter: Ewen Cheslack-Postava
Assignee: Ewen Cheslack-Postava
 Attachments: KAFKA-1721.patch


 From the mailing list, it can generate this exception:
 2014-10-20 18:55:21.841 [kafka-producer-network-thread] ERROR
 org.apache.kafka.clients.producer.internals.Sender - Uncaught error in
 kafka producer I/O thread:
 *java.lang.NullPointerException*
 at
 org.xerial.snappy.BufferRecycler.releaseInputBuffer(BufferRecycler.java:153)
 at org.xerial.snappy.SnappyOutputStream.close(SnappyOutputStream.java:317)
 at java.io.FilterOutputStream.close(FilterOutputStream.java:160)
 at org.apache.kafka.common.record.Compressor.close(Compressor.java:94)
 at
 org.apache.kafka.common.record.MemoryRecords.close(MemoryRecords.java:119)
 at
 org.apache.kafka.clients.producer.internals.RecordAccumulator.drain(RecordAccumulator.java:285)
 at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:162)
 at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:115)
 at java.lang.Thread.run(Thread.java:744)
 This appears to be an issue with the snappy-java library using ThreadLocal 
 for an internal buffer recycling object which results in that object being 
 shared unsafely across threads if one thread sends to multiple producers:
 {quote}
 I think the issue is that you're
 using all your producers across a thread pool and the snappy library
 uses ThreadLocal BufferRecyclers. When new Snappy streams are allocated,
 they may be allocated from the same thread (e.g. one of your MyProducer
 classes calls Producer.send() on multiple producers from the same
 thread) and therefore use the same BufferRecycler. Eventually you hit
 the code in the stacktrace, and if two producer send threads hit it
 concurrently they improperly share the unsynchronized BufferRecycler.
 This seems like a pain to fix -- it's really a deficiency of the snappy
 library and as far as I can see there's no external control over
 BufferRecycler in their API. One possibility is to record the thread ID
 when we generate a new stream in Compressor and use that to synchronize
 access to ensure no concurrent BufferRecycler access. That could be made
 specific to snappy so it wouldn't impact other codecs. Not exactly
 ideal, but it would work. Unfortunately I can't think of any way for you
 to protect against this in your own code since the problem arises in the
 producer send thread, which your code should never know about.
 Another option would be to setup your producers differently to avoid the
 possibility of unsynchronized access from multiple threads (i.e. don't
 use the same thread pool approach), but whether you can do that will
 depend on your use case.
 {quote}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Review Request 27124: Patch for KAFKA-1721

2014-10-23 Thread Ewen Cheslack-Postava

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/27124/
---

Review request for kafka.


Bugs: KAFKA-1721
https://issues.apache.org/jira/browse/KAFKA-1721


Repository: kafka


Description
---

KAFKA-1721 Bump snappy-java version for thread-safety fix.


Diffs
-

  build.gradle c3e6bb839ad65c512c9db4695d2bb49b82c80da5 

Diff: https://reviews.apache.org/r/27124/diff/


Testing
---


Thanks,

Ewen Cheslack-Postava



[jira] [Commented] (KAFKA-1716) hang during shutdown of ZookeeperConsumerConnector

2014-10-23 Thread Chris Richardson (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1716?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14182295#comment-14182295
 ] 

Chris Richardson commented on KAFKA-1716:
-

I just realized that I left out the thread state for the ConsumerFetcherThread:

{quote}
  java.lang.Thread.State: RUNNABLE
at sun.nio.ch.Net.poll(Native Method)
at sun.nio.ch.SocketChannelImpl.poll(SocketChannelImpl.java:950)
- locked 0x0006c2dadab8 (a java.lang.Object)
at 
sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:204)
- locked 0x0006c2dada88 (a java.lang.Object)
at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103)
- locked 0x0006c2dadbd0 (a 
sun.nio.ch.SocketAdaptor$SocketInputStream)
at 
java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:385)
- locked 0x0006c2dae460 (a java.lang.Object)
{quote}

All of the ConsumerFetcherThreads are in the RUNNABLE state, yet as far as I can 
tell only a small percentage of my consumers ever shut down.

 hang during shutdown of ZookeeperConsumerConnector
 --

 Key: KAFKA-1716
 URL: https://issues.apache.org/jira/browse/KAFKA-1716
 Project: Kafka
  Issue Type: Bug
  Components: consumer
Affects Versions: 0.8.1.1
Reporter: Sean Fay
Assignee: Neha Narkhede

 It appears to be possible for {{ZookeeperConsumerConnector.shutdown()}} to 
 wedge in the case that some consumer fetcher threads receive messages during 
 the shutdown process.
 Shutdown thread:
 {code}-- Parking to wait for: 
 java/util/concurrent/CountDownLatch$Sync@0x2aaaf3ef06d0
 at jrockit/vm/Locks.park0(J)V(Native Method)
 at jrockit/vm/Locks.park(Locks.java:2230)
 at sun/misc/Unsafe.park(ZJ)V(Native Method)
 at java/util/concurrent/locks/LockSupport.park(LockSupport.java:156)
 at 
 java/util/concurrent/locks/AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:811)
 at 
 java/util/concurrent/locks/AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:969)
 at 
 java/util/concurrent/locks/AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1281)
 at java/util/concurrent/CountDownLatch.await(CountDownLatch.java:207)
 at kafka/utils/ShutdownableThread.shutdown(ShutdownableThread.scala:36)
 at 
 kafka/server/AbstractFetcherThread.shutdown(AbstractFetcherThread.scala:71)
 at 
 kafka/server/AbstractFetcherManager$$anonfun$closeAllFetchers$2.apply(AbstractFetcherManager.scala:121)
 at 
 kafka/server/AbstractFetcherManager$$anonfun$closeAllFetchers$2.apply(AbstractFetcherManager.scala:120)
 at 
 scala/collection/TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
 at 
 scala/collection/mutable/HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
 at 
 scala/collection/mutable/HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
 at 
 scala/collection/mutable/HashTable$class.foreachEntry(HashTable.scala:226)
 at scala/collection/mutable/HashMap.foreachEntry(HashMap.scala:39)
 at scala/collection/mutable/HashMap.foreach(HashMap.scala:98)
 at 
 scala/collection/TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
 at 
 kafka/server/AbstractFetcherManager.closeAllFetchers(AbstractFetcherManager.scala:120)
 ^-- Holding lock: java/lang/Object@0x2aaaebcc7318[thin lock]
 at 
 kafka/consumer/ConsumerFetcherManager.stopConnections(ConsumerFetcherManager.scala:148)
 at 
 kafka/consumer/ZookeeperConsumerConnector.liftedTree1$1(ZookeeperConsumerConnector.scala:171)
 at 
 kafka/consumer/ZookeeperConsumerConnector.shutdown(ZookeeperConsumerConnector.scala:167){code}
 ConsumerFetcherThread:
 {code}-- Parking to wait for: 
 java/util/concurrent/locks/AbstractQueuedSynchronizer$ConditionObject@0x2aaaebcc7568
 at jrockit/vm/Locks.park0(J)V(Native Method)
 at jrockit/vm/Locks.park(Locks.java:2230)
 at sun/misc/Unsafe.park(ZJ)V(Native Method)
 at java/util/concurrent/locks/LockSupport.park(LockSupport.java:156)
 at 
 java/util/concurrent/locks/AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
 at 
 java/util/concurrent/LinkedBlockingQueue.put(LinkedBlockingQueue.java:306)
 at kafka/consumer/PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:60)
 at 
 kafka/consumer/ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:49)
 at 
 kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:130)
 at 
 kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:111)
  

Jenkins build is back to normal : Kafka-trunk #316

2014-10-23 Thread Apache Jenkins Server
See https://builds.apache.org/job/Kafka-trunk/316/changes



Re: [VOTE] 0.8.2-beta Release Candidate 1

2014-10-23 Thread Jun Rao
Joe,

Verified quickstart on both the src and binary release. They all look good.
The javadoc doesn't seem to include the classes in clients. Could you add them?

Thanks,

Jun

On Tue, Oct 21, 2014 at 1:58 PM, Joe Stein joe.st...@stealth.ly wrote:

 This is the first candidate for release of Apache Kafka 0.8.2-beta

 Release Notes for the 0.8.2-beta release

 https://people.apache.org/~joestein/kafka-0.8.2-beta-candidate1/RELEASE_NOTES.html

 *** Please download, test and vote by Friday, October 24th, 2pm PT

 Kafka's KEYS file containing PGP keys we use to sign the release:
 https://svn.apache.org/repos/asf/kafka/KEYS in addition to the md5, sha1
 and sha2 (SHA256) checksum.

 * Release artifacts to be voted upon (source and binary):
 https://people.apache.org/~joestein/kafka-0.8.2-beta-candidate1/

 * Maven artifacts to be voted upon prior to release:
 https://repository.apache.org/content/groups/staging/

 * scala-doc
 https://people.apache.org/~joestein/kafka-0.8.2-beta-candidate1/scala-doc/

 * java-doc
 https://people.apache.org/~joestein/kafka-0.8.2-beta-candidate1/java-doc/

 * The tag to be voted upon (off the 0.8.2 branch) is the 0.8.2-beta tag

 https://git-wip-us.apache.org/repos/asf?p=kafka.git;a=tag;h=2b2c3da2c52bc62a89d60f85125d3723c8410fa0

 /***
  Joe Stein
  Founder, Principal Consultant
  Big Data Open Source Security LLC
  http://www.stealth.ly
  Twitter: @allthingshadoop http://www.twitter.com/allthingshadoop
 /