[jira] [Commented] (KAFKA-1716) hang during shutdown of ZookeeperConsumerConnector

2014-10-20 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1716?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14176624#comment-14176624
 ] 

Guozhang Wang commented on KAFKA-1716:
--

This may be resolved in KAFKA-1305. Did you reproduce this issue from trunk?

 hang during shutdown of ZookeeperConsumerConnector
 --

 Key: KAFKA-1716
 URL: https://issues.apache.org/jira/browse/KAFKA-1716
 Project: Kafka
  Issue Type: Bug
  Components: consumer
Affects Versions: 0.8.1.1
Reporter: Sean Fay
Assignee: Neha Narkhede

 It appears to be possible for {{ZookeeperConsumerConnector.shutdown()}} to 
 wedge in the case that some consumer fetcher threads receive messages during 
 the shutdown process.
 Shutdown thread:
 {code}-- Parking to wait for: 
 java/util/concurrent/CountDownLatch$Sync@0x2aaaf3ef06d0
 at jrockit/vm/Locks.park0(J)V(Native Method)
 at jrockit/vm/Locks.park(Locks.java:2230)
 at sun/misc/Unsafe.park(ZJ)V(Native Method)
 at java/util/concurrent/locks/LockSupport.park(LockSupport.java:156)
 at 
 java/util/concurrent/locks/AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:811)
 at 
 java/util/concurrent/locks/AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:969)
 at 
 java/util/concurrent/locks/AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1281)
 at java/util/concurrent/CountDownLatch.await(CountDownLatch.java:207)
 at kafka/utils/ShutdownableThread.shutdown(ShutdownableThread.scala:36)
 at 
 kafka/server/AbstractFetcherThread.shutdown(AbstractFetcherThread.scala:71)
 at 
 kafka/server/AbstractFetcherManager$$anonfun$closeAllFetchers$2.apply(AbstractFetcherManager.scala:121)
 at 
 kafka/server/AbstractFetcherManager$$anonfun$closeAllFetchers$2.apply(AbstractFetcherManager.scala:120)
 at 
 scala/collection/TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
 at 
 scala/collection/mutable/HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
 at 
 scala/collection/mutable/HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
 at 
 scala/collection/mutable/HashTable$class.foreachEntry(HashTable.scala:226)
 at scala/collection/mutable/HashMap.foreachEntry(HashMap.scala:39)
 at scala/collection/mutable/HashMap.foreach(HashMap.scala:98)
 at 
 scala/collection/TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
 at 
 kafka/server/AbstractFetcherManager.closeAllFetchers(AbstractFetcherManager.scala:120)
 ^-- Holding lock: java/lang/Object@0x2aaaebcc7318[thin lock]
 at 
 kafka/consumer/ConsumerFetcherManager.stopConnections(ConsumerFetcherManager.scala:148)
 at 
 kafka/consumer/ZookeeperConsumerConnector.liftedTree1$1(ZookeeperConsumerConnector.scala:171)
 at 
 kafka/consumer/ZookeeperConsumerConnector.shutdown(ZookeeperConsumerConnector.scala:167){code}
 ConsumerFetcherThread:
 {code}-- Parking to wait for: 
 java/util/concurrent/locks/AbstractQueuedSynchronizer$ConditionObject@0x2aaaebcc7568
 at jrockit/vm/Locks.park0(J)V(Native Method)
 at jrockit/vm/Locks.park(Locks.java:2230)
 at sun/misc/Unsafe.park(ZJ)V(Native Method)
 at java/util/concurrent/locks/LockSupport.park(LockSupport.java:156)
 at 
 java/util/concurrent/locks/AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
 at 
 java/util/concurrent/LinkedBlockingQueue.put(LinkedBlockingQueue.java:306)
 at kafka/consumer/PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:60)
 at 
 kafka/consumer/ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:49)
 at 
 kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:130)
 at 
 kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:111)
 at scala/collection/immutable/HashMap$HashMap1.foreach(HashMap.scala:224)
 at 
 scala/collection/immutable/HashMap$HashTrieMap.foreach(HashMap.scala:403)
 at 
 kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1.apply$mcV$sp(AbstractFetcherThread.scala:111)
 at 
 kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:111)
 at 
 kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:111)
 at kafka/utils/Utils$.inLock(Utils.scala:538)
 at 
 kafka/server/AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:110)
 at 
 kafka/server/AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88)
 at kafka/utils/ShutdownableThread.run(ShutdownableThread.scala:51)
 
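The two traces above show the shutdown thread holding the fetcher-manager lock and waiting on the fetcher thread's shutdown latch, while the fetcher thread is blocked in LinkedBlockingQueue.put() on a full chunk queue that is no longer being drained. A minimal, self-contained sketch of that hang pattern (illustrative only, not the Kafka code):
{code}
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;

// Minimal illustration of the hang pattern in the traces above (not Kafka code):
// a fetcher-like thread blocks in put() on a bounded queue that nobody drains
// during shutdown, so the shutdown thread waits on its latch forever.
public class ShutdownHangSketch {
    public static void main(String[] args) throws InterruptedException {
        LinkedBlockingQueue<String> chunks = new LinkedBlockingQueue<>(1); // bounded, like the consumer chunk queue
        CountDownLatch shutdownLatch = new CountDownLatch(1);

        Thread fetcher = new Thread(() -> {
            try {
                while (true) {
                    chunks.put("chunk"); // blocks once the queue is full
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                shutdownLatch.countDown(); // only reached if the fetcher ever exits
            }
        });
        fetcher.start();

        Thread.sleep(100);     // let the fetcher fill the queue and block in put()
        shutdownLatch.await(); // hangs: nothing drains the queue and the fetcher is never interrupted
    }
}
{code}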

Re: [DISCUSSION] Message Metadata

2014-10-20 Thread Guozhang Wang
Thanks for the detailed comments Jun! Some replies inlined.

On Sun, Oct 19, 2014 at 7:42 PM, Jun Rao jun...@gmail.com wrote:

 Hi, Guozhang,

 Thanks for the writeup.

 A few high level comments.

 1. Associating (versioned) schemas to a topic can be a good thing overall.
 Yes, this could add a bit more management overhead in Kafka. However, it
 makes sure that the data format contract between a producer and a consumer
 is kept and managed in a central place, instead of in the application. The
 latter is probably easier to start with, but is likely to be brittle in the
 long run.


I am actually not proposing that we not support associated versioned schemas
for topics, but that core Kafka functionality such as auditing should not
depend on schemas. I think this alone can separate schema management from
Kafka piping management (i.e. making sure every single message is delivered,
within some latency bound, etc.). Adding additional auditing info into an
existing schema would force Kafka to be aware of the schema systems (Avro,
JSON, etc.).



 2. Auditing can be a general feature that's useful for many applications.
 Such a feature can be implemented by extending the low level message format
 with a header. However, it can also be added as part of the schema
 management. For example, you can imagine a type of audited schema that adds
 additional auditing info to an existing schema automatically. Performance
 wise, it probably doesn't make a big difference whether the auditing info
 is added in the message header or the schema header.


See replies above.


 3. We talked about avoiding the overhead of decompressing in both the
 broker and the mirror maker. We probably need to think through how this
 works with auditing. In the more general case where you want to audit every
 message, one has to do the decompression to get the individual message,
 independent of how the auditing info is stored. This means that if we want
 to audit the broker directly or the consumer in mirror maker, we have to
 pay the decompression cost anyway. Similarly, if we want to extend mirror
 maker to support some customized filtering/transformation logic, we also
 have to pay the decompression cost.


I see your point. For that I would prefer an MM implementation that
de-compresses / re-compresses ONLY when required, for example by auditing.
I agree that we have not thought through whether we should enable auditing
on MM, and if so how to do it; we could discuss that in a different thread.
Overall, this proposal is not just about tackling de-compression on MM but
about the feasibility of extending the Kafka message header for system
properties / app properties.


 Some low level comments.

 4. Broker offset reassignment (kafka-527):  This probably can be done with
 just a format change on the compressed message set.

That is true. As I mentioned in the wiki, each of the problems may be
resolvable separately, but I am thinking about a general way to address all of
them.


 5. MirrorMaker refactoring: We probably can think through how general we
 want mirror maker to be. If we want it to be more general, we likely
 need to decompress every message just like in a normal consumer. There will
 definitely be overhead. However, as long as mirror maker is made scalable,
 we can overcome the overhead by just running more instances on more
 hardware resources. As for the proposed message format change, we probably
 need to think through it a bit more. The honor-ship flag seems a bit hacky
 to me.


Replied as part of 3). Sure, we can discuss that more; I will update the
wiki with the collected comments.


 6. Adding a timestamp in each message can be a useful thing. It (1) allows
 log segments to be rolled more accurately; (2) allows finding an offset for
 a particular timestamp more accurately. I am thinking that the timestamp in
 the message should probably be the time when the leader receives the
 message. Followers preserve the timestamp set by leader. To avoid time
 going back during leader change, the leader can probably set the timestamp
 to be the  max of current time and the timestamp of the last message, if
 present. That timestamp can potentially be added to the index file to
 answer offsetBeforeTimestamp queries more efficiently.


Agreed.
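
For concreteness, a minimal sketch of the monotonic timestamp assignment described in 6): the leader takes the max of the current time and the last assigned timestamp so time never goes backwards across leader changes (illustrative only; names are assumptions, not an agreed design).
{code}
// Sketch of the leader-side timestamp assignment from point 6 (illustrative, not Kafka code).
final class LeaderTimestampAssigner {
    private long lastTimestampMs = -1L;

    // Assign a timestamp that never decreases, even if the wall clock moves
    // backwards after a leader change.
    synchronized long assign(long wallClockMs) {
        lastTimestampMs = Math.max(wallClockMs, lastTimestampMs);
        return lastTimestampMs;
    }
}
{code}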


 7. Log compaction: It seems that you are suggesting an improvement to
 compact the active segment as well. This can be tricky and we need to
 figure out the details on how to do this. This improvement seems to be
 orthogonal to the message format change though.


I think this improvement is more effective with the timestamps as in 6); we
can discuss this more.


 8. Data inconsistency from unclean election: I am not sure if we need to
 add a control message to the log during leadership change. The (leader
 generation, starting offset) map can be maintained in a separate checkpoint
 file. The follower just needs to get that map from the leader during
 startup.
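
To make the alternative in 8) concrete, here is a sketch of a (leader generation, starting offset) checkpoint and how a restarting follower could use it to decide how much to truncate before fetching (purely illustrative; all names and details are assumptions, not an agreed design):
{code}
import java.util.TreeMap;

// Sketch of the checkpoint map from point 8 (illustrative only).
final class GenerationCheckpointSketch {
    // leader generation -> first offset written in that generation, ascending
    private final TreeMap<Integer, Long> startOffsets = new TreeMap<>();

    void record(int generation, long startOffset) {
        startOffsets.put(generation, startOffset);
    }

    // A follower that last wrote in followerGeneration, with log end offset followerLeo,
    // truncates to the leader's end offset for that generation if the leader has since
    // moved on to a newer generation; otherwise it keeps its own log end offset.
    long truncationOffset(int followerGeneration, long followerLeo) {
        Integer nextGeneration = startOffsets.higherKey(followerGeneration);
        if (nextGeneration == null) {
            return followerLeo;
        }
        long leaderEndOfGeneration = startOffsets.get(nextGeneration);
        return Math.min(followerLeo, leaderEndOfGeneration);
    }
}
{code}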

 

[jira] [Updated] (KAFKA-1712) Excessive storage usage on newly added node

2014-10-20 Thread Oleg Golovin (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1712?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Oleg Golovin updated KAFKA-1712:

Description: 
When a new node is added to the cluster, data starts replicating into it. The 
mtime of the segments being created will be set when the last message is 
written to them. Though the replication is a prolonged process, let's assume 
(for simplicity of explanation) that their mtime is very close to the time when 
the new node was added.

After the replication is done, new data will start to flow into this new node. 
After `log.retention.hours` the amount of data will be 2 * 
daily_amount_of_data_in_kafka_node (the first part is the replicated data from 
other nodes when the node was added (let us call it `t1`) and the second is the 
amount of replicated data from other nodes during `t1 + log.retention.hours`). 
So by that time the node will have twice as much data as the other nodes.

This poses a big problem for us, as our storage was sized to fit the normal 
amount of data (not twice that amount).

In our particular case it poses another problem. We have an emergency segment 
cleaner which runs when storage is nearly full (90%). We try to balance the 
amount of data so that it does not have to run and we can rely solely on 
Kafka's internal log deletion, but sometimes the emergency cleaner does run.
It works this way (see the sketch after this list):
- it gets all Kafka segments for the volume
- it filters out the last segment of each partition (just to avoid unnecessary 
recreation of the last, small segments)
- it sorts them by segment mtime
- it changes the mtime of the first N segments (those with the lowest mtime) to 
1, so they become really, really old. N is chosen to free a specified 
percentage of the volume (3% in our case). Kafka deletes these segments later 
(as they are very old).
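
A minimal sketch of the cleaner procedure just described (illustrative only; the skip set and the byte target come from the description above, everything else is an assumption):
{code}
import java.io.File;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Set;

// Sketch of the emergency cleaner described above (illustrative, not the actual tool).
public class EmergencyCleanerSketch {
    // Skip the last segment of each partition, sort the rest by mtime, and set the
    // mtime of the oldest segments to 1 so Kafka's retention deletes them later.
    static void markOldestSegments(List<File> segments,
                                   Set<File> lastSegmentPerPartition,
                                   long bytesToFree) {
        List<File> candidates = new ArrayList<>();
        for (File segment : segments) {
            if (!lastSegmentPerPartition.contains(segment)) {
                candidates.add(segment);
            }
        }
        candidates.sort(Comparator.comparingLong(File::lastModified)); // lowest mtime first

        long freed = 0;
        for (File segment : candidates) {
            if (freed >= bytesToFree) {
                break;
            }
            segment.setLastModified(1L); // "really really old", so retention removes it
            freed += segment.length();
        }
    }
}
{code}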

The emergency cleaner works very well, except for the case when the data has 
been replicated to a newly added node.
In this case the segment mtime is the time the segment was replicated and does 
not reflect the real creation time of the original data stored in that segment.
So in this case the emergency cleaner will delete the segments with the lowest 
mtime, which may hold data that is much more recent than the data in other 
segments.
This is not a big problem unless we delete data which hasn't been fully 
consumed.
In that case we lose data, and that makes it a big problem.

Is it possible to retain segment mtime during the initial replication to a new 
node? This would help avoid loading the new node with twice as much data as the 
other nodes have.

Or maybe there are other ways to sort segments by data creation time (or 
something close to it)? (For example, if 
https://issues.apache.org/jira/browse/KAFKA-1403 is implemented, we could take 
the time of the first message from the .index file.) In our case this would 
help the emergency cleaner delete the truly oldest data.

  was:
When a new node is added to cluster data starts replicating into it. The mtime 
of creating segments will be set on the last message being written to them. 
Though the replication is a prolonged process, let's assume (for simplicity of 
explanation) that their mtime is very close to the time when the new node was 
added.

After the replication is done, new data will start to flow into this new node. 
After `log.retention.hours` the amount of data will be 2 * 
daily_amount_of_data_in_kafka_node (first one is the replicated data from other 
nodes when the node was added (let us call it `t1`) and the second is the 
amount of replicated data from other nodes during `t1 + log.retention.hours`). 
So by that time the node will have twice as much data as the other nodes.

This poses a big problem to us as our storage is chosen to fit normal amount of 
data (not twice this amount).

In our particular case it poses another problem. We have an emergency segment 
cleaner which runs in case storage is nearly full (90%). We try to balance the 
amount of data for it not to run to rely solely on kafka internal log deletion, 
but sometimes emergency cleaner runs.
It works this way:
- it gets all kafka segments for the volume
- it filters out last segments of each partition (just to avoid unnecessary 
recreation of last small-size segments)
- it sorts them by segment mtime
- it changes mtime of the first N segements (with the lowest mtime) to 1, so 
they become really really old. Number N is chosen to free specified percentage 
of volume (3% in our case).  Kafka deletes these segments later (as they are 
very old).

Emergency cleaner works very well. Except for the case when the data is 
replicated to the newly added node. 
In this case segment mtime is the time the segment was replicated and does not 
reflect the real creation time of original data stored in this segment.
So in this case kafka emergency cleaner will delete segments with the lowest 
mtime, which may hold the data which is much more recent than the 

[jira] [Updated] (KAFKA-1712) Excessive storage usage on newly added node

2014-10-20 Thread Oleg Golovin (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1712?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Oleg Golovin updated KAFKA-1712:

Description: 
When a new node is added to the cluster, data starts replicating into it. The 
mtime of the segments being created will be set when the last message is 
written to them. Though the replication is a prolonged process, let's assume 
(for simplicity of explanation) that their mtime is very close to the time when 
the new node was added.

After the replication is done, new data will start to flow into this new node. 
After `log.retention.hours` the amount of data will be 2 * 
daily_amount_of_data_in_kafka_node (the first part is the replicated data from 
other nodes when the node was added (let us call it `t1`) and the second is the 
amount of data replicated from other nodes from `t1` to `t1 + 
log.retention.hours`). So by that time the node will have twice as much data as 
the other nodes.

This poses a big problem for us, as our storage was sized to fit the normal 
amount of data (not twice that amount).

In our particular case it poses another problem. We have an emergency segment 
cleaner which runs when storage is nearly full (90%). We try to balance the 
amount of data so that it does not have to run and we can rely solely on 
Kafka's internal log deletion, but sometimes the emergency cleaner does run.
It works this way:
- it gets all Kafka segments for the volume
- it filters out the last segment of each partition (just to avoid unnecessary 
recreation of the last, small segments)
- it sorts them by segment mtime
- it changes the mtime of the first N segments (those with the lowest mtime) to 
1, so they become really, really old. N is chosen to free a specified 
percentage of the volume (3% in our case). Kafka deletes these segments later 
(as they are very old).

The emergency cleaner works very well, except for the case when the data has 
been replicated to a newly added node.
In this case the segment mtime is the time the segment was replicated and does 
not reflect the real creation time of the original data stored in that segment.
So in this case the emergency cleaner will delete the segments with the lowest 
mtime, which may hold data that is much more recent than the data in other 
segments.
This is not a big problem unless we delete data which hasn't been fully 
consumed.
In that case we lose data, and that makes it a big problem.

Is it possible to retain segment mtime during the initial replication to a new 
node? This would help avoid loading the new node with twice as much data as the 
other nodes have.

Or maybe there are other ways to sort segments by data creation time (or 
something close to it)? (For example, if 
https://issues.apache.org/jira/browse/KAFKA-1403 is implemented, we could take 
the time of the first message from the .index file.) In our case this would 
help the emergency cleaner delete the truly oldest data.

  was:
When a new node is added to cluster data starts replicating into it. The mtime 
of creating segments will be set on the last message being written to them. 
Though the replication is a prolonged process, let's assume (for simplicity of 
explanation) that their mtime is very close to the time when the new node was 
added.

After the replication is done, new data will start to flow into this new node. 
After `log.retention.hours` the amount of data will be 2 * 
daily_amount_of_data_in_kafka_node (first one is the replicated data from other 
nodes when the node was added (let us call it `t1`) and the second is the 
amount of replicated data from other nodes during `t1 + log.retention.hours`). 
So by that time the node will have twice as much data as the other nodes.

This poses a big problem to us as our storage is chosen to fit normal amount of 
data (not twice this amount).

In our particular case it poses another problem. We have an emergency segment 
cleaner which runs in case storage is nearly full (90%). We try to balance the 
amount of data for it not to run to rely solely on kafka internal log deletion, 
but sometimes emergency cleaner runs.
It works this way:
- it gets all kafka segments for the volume
- it filters out last segments of each partition (just to avoid unnecessary 
recreation of last small-size segments)
- it sorts them by segment mtime
- it changes mtime of the first N segements (with the lowest mtime) to 1, so 
they become really really old. Number N is chosen to free specified percentage 
of volume (3% in our case).  Kafka deletes these segments later (as they are 
very old).

Emergency cleaner works very well. Except for the case when the data is 
replicated to the newly added node. 
In this case segment mtime is the time the segment was replicated and does not 
reflect the real creation time of original data stored in this segment.
So in this case kafka emergency cleaner will delete segments with the lowest 
mtime, which may hold the data which is much 

[jira] [Commented] (KAFKA-1716) hang during shutdown of ZookeeperConsumerConnector

2014-10-20 Thread Sean Fay (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1716?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14176893#comment-14176893
 ] 

Sean Fay commented on KAFKA-1716:
-

Nope, this was running 0.8.1.1. I'm not very familiar with the code, but it 
looks like KAFKA-1305 changed {{controller.message.queue.size}}, while the 
code in question here is impacted by {{queued.max.message.chunks}}.
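
For reference, {{queued.max.message.chunks}} is a high-level consumer setting that bounds the chunk queue the fetcher threads block on; a sketch of where it would be set (values are illustrative assumptions):
{code}
import java.util.Properties;

// Illustrative only: the high-level consumer property that bounds the per-stream
// chunk queue the fetcher threads block on. Values here are assumptions.
public class ConsumerConfigSketch {
    public static Properties consumerProps() {
        Properties props = new Properties();
        props.put("zookeeper.connect", "localhost:2181");   // assumed ZooKeeper address
        props.put("group.id", "example-group");             // assumed consumer group
        props.put("queued.max.message.chunks", "2");        // size of the blocking chunk queue
        return props;
    }
}
{code}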

 hang during shutdown of ZookeeperConsumerConnector
 --

 Key: KAFKA-1716
 URL: https://issues.apache.org/jira/browse/KAFKA-1716
 Project: Kafka
  Issue Type: Bug
  Components: consumer
Affects Versions: 0.8.1.1
Reporter: Sean Fay
Assignee: Neha Narkhede

 It appears to be possible for {{ZookeeperConsumerConnector.shutdown()}} to 
 wedge in the case that some consumer fetcher threads receive messages during 
 the shutdown process.
 Shutdown thread:
 {code}-- Parking to wait for: 
 java/util/concurrent/CountDownLatch$Sync@0x2aaaf3ef06d0
 at jrockit/vm/Locks.park0(J)V(Native Method)
 at jrockit/vm/Locks.park(Locks.java:2230)
 at sun/misc/Unsafe.park(ZJ)V(Native Method)
 at java/util/concurrent/locks/LockSupport.park(LockSupport.java:156)
 at 
 java/util/concurrent/locks/AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:811)
 at 
 java/util/concurrent/locks/AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:969)
 at 
 java/util/concurrent/locks/AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1281)
 at java/util/concurrent/CountDownLatch.await(CountDownLatch.java:207)
 at kafka/utils/ShutdownableThread.shutdown(ShutdownableThread.scala:36)
 at 
 kafka/server/AbstractFetcherThread.shutdown(AbstractFetcherThread.scala:71)
 at 
 kafka/server/AbstractFetcherManager$$anonfun$closeAllFetchers$2.apply(AbstractFetcherManager.scala:121)
 at 
 kafka/server/AbstractFetcherManager$$anonfun$closeAllFetchers$2.apply(AbstractFetcherManager.scala:120)
 at 
 scala/collection/TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
 at 
 scala/collection/mutable/HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
 at 
 scala/collection/mutable/HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
 at 
 scala/collection/mutable/HashTable$class.foreachEntry(HashTable.scala:226)
 at scala/collection/mutable/HashMap.foreachEntry(HashMap.scala:39)
 at scala/collection/mutable/HashMap.foreach(HashMap.scala:98)
 at 
 scala/collection/TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
 at 
 kafka/server/AbstractFetcherManager.closeAllFetchers(AbstractFetcherManager.scala:120)
 ^-- Holding lock: java/lang/Object@0x2aaaebcc7318[thin lock]
 at 
 kafka/consumer/ConsumerFetcherManager.stopConnections(ConsumerFetcherManager.scala:148)
 at 
 kafka/consumer/ZookeeperConsumerConnector.liftedTree1$1(ZookeeperConsumerConnector.scala:171)
 at 
 kafka/consumer/ZookeeperConsumerConnector.shutdown(ZookeeperConsumerConnector.scala:167){code}
 ConsumerFetcherThread:
 {code}-- Parking to wait for: 
 java/util/concurrent/locks/AbstractQueuedSynchronizer$ConditionObject@0x2aaaebcc7568
 at jrockit/vm/Locks.park0(J)V(Native Method)
 at jrockit/vm/Locks.park(Locks.java:2230)
 at sun/misc/Unsafe.park(ZJ)V(Native Method)
 at java/util/concurrent/locks/LockSupport.park(LockSupport.java:156)
 at 
 java/util/concurrent/locks/AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
 at 
 java/util/concurrent/LinkedBlockingQueue.put(LinkedBlockingQueue.java:306)
 at kafka/consumer/PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:60)
 at 
 kafka/consumer/ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:49)
 at 
 kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:130)
 at 
 kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:111)
 at scala/collection/immutable/HashMap$HashMap1.foreach(HashMap.scala:224)
 at 
 scala/collection/immutable/HashMap$HashTrieMap.foreach(HashMap.scala:403)
 at 
 kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1.apply$mcV$sp(AbstractFetcherThread.scala:111)
 at 
 kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:111)
 at 
 kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:111)
 at kafka/utils/Utils$.inLock(Utils.scala:538)
 at 
 kafka/server/AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:110)
 at 
 

[jira] [Commented] (KAFKA-1716) hang during shutdown of ZookeeperConsumerConnector

2014-10-20 Thread Sriharsha Chintalapani (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1716?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14176904#comment-14176904
 ] 

Sriharsha Chintalapani commented on KAFKA-1716:
---

[~sfay] Do you have any steps to reproduce this?

 hang during shutdown of ZookeeperConsumerConnector
 --

 Key: KAFKA-1716
 URL: https://issues.apache.org/jira/browse/KAFKA-1716
 Project: Kafka
  Issue Type: Bug
  Components: consumer
Affects Versions: 0.8.1.1
Reporter: Sean Fay
Assignee: Neha Narkhede

 It appears to be possible for {{ZookeeperConsumerConnector.shutdown()}} to 
 wedge in the case that some consumer fetcher threads receive messages during 
 the shutdown process.
 Shutdown thread:
 {code}-- Parking to wait for: 
 java/util/concurrent/CountDownLatch$Sync@0x2aaaf3ef06d0
 at jrockit/vm/Locks.park0(J)V(Native Method)
 at jrockit/vm/Locks.park(Locks.java:2230)
 at sun/misc/Unsafe.park(ZJ)V(Native Method)
 at java/util/concurrent/locks/LockSupport.park(LockSupport.java:156)
 at 
 java/util/concurrent/locks/AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:811)
 at 
 java/util/concurrent/locks/AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:969)
 at 
 java/util/concurrent/locks/AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1281)
 at java/util/concurrent/CountDownLatch.await(CountDownLatch.java:207)
 at kafka/utils/ShutdownableThread.shutdown(ShutdownableThread.scala:36)
 at 
 kafka/server/AbstractFetcherThread.shutdown(AbstractFetcherThread.scala:71)
 at 
 kafka/server/AbstractFetcherManager$$anonfun$closeAllFetchers$2.apply(AbstractFetcherManager.scala:121)
 at 
 kafka/server/AbstractFetcherManager$$anonfun$closeAllFetchers$2.apply(AbstractFetcherManager.scala:120)
 at 
 scala/collection/TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
 at 
 scala/collection/mutable/HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
 at 
 scala/collection/mutable/HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
 at 
 scala/collection/mutable/HashTable$class.foreachEntry(HashTable.scala:226)
 at scala/collection/mutable/HashMap.foreachEntry(HashMap.scala:39)
 at scala/collection/mutable/HashMap.foreach(HashMap.scala:98)
 at 
 scala/collection/TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
 at 
 kafka/server/AbstractFetcherManager.closeAllFetchers(AbstractFetcherManager.scala:120)
 ^-- Holding lock: java/lang/Object@0x2aaaebcc7318[thin lock]
 at 
 kafka/consumer/ConsumerFetcherManager.stopConnections(ConsumerFetcherManager.scala:148)
 at 
 kafka/consumer/ZookeeperConsumerConnector.liftedTree1$1(ZookeeperConsumerConnector.scala:171)
 at 
 kafka/consumer/ZookeeperConsumerConnector.shutdown(ZookeeperConsumerConnector.scala:167){code}
 ConsumerFetcherThread:
 {code}-- Parking to wait for: 
 java/util/concurrent/locks/AbstractQueuedSynchronizer$ConditionObject@0x2aaaebcc7568
 at jrockit/vm/Locks.park0(J)V(Native Method)
 at jrockit/vm/Locks.park(Locks.java:2230)
 at sun/misc/Unsafe.park(ZJ)V(Native Method)
 at java/util/concurrent/locks/LockSupport.park(LockSupport.java:156)
 at 
 java/util/concurrent/locks/AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
 at 
 java/util/concurrent/LinkedBlockingQueue.put(LinkedBlockingQueue.java:306)
 at kafka/consumer/PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:60)
 at 
 kafka/consumer/ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:49)
 at 
 kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:130)
 at 
 kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:111)
 at scala/collection/immutable/HashMap$HashMap1.foreach(HashMap.scala:224)
 at 
 scala/collection/immutable/HashMap$HashTrieMap.foreach(HashMap.scala:403)
 at 
 kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1.apply$mcV$sp(AbstractFetcherThread.scala:111)
 at 
 kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:111)
 at 
 kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:111)
 at kafka/utils/Utils$.inLock(Utils.scala:538)
 at 
 kafka/server/AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:110)
 at 
 kafka/server/AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88)
 at kafka/utils/ShutdownableThread.run(ShutdownableThread.scala:51)
 at 

[jira] [Updated] (KAFKA-1171) Gradle build for Kafka

2014-10-20 Thread Neha Narkhede (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1171?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Neha Narkhede updated KAFKA-1171:
-
Fix Version/s: 0.8.1
   0.8.1.1

 Gradle build for Kafka
 --

 Key: KAFKA-1171
 URL: https://issues.apache.org/jira/browse/KAFKA-1171
 Project: Kafka
  Issue Type: Improvement
  Components: packaging
Affects Versions: 0.8.1, 0.9.0
Reporter: David Arthur
Assignee: David Arthur
Priority: Blocker
 Fix For: 0.8.1, 0.8.1.1

 Attachments: 0001-Adding-basic-Gradle-build.patch, 
 0001-Adding-basic-Gradle-build.patch, 0001-Adding-basic-Gradle-build.patch, 
 0001-Adding-basic-Gradle-build.patch, 0001-Adding-basic-Gradle-build.patch, 
 0001-Adding-basic-Gradle-build.patch, 0001-Adding-basic-Gradle-build.patch, 
 kafka-1171_v10.patch, kafka-1171_v11.patch, kafka-1171_v12.patch, 
 kafka-1171_v13.patch, kafka-1171_v14.patch, kafka-1171_v15.patch, 
 kafka-1171_v6.patch, kafka-1171_v7.patch, kafka-1171_v8.patch, 
 kafka-1171_v9.patch


 We have previously discussed moving away from SBT to an 
 easier-to-comprehend-and-debug build system such as Ant or Gradle. I put up a 
 patch for an Ant+Ivy build a while ago[1], and it sounded like people wanted 
 to check out Gradle as well.
 1. https://issues.apache.org/jira/browse/KAFKA-855



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Closed] (KAFKA-262) Bug in the consumer rebalancing logic causes one consumer to release partitions that it does not own

2014-10-20 Thread Neha Narkhede (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-262?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Neha Narkhede closed KAFKA-262.
---

 Bug in the consumer rebalancing logic causes one consumer to release 
 partitions that it does not own
 

 Key: KAFKA-262
 URL: https://issues.apache.org/jira/browse/KAFKA-262
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 0.7
Reporter: Neha Narkhede
Assignee: Neha Narkhede
 Fix For: 0.7.1

 Attachments: kafka-262-v3.patch, kafka-262.patch

   Original Estimate: 24h
  Remaining Estimate: 24h

 The consumer maintains a cache of topics and partitions it owns along with 
 the fetcher queues corresponding to those. But while releasing partition 
 ownership, this cache is not cleared. This leads the consumer to release a 
 partition that it does not own any more. This can also lead the consumer to 
 commit offsets for partitions that it no longer consumes from. 
 The rebalance operation goes through following steps -
 1. close fetchers
 2. commit offsets
 3. release partition ownership. 
 4. rebalance, add topic, partition and fetcher queues to the topic registry, 
 for all topics that the consumer process currently wants to own. 
 5. If the consumer runs into conflict for one topic or partition, the 
 rebalancing attempt fails, and it goes to step 1.
 Say, there are 2 consumers in a group, c1 and c2. Both are consuming topic1 
 with partitions 0-0, 0-1 and 1-0. Say c1 owns 0-0 and 0-1 and c2 owns 1-0.
 1. Broker 1 goes down. This triggers rebalancing attempt in c1 and c2.
 2. c1 releases partition ownership and, during step 4 (above), fails to 
 rebalance.
 3. Meanwhile, c2 completes rebalancing successfully, and owns partition 0-1 
 and starts consuming data.
 4. c1 starts next rebalancing attempt and during step 3 (above), it releases 
 partition 0-1. During step 4, it owns partition 0-0 again, and starts 
 consuming data.
 5. Effectively, rebalancing has completed successfully, but there is no owner 
 for partition 0-1 registered in Zookeeper.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Closed] (KAFKA-1171) Gradle build for Kafka

2014-10-20 Thread Neha Narkhede (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1171?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Neha Narkhede closed KAFKA-1171.


 Gradle build for Kafka
 --

 Key: KAFKA-1171
 URL: https://issues.apache.org/jira/browse/KAFKA-1171
 Project: Kafka
  Issue Type: Improvement
  Components: packaging
Affects Versions: 0.8.1, 0.9.0
Reporter: David Arthur
Assignee: David Arthur
Priority: Blocker
 Fix For: 0.8.1, 0.8.1.1

 Attachments: 0001-Adding-basic-Gradle-build.patch, 
 0001-Adding-basic-Gradle-build.patch, 0001-Adding-basic-Gradle-build.patch, 
 0001-Adding-basic-Gradle-build.patch, 0001-Adding-basic-Gradle-build.patch, 
 0001-Adding-basic-Gradle-build.patch, 0001-Adding-basic-Gradle-build.patch, 
 kafka-1171_v10.patch, kafka-1171_v11.patch, kafka-1171_v12.patch, 
 kafka-1171_v13.patch, kafka-1171_v14.patch, kafka-1171_v15.patch, 
 kafka-1171_v6.patch, kafka-1171_v7.patch, kafka-1171_v8.patch, 
 kafka-1171_v9.patch


 We have previously discussed moving away from SBT to an 
 easier-to-comprehend-and-debug build system such as Ant or Gradle. I put up a 
 patch for an Ant+Ivy build a while ago[1], and it sounded like people wanted 
 to check out Gradle as well.
 1. https://issues.apache.org/jira/browse/KAFKA-855



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1630) ConsumerFetcherThread locked in Tomcat

2014-10-20 Thread Neha Narkhede (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14176927#comment-14176927
 ] 

Neha Narkhede commented on KAFKA-1630:
--

The thread dump you attached doesn't suggest threads getting locked; they are 
in the RUNNABLE state and consuming data, just not as fast, since the process is 
probably CPU-bound.

 ConsumerFetcherThread locked in Tomcat
 --

 Key: KAFKA-1630
 URL: https://issues.apache.org/jira/browse/KAFKA-1630
 Project: Kafka
  Issue Type: Bug
  Components: consumer
Affects Versions: 0.8.0
 Environment: linux redhat
Reporter: vijay
Assignee: Neha Narkhede
  Labels: performance
   Original Estimate: 12h
  Remaining Estimate: 12h

 I am using the high-level consumer API for consuming messages from Kafka. 
 ConsumerFetcherThread gets locked. Kindly look into the stack trace below:
 ConsumerFetcherThread-SocialTwitterStream6_172.31.240.136-1410398702143-61a247c3-0-1
  prio=10 tid=0x7f294001e800 nid=0x1677 runnable [0x7f297aae9000]
java.lang.Thread.State: RUNNABLE
   at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
   at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:215)
   at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:65)
   at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:69)
   - locked 0x7f2a7c38eb40 (a sun.nio.ch.Util$1)
   - locked 0x7f2a7c38eb28 (a java.util.Collections$UnmodifiableSet)
   - locked 0x7f2a7c326f20 (a sun.nio.ch.EPollSelectorImpl)
   at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:80)
   at 
 sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:193)
   - locked 0x7f2a7c2163c0 (a java.lang.Object)
   at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:86)
   - locked 0x7f2a7c229950 (a 
 sun.nio.ch.SocketAdaptor$SocketInputStream)
   at 
 java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:200)
   - locked 0x7f2a7c38ea50 (a java.lang.Object)
   at kafka.utils.Utils$.read(Utils.scala:395)
   at 
 kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
   at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
   at 
 kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
   at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100)
   at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:73)
   at 
 kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
   - locked 0x7f2a7c38e9f0 (a java.lang.Object)
   at 
 kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:110)
   at 
 kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:110)
   at 
 kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:110)
   at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
   at 
 kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:109)
   at 
 kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:109)
   at 
 kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:109)
   at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
   at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:108)
   at 
 kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:94)
   at 
 kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:86)
   at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-1714) more better bootstrapping of the gradle-wrapper.jar

2014-10-20 Thread Neha Narkhede (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1714?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Neha Narkhede updated KAFKA-1714:
-
Component/s: build

 more better bootstrapping of the gradle-wrapper.jar 
 

 Key: KAFKA-1714
 URL: https://issues.apache.org/jira/browse/KAFKA-1714
 Project: Kafka
  Issue Type: Bug
  Components: build
Reporter: Joe Stein
 Fix For: 0.8.3


 In https://issues.apache.org/jira/browse/KAFKA-1490 we moved the 
 gradle-wrapper.jar out of the source tree for source maintenance. This makes 
 the first build step somewhat problematic for folks just getting started. A 
 bootstrap step is required; if it could somehow be incorporated, that would be 
 great.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-1715) better advertising of the bound and working interface

2014-10-20 Thread Neha Narkhede (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1715?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Neha Narkhede updated KAFKA-1715:
-
Labels: newbie  (was: )

 better advertising of the bound and working interface
 -

 Key: KAFKA-1715
 URL: https://issues.apache.org/jira/browse/KAFKA-1715
 Project: Kafka
  Issue Type: Bug
Reporter: Joe Stein
  Labels: newbie
 Fix For: 0.8.3


 As part of the auto-discovery of brokers and metadata messaging, we should 
 try to do a better job of advertising the interface that is bound and working.
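
For context, the 0.8.x broker already allows the advertised host and port to be overridden manually; a sketch with illustrative values of what this ticket would like to make less necessary by detecting the bound, working interface automatically:
{code}
import java.util.Properties;

// Illustrative only: the manual overrides for what a 0.8.x broker advertises to
// clients. Hostname and port values are assumptions, not recommendations.
public class BrokerAdvertiseSketch {
    public static Properties brokerProps() {
        Properties props = new Properties();
        props.put("advertised.host.name", "broker1.example.internal"); // published to ZooKeeper/clients
        props.put("advertised.port", "9092");
        return props;
    }
}
{code}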



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-1714) more better bootstrapping of the gradle-wrapper.jar

2014-10-20 Thread Neha Narkhede (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1714?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Neha Narkhede updated KAFKA-1714:
-
Affects Version/s: 0.8.2

 more better bootstrapping of the gradle-wrapper.jar 
 

 Key: KAFKA-1714
 URL: https://issues.apache.org/jira/browse/KAFKA-1714
 Project: Kafka
  Issue Type: Bug
  Components: build
Affects Versions: 0.8.2
Reporter: Joe Stein
 Fix For: 0.8.3


 In https://issues.apache.org/jira/browse/KAFKA-1490 we moved the 
 gradle-wrapper.jar out of the source tree for source maintenance. This makes 
 the first build step somewhat problematic for folks just getting started. A 
 bootstrap step is required; if it could somehow be incorporated, that would be 
 great.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1716) hang during shutdown of ZookeeperConsumerConnector

2014-10-20 Thread Neha Narkhede (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1716?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14176944#comment-14176944
 ] 

Neha Narkhede commented on KAFKA-1716:
--

[~sfay] Could you please attach the entire thread dump? I'm not sure KAFKA-1305 
is related since that was a purely broker side issue and didn't touch the 
fetcher threads.

 hang during shutdown of ZookeeperConsumerConnector
 --

 Key: KAFKA-1716
 URL: https://issues.apache.org/jira/browse/KAFKA-1716
 Project: Kafka
  Issue Type: Bug
  Components: consumer
Affects Versions: 0.8.1.1
Reporter: Sean Fay
Assignee: Neha Narkhede

 It appears to be possible for {{ZookeeperConsumerConnector.shutdown()}} to 
 wedge in the case that some consumer fetcher threads receive messages during 
 the shutdown process.
 Shutdown thread:
 {code}-- Parking to wait for: 
 java/util/concurrent/CountDownLatch$Sync@0x2aaaf3ef06d0
 at jrockit/vm/Locks.park0(J)V(Native Method)
 at jrockit/vm/Locks.park(Locks.java:2230)
 at sun/misc/Unsafe.park(ZJ)V(Native Method)
 at java/util/concurrent/locks/LockSupport.park(LockSupport.java:156)
 at 
 java/util/concurrent/locks/AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:811)
 at 
 java/util/concurrent/locks/AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:969)
 at 
 java/util/concurrent/locks/AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1281)
 at java/util/concurrent/CountDownLatch.await(CountDownLatch.java:207)
 at kafka/utils/ShutdownableThread.shutdown(ShutdownableThread.scala:36)
 at 
 kafka/server/AbstractFetcherThread.shutdown(AbstractFetcherThread.scala:71)
 at 
 kafka/server/AbstractFetcherManager$$anonfun$closeAllFetchers$2.apply(AbstractFetcherManager.scala:121)
 at 
 kafka/server/AbstractFetcherManager$$anonfun$closeAllFetchers$2.apply(AbstractFetcherManager.scala:120)
 at 
 scala/collection/TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
 at 
 scala/collection/mutable/HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
 at 
 scala/collection/mutable/HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
 at 
 scala/collection/mutable/HashTable$class.foreachEntry(HashTable.scala:226)
 at scala/collection/mutable/HashMap.foreachEntry(HashMap.scala:39)
 at scala/collection/mutable/HashMap.foreach(HashMap.scala:98)
 at 
 scala/collection/TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
 at 
 kafka/server/AbstractFetcherManager.closeAllFetchers(AbstractFetcherManager.scala:120)
 ^-- Holding lock: java/lang/Object@0x2aaaebcc7318[thin lock]
 at 
 kafka/consumer/ConsumerFetcherManager.stopConnections(ConsumerFetcherManager.scala:148)
 at 
 kafka/consumer/ZookeeperConsumerConnector.liftedTree1$1(ZookeeperConsumerConnector.scala:171)
 at 
 kafka/consumer/ZookeeperConsumerConnector.shutdown(ZookeeperConsumerConnector.scala:167){code}
 ConsumerFetcherThread:
 {code}-- Parking to wait for: 
 java/util/concurrent/locks/AbstractQueuedSynchronizer$ConditionObject@0x2aaaebcc7568
 at jrockit/vm/Locks.park0(J)V(Native Method)
 at jrockit/vm/Locks.park(Locks.java:2230)
 at sun/misc/Unsafe.park(ZJ)V(Native Method)
 at java/util/concurrent/locks/LockSupport.park(LockSupport.java:156)
 at 
 java/util/concurrent/locks/AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
 at 
 java/util/concurrent/LinkedBlockingQueue.put(LinkedBlockingQueue.java:306)
 at kafka/consumer/PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:60)
 at 
 kafka/consumer/ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:49)
 at 
 kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:130)
 at 
 kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:111)
 at scala/collection/immutable/HashMap$HashMap1.foreach(HashMap.scala:224)
 at 
 scala/collection/immutable/HashMap$HashTrieMap.foreach(HashMap.scala:403)
 at 
 kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1.apply$mcV$sp(AbstractFetcherThread.scala:111)
 at 
 kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:111)
 at 
 kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:111)
 at kafka/utils/Utils$.inLock(Utils.scala:538)
 at 
 kafka/server/AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:110)
 at 
 

[jira] [Created] (KAFKA-1717) remove netty dependency through ZK 3.4.x

2014-10-20 Thread Jun Rao (JIRA)
Jun Rao created KAFKA-1717:
--

 Summary: remove netty dependency through ZK 3.4.x
 Key: KAFKA-1717
 URL: https://issues.apache.org/jira/browse/KAFKA-1717
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 0.8.2
Reporter: Jun Rao
Assignee: Jun Rao
Priority: Blocker
 Fix For: 0.8.2


ZK 3.4.x specifies a dependency on netty. However, that dependency is optional 
(ZOOKEEPER-1681). To avoid potential jar conflict, it's better to exclude netty 
dependency from Kafka.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-1717) remove netty dependency through ZK 3.4.x

2014-10-20 Thread Jun Rao (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1717?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao updated KAFKA-1717:
---
Status: Patch Available  (was: Open)

 remove netty dependency through ZK 3.4.x
 

 Key: KAFKA-1717
 URL: https://issues.apache.org/jira/browse/KAFKA-1717
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 0.8.2
Reporter: Jun Rao
Assignee: Jun Rao
Priority: Blocker
 Fix For: 0.8.2

 Attachments: kafka-1717.patch


 ZK 3.4.x specifies a dependency on netty. However, that dependency is 
 optional (ZOOKEEPER-1681). To avoid potential jar conflict, it's better to 
 exclude netty dependency from Kafka.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-1717) remove netty dependency through ZK 3.4.x

2014-10-20 Thread Jun Rao (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1717?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao updated KAFKA-1717:
---
Attachment: kafka-1717.patch

 remove netty dependency through ZK 3.4.x
 

 Key: KAFKA-1717
 URL: https://issues.apache.org/jira/browse/KAFKA-1717
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 0.8.2
Reporter: Jun Rao
Assignee: Jun Rao
Priority: Blocker
 Fix For: 0.8.2

 Attachments: kafka-1717.patch


 ZK 3.4.x specifies a dependency on netty. However, that dependency is 
 optional (ZOOKEEPER-1681). To avoid potential jar conflict, it's better to 
 exclude netty dependency from Kafka.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1717) remove netty dependency through ZK 3.4.x

2014-10-20 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1717?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14176965#comment-14176965
 ] 

Jun Rao commented on KAFKA-1717:


Created reviewboard https://reviews.apache.org/r/26936/diff/
 against branch origin/trunk

 remove netty dependency through ZK 3.4.x
 

 Key: KAFKA-1717
 URL: https://issues.apache.org/jira/browse/KAFKA-1717
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 0.8.2
Reporter: Jun Rao
Assignee: Jun Rao
Priority: Blocker
 Fix For: 0.8.2

 Attachments: kafka-1717.patch


 ZK 3.4.x specifies a dependency on netty. However, that dependency is 
 optional (ZOOKEEPER-1681). To avoid potential jar conflict, it's better to 
 exclude netty dependency from Kafka.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 26936: Patch for kafka-1717

2014-10-20 Thread Sriharsha Chintalapani

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/26936/#review57336
---

Ship it!


+1

- Sriharsha Chintalapani


On Oct. 20, 2014, 3:16 p.m., Jun Rao wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/26936/
 ---
 
 (Updated Oct. 20, 2014, 3:16 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: kafka-1717
 https://issues.apache.org/jira/browse/kafka-1717
 
 
 Repository: kafka
 
 
 Description
 ---
 
 remove netty from kafka dependency manually until ZOOKEEPER-1681 is fixed
 
 
 Diffs
 -
 
   build.gradle ee87e0f40ad05ba07763d7fe8a4fe3aaadf524e5 
 
 Diff: https://reviews.apache.org/r/26936/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Jun Rao
 




[jira] [Created] (KAFKA-1718) Message Size Too Large error when only small messages produced with Snappy

2014-10-20 Thread Evan Huus (JIRA)
Evan Huus created KAFKA-1718:


 Summary: Message Size Too Large error when only small messages 
produced with Snappy
 Key: KAFKA-1718
 URL: https://issues.apache.org/jira/browse/KAFKA-1718
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 0.8.1.1
Reporter: Evan Huus
Priority: Critical


I'm the primary author of the Go bindings, and while I originally received this 
as a bug against my bindings, I'm coming to the conclusion that it's a bug in 
the broker somehow.

Specifically, take a look at the last two kafka packets in the following packet 
capture: https://dl.dropboxusercontent.com/u/171647/kafka.pcapng (you will need 
a trunk build of Wireshark to fully decode the kafka part of the packets).

The produce request contains two partitions on one topic. Each partition has 
one message set (sizes 977205 bytes and 967362 bytes respectively). Each 
message set is a sequential collection of snappy-compressed messages, each 
message of size 46899. When uncompressed, each message contains a message set 
of 999600 bytes, containing a sequence of uncompressed 1024-byte messages.

However, the broker responds to this with a MessageSizeTooLarge error, full 
stacktrace from the broker logs being:
kafka.common.MessageSizeTooLargeException: Message size is 1070127 bytes which 
exceeds the maximum configured message size of 112.
at kafka.log.Log$$anonfun$append$1.apply(Log.scala:267)
at kafka.log.Log$$anonfun$append$1.apply(Log.scala:265)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:32)
at kafka.log.Log.append(Log.scala:265)
at kafka.cluster.Partition.appendMessagesToLeader(Partition.scala:354)
at 
kafka.server.KafkaApis$$anonfun$appendToLocalLog$2.apply(KafkaApis.scala:376)
at 
kafka.server.KafkaApis$$anonfun$appendToLocalLog$2.apply(KafkaApis.scala:366)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at 
scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
at 
scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
at 
scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at kafka.server.KafkaApis.appendToLocalLog(KafkaApis.scala:366)
at kafka.server.KafkaApis.handleProducerRequest(KafkaApis.scala:292)
at kafka.server.KafkaApis.handle(KafkaApis.scala:185)
at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:42)
at java.lang.Thread.run(Thread.java:695)

Since as far as I can tell none of the sizes in the actual produced packet 
exceed the defined maximum, I can only assume that the broker is miscalculating 
something somewhere and throwing the exception improperly.

---

This issue can be reliably reproduced using an out-of-the-box binary download 
of 0.8.1.1 and the following gist: 
https://gist.github.com/eapache/ce0f15311c605a165ce7 (you will need to use the 
`producer-ng` branch of the Sarama library).

---

I am happy to provide any more information you might need, or to do relevant 
experiments etc.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [DISCUSSION] Message Metadata

2014-10-20 Thread Jun Rao
8. Perhaps I need to see the details of the alternative solution. During
the startup of a follower, in general, it's not enough for the follower to
just see the latest generation of the current leader, since the follower
can be several generations behind. So, if the control message only
contains the latest generation, it may not be enough for the follower
to resolve the discrepancy with the leader. Another thing is that when a
follower starts up, the first thing it needs to do is to figure out how
much to truncate. This needs to happen before the follower can start
fetching. So, there is also a chicken and egg problem. The follower can't
figure out how much to truncate before it can fetch the control message.
However, it doesn't know where to start fetching until the truncation is
properly done.

Thanks,

Jun

On Sun, Oct 19, 2014 at 11:41 PM, Guozhang Wang wangg...@gmail.com wrote:

 Thanks for the detailed comments Jun! Some replies inlined.

 On Sun, Oct 19, 2014 at 7:42 PM, Jun Rao jun...@gmail.com wrote:

  Hi, Guozhang,
 
  Thanks for the writeup.
 
  A few high level comments.
 
  1. Associating (versioned) schemas to a topic can be a good thing
 overall.
  Yes, this could add a bit more management overhead in Kafka. However, it
  makes sure that the data format contract between a producer and a
 consumer
  is kept and managed in a central place, instead of in the application.
 The
  latter is probably easier to start with, but is likely to be brittle in
 the
  long run.
 

 I am actually not proposing that we not support associated versioned schemas
 for topics, but that core Kafka functionality such as auditing should not
 depend on schemas. I think this alone can separate schema management from
 Kafka piping management (i.e. making sure every single message is delivered,
 within some latency bound, etc.). Adding additional auditing info into an
 existing schema would force Kafka to be aware of the schema systems (Avro,
 JSON, etc.).


 
  2. Auditing can be a general feature that's useful for many applications.
  Such a feature can be implemented by extending the low level message
 format
  with a header. However, it can also be added as part of the schema
  management. For example, you can imagine a type of audited schema that
 adds
  additional auditing info to an existing schema automatically. Performance
  wise, it probably doesn't make a big difference whether the auditing info
  is added in the message header or the schema header.
 
 
 See replies above.


  3. We talked about avoiding the overhead of decompressing in both the
  broker and the mirror maker. We probably need to think through how this
  works with auditing. In the more general case where you want to audit
 every
  message, one has to do the decompression to get the individual message,
  independent of how the auditing info is stored. This means that if we
 want
  to audit the broker directly or the consumer in mirror maker, we have to
  pay the decompression cost anyway. Similarly, if we want to extend mirror
  maker to support some customized filtering/transformation logic, we also
  have to pay the decompression cost.
 
 
 I see your point. For that I would prefer to have an MM implementation that
 is able to do de-compress / re-compress ONLY if required, for example by
 auditing, etc. I agree that we have not thought through whether we should
 enable auditing on MM, and if so how to do it, and we could discuss
 that in a different thread. Overall, this proposal is not just for
 tackling de-compression on MM but about the feasibility of extending Kafka
 message header for system properties / app properties.


  Some low level comments.
 
  4. Broker offset reassignment (kafka-527):  This probably can be done
 with
  just a format change on the compressed message set.
 
  That is true. As I mentioned in the wiki each of the problems may be
 resolvable separately but I am thinking about a general way to get all of
 them.


  5. MirrorMaker refactoring: We probably can think through how general we
  want mirror maker to be. If we want it to be more general, we likely
  need to decompress every message just like in a normal consumer. There
 will
  definitely be overhead. However, as long as mirror maker is made
 scalable,
  we can overcome the overhead by just running more instances on more
  hardware resources. As for the proposed message format change, we
 probably
  need to think through it a bit more. The honor-ship flag seems a bit
 hacky
  to me.
 
 
 Replied as part of 3). Sure, we can discuss that more; I will update the
 wiki with the collected comments.


  6. Adding a timestamp in each message can be a useful thing. It (1)
 allows
  log segments to be rolled more accurately; (2) allows finding an offset
 for
  a particular timestamp more accurately. I am thinking that the timestamp
 in
  the message should probably be the time when the leader receives the
  message. Followers preserve the timestamp 

Re: [DISCUSSION] Message Metadata

2014-10-20 Thread Guozhang Wang
I have updated the wiki page incorporating the received comments. We can
discuss some more details on:

1. How do we want to do auditing? Whether we want to have built-in auditing on
brokers or even MMs, or use an audit consumer to fetch all messages from
just the brokers.

2. How we can avoid de-/re-compression on brokers and MMs with log
compaction turned on.

3. How we can resolve the data inconsistency resulting from unclean leader
election with control messages.

Guozhang

On Sun, Oct 19, 2014 at 11:41 PM, Guozhang Wang wangg...@gmail.com wrote:

 Thanks for the detailed comments Jun! Some replies inlined.

 On Sun, Oct 19, 2014 at 7:42 PM, Jun Rao jun...@gmail.com wrote:

 6. Adding a timestamp in each message can be a useful thing. It (1) allows
 log segments to be rolled more accurately; (2) allows finding an offset
 for
 a particular timestamp more accurately. I am thinking that the timestamp
 in
 the message should probably be the time when the leader receives the
 message. Followers preserve the timestamp set by leader. To avoid time
 going back during leader change, the leader can probably set the timestamp
 to be the  max of current time and the timestamp of the last message, if
 present. That timestamp can potentially be added to the index file to
 answer offsetBeforeTimestamp queries more efficiently.


 Agreed.


 7. Log compaction: It seems that you are suggesting an improvement to
 compact the active segment as well. This can 
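
To make point 6 above concrete, here is a minimal sketch of the monotonic
timestamp assignment being suggested (take the max of the wall clock and the
previous message's timestamp); the class and method names are illustrative and
this is not actual Kafka code.

{code}
// Sketch only: leader-side timestamp assignment as suggested in point 6 above.
// Names are illustrative; this is not the actual Kafka implementation.
class LeaderTimestampAssigner {
  private var lastTimestamp: Long = -1L

  // Take the max of the wall clock and the last assigned timestamp so that
  // timestamps never go backwards, e.g. across a leader change.
  def assign(now: Long = System.currentTimeMillis()): Long = {
    val ts = math.max(now, lastTimestamp)
    lastTimestamp = ts
    ts
  }
}
{code}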

Re: Review Request 26936: Patch for kafka-1717

2014-10-20 Thread Neha Narkhede

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/26936/#review57357
---

Ship it!


Ship It!

- Neha Narkhede


On Oct. 20, 2014, 3:16 p.m., Jun Rao wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/26936/
 ---
 
 (Updated Oct. 20, 2014, 3:16 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: kafka-1717
 https://issues.apache.org/jira/browse/kafka-1717
 
 
 Repository: kafka
 
 
 Description
 ---
 
 remove netty from kafka dependency manually until ZOOKEEPER-1681 is fixed
 
 
 Diffs
 -
 
   build.gradle ee87e0f40ad05ba07763d7fe8a4fe3aaadf524e5 
 
 Diff: https://reviews.apache.org/r/26936/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Jun Rao
 




[jira] [Commented] (KAFKA-1718) Message Size Too Large error when only small messages produced with Snappy

2014-10-20 Thread Sriharsha Chintalapani (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1718?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14177107#comment-14177107
 ] 

Sriharsha Chintalapani commented on KAFKA-1718:
---

[~eapache] kafka uses ByteBufferMessageSet 
https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
which adds a LogOverhead to each message 
https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala#L54
and in Log compares ByteBufferMessageSet.sizeInBytes with the configured message 
size, which won't be equal to the actual message size sent.
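
For illustration, a small sketch of the size accounting described above,
assuming the 12-byte per-entry log overhead (8-byte offset plus 4-byte size
field); this is not the broker code itself.

{code}
// Sketch of the size accounting described above (not the actual broker code):
// each entry in a ByteBufferMessageSet carries an 8-byte offset and a 4-byte
// size field in addition to the message bytes themselves.
object MessageSetSizeSketch {
  val LogOverhead: Int = 8 + 4 // offset + size field, 12 bytes per entry

  def entrySize(messageSizeInBytes: Int): Int = messageSizeInBytes + LogOverhead

  def main(args: Array[String]): Unit = {
    val compressedMessageSize = 46899         // a message size from the report above
    println(entrySize(compressedMessageSize)) // 46911: message bytes plus log overhead
  }
}
{code}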


 Message Size Too Large error when only small messages produced with Snappy
 

 Key: KAFKA-1718
 URL: https://issues.apache.org/jira/browse/KAFKA-1718
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 0.8.1.1
Reporter: Evan Huus
Priority: Critical

 I'm the primary author of the Go bindings, and while I originally received 
 this as a bug against my bindings, I'm coming to the conclusion that it's a 
 bug in the broker somehow.
 Specifically, take a look at the last two kafka packets in the following 
 packet capture: https://dl.dropboxusercontent.com/u/171647/kafka.pcapng (you 
 will need a trunk build of Wireshark to fully decode the kafka part of the 
 packets).
 The produce request contains two partitions on one topic. Each partition has 
 one message set (sizes 977205 bytes and 967362 bytes respectively). Each 
 message set is a sequential collection of snappy-compressed messages, each 
 message of size 46899. When uncompressed, each message contains a message set 
 of 999600 bytes, containing a sequence of uncompressed 1024-byte messages.
 However, the broker responds to this with a MessageSizeTooLarge error, full 
 stacktrace from the broker logs being:
 kafka.common.MessageSizeTooLargeException: Message size is 1070127 bytes 
 which exceeds the maximum configured message size of 112.
   at kafka.log.Log$$anonfun$append$1.apply(Log.scala:267)
   at kafka.log.Log$$anonfun$append$1.apply(Log.scala:265)
   at scala.collection.Iterator$class.foreach(Iterator.scala:727)
   at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:32)
   at kafka.log.Log.append(Log.scala:265)
   at kafka.cluster.Partition.appendMessagesToLeader(Partition.scala:354)
   at 
 kafka.server.KafkaApis$$anonfun$appendToLocalLog$2.apply(KafkaApis.scala:376)
   at 
 kafka.server.KafkaApis$$anonfun$appendToLocalLog$2.apply(KafkaApis.scala:366)
   at 
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
   at 
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
   at 
 scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
   at 
 scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
   at 
 scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
   at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
   at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
   at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
   at scala.collection.AbstractTraversable.map(Traversable.scala:105)
   at kafka.server.KafkaApis.appendToLocalLog(KafkaApis.scala:366)
   at kafka.server.KafkaApis.handleProducerRequest(KafkaApis.scala:292)
   at kafka.server.KafkaApis.handle(KafkaApis.scala:185)
   at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:42)
   at java.lang.Thread.run(Thread.java:695)
 Since as far as I can tell none of the sizes in the actual produced packet 
 exceed the defined maximum, I can only assume that the broker is 
 miscalculating something somewhere and throwing the exception improperly.
 ---
 This issue can be reliably reproduced using an out-of-the-box binary download 
 of 0.8.1.1 and the following gist: 
 https://gist.github.com/eapache/ce0f15311c605a165ce7 (you will need to use 
 the `producer-ng` branch of the Sarama library).
 ---
 I am happy to provide any more information you might need, or to do relevant 
 experiments etc.





[jira] [Commented] (KAFKA-1718) Message Size Too Large error when only small messages produced with Snappy

2014-10-20 Thread Evan Huus (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1718?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14177112#comment-14177112
 ] 

Evan Huus commented on KAFKA-1718:
--

LogOverhead is only 12 bytes; none of the values I produce are within 12 bytes 
of the limit and nowhere near the 1070127 that the broker is reporting.



[jira] [Created] (KAFKA-1719) Make mirror maker exit when one consumer/producer thread exits.

2014-10-20 Thread Jiangjie Qin (JIRA)
Jiangjie Qin created KAFKA-1719:
---

 Summary: Make mirror maker exit when one consumer/producer thread 
exits.
 Key: KAFKA-1719
 URL: https://issues.apache.org/jira/browse/KAFKA-1719
 Project: Kafka
  Issue Type: Improvement
Reporter: Jiangjie Qin
Assignee: Jiangjie Qin


When one of the consumer/producer thread exits, the entire mirror maker will be 
blocked. In this case, it is better to make it exit. It seems a single 
ZookeeperConsumerConnector is sufficient for mirror maker, probably we don't 
need a list for the connectors.
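
As a rough illustration of the proposed behaviour (bring the whole process down
when a worker thread dies unexpectedly), a sketch using an uncaught-exception
handler; the names are made up and this is not the mirror maker code.

{code}
// Illustrative sketch only: exit the whole process when a worker thread dies
// unexpectedly, instead of leaving the mirror maker silently blocked.
object FatalThreadSketch {
  def newFatalThread(name: String)(body: => Unit): Thread = {
    val t = new Thread(new Runnable { def run(): Unit = body }, name)
    t.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler {
      def uncaughtException(thread: Thread, e: Throwable): Unit = {
        System.err.println(s"Thread ${thread.getName} died: ${e}; exiting.")
        Runtime.getRuntime.halt(1) // bring the whole process down
      }
    })
    t
  }
}
{code}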





[jira] [Commented] (KAFKA-1481) Stop using dashes AND underscores as separators in MBean names

2014-10-20 Thread Vladimir Tretyakov (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1481?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14177149#comment-14177149
 ] 

Vladimir Tretyakov commented on KAFKA-1481:
---

Hi, fuf :), created a new patch (for the 0.8.2 branch).

re 2.1, 2.3 - I didn't change this part because we use it in our 
monitoring; it is handier for the user to see something like 'localhost:9092' 
instead of '2' (where 2 is the broker id). For us it is easiest to have this 
information directly in the MBean.

With this patch mBeanNames look like:
{code}
kafka.producer:type=ProducerTopicMetrics,name=MessagesPerSec,topic=spm_alerts_topic
kafka.producer:type=ProducerTopicMetrics,name=MessagesPerSec,allTopics=true
kafka.log:type=Log,name=LogEndOffset,topic=spm_alerts_topic,partitionId=0
kafka.consumer:type=ZookeeperConsumerConnector,name=FetchQueueSize,timestamp=1413817796508,clientId=af_servers,uuid=9f99df40,groupId=af_servers,topicThreadId=spm_topic,consumerHostName=wawanawna,threadId=0
kafka.consumer:type=FetchRequestAndResponseMetrics,name=FetchRequestRateAndTimeMs,clientId=af_servers,threadName=ConsumerFetcherThread,fetcherId=0,sourceBrokerId=0,groupId=af_servers,consumerHostName=wawanawna,timestamp=1413817796508,uuid=9f99df40,brokerHost=wawanawna,brokerPort=9092
{code}

Not everything in the code is clear to me, so I've tried to avoid deep 
refactoring. All tests passed for me locally.

PS: note that if the 'value' part is empty (in the key-value structure) we will 
not see that part in the attributes.
Example: key1-value1,key2-null,key3-value3 will be converted to 
key1=value1,key3=value3.
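
A tiny sketch of the conversion rule described in the PS (entries whose value
is null or empty are skipped); illustrative only, not the patch itself.

{code}
// Sketch of the naming rule described above (not the patch itself): null or
// empty values are dropped, the rest become comma-separated key=value pairs.
object MBeanNameSketch {
  def attributes(tags: Seq[(String, String)]): String =
    tags.collect { case (k, v) if v != null && v.nonEmpty => s"$k=$v" }.mkString(",")

  def main(args: Array[String]): Unit = {
    val tags = Seq("key1" -> "value1", "key2" -> null, "key3" -> "value3")
    println(attributes(tags)) // prints key1=value1,key3=value3
  }
}
{code}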







 Stop using dashes AND underscores as separators in MBean names
 --

 Key: KAFKA-1481
 URL: https://issues.apache.org/jira/browse/KAFKA-1481
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 0.8.1.1
Reporter: Otis Gospodnetic
Priority: Critical
  Labels: patch
 Fix For: 0.8.2

 Attachments: KAFKA-1481_2014-06-06_13-06-35.patch, 
 KAFKA-1481_2014-10-13_18-23-35.patch, KAFKA-1481_2014-10-14_21-53-35.patch, 
 KAFKA-1481_2014-10-15_10-23-35.patch, 
 KAFKA-1481_IDEA_IDE_2014-10-14_21-53-35.patch, 
 KAFKA-1481_IDEA_IDE_2014-10-15_10-23-35.patch


 MBeans should not use dashes or underscores as separators because these 
 characters are allowed in hostnames, topics, group and consumer IDs, etc., 
 and these are embedded in MBeans names making it impossible to parse out 
 individual bits from MBeans.
 Perhaps a pipe character should be used to avoid the conflict. 
 This looks like a major blocker because it means nobody can write Kafka 0.8.x 
 monitoring tools unless they are doing it for themselves AND do not use 
 dashes AND do not use underscores.
 See: http://search-hadoop.com/m/4TaT4lonIW





[jira] [Updated] (KAFKA-1481) Stop using dashes AND underscores as separators in MBean names

2014-10-20 Thread Vladimir Tretyakov (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1481?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vladimir Tretyakov updated KAFKA-1481:
--
Attachment: KAFKA-1481_IDEA_IDE_2014-10-20_20-14-35.patch



[jira] [Updated] (KAFKA-1717) remove netty dependency through ZK 3.4.x

2014-10-20 Thread Jun Rao (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1717?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao updated KAFKA-1717:
---
Resolution: Fixed
Status: Resolved  (was: Patch Available)

Thanks for the reviews. Committed to trunk and 0.8.2.

 remove netty dependency through ZK 3.4.x
 

 Key: KAFKA-1717
 URL: https://issues.apache.org/jira/browse/KAFKA-1717
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 0.8.2
Reporter: Jun Rao
Assignee: Jun Rao
Priority: Blocker
 Fix For: 0.8.2

 Attachments: kafka-1717.patch


 ZK 3.4.x specifies a dependency on netty. However, that dependency is 
 optional (ZOOKEEPER-1681). To avoid potential jar conflict, it's better to 
 exclude netty dependency from Kafka.





[jira] [Commented] (KAFKA-1481) Stop using dashes AND underscores as separators in MBean names

2014-10-20 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1481?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14177250#comment-14177250
 ] 

Jun Rao commented on KAFKA-1481:


Thanks for the patch. It doesn't seem to apply. Could you rebase?

git apply -p0 ~/Downloads/KAFKA-1481_IDEA_IDE_2014-10-20_20-14-35.patch 
error: core/src/main/scala/kafka/common/ClientIdTopic.scala: No such file or 
directory
error: patch failed: core/src/main/scala/kafka/server/KafkaApis.scala:285
error: core/src/main/scala/kafka/server/KafkaApis.scala: patch does not apply
error: patch failed: 
core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala:74
error: core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala: 
patch does not apply
error: patch failed: 
core/src/main/scala/kafka/consumer/PartitionAssignor.scala:101
error: core/src/main/scala/kafka/consumer/PartitionAssignor.scala: patch does 
not apply
error: patch failed: 
core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala:17
error: 
core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala: 
patch does not apply



[jira] [Commented] (KAFKA-1718) Message Size Too Large error when only small messages produced with Snappy

2014-10-20 Thread Sriharsha Chintalapani (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1718?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14177324#comment-14177324
 ] 

Sriharsha Chintalapani commented on KAFKA-1718:
---

[~eapache] there is additional data being added per message too 
https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/message/Message.scala#L172
and 
https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/message/Message.scala#L99



[Java New Producer] Snappy NPE Issue

2014-10-20 Thread Bhavesh Mistry
Hi Kafka Dev,

I am getting the following issue with the Snappy library.  I checked the code
for the Snappy lib and it seems to be fine.  Have you guys seen this issue?

2014-10-20 18:55:21.841 [kafka-producer-network-thread] ERROR
org.apache.kafka.clients.producer.internals.Sender - Uncaught error in
kafka producer I/O thread:
*java.lang.NullPointerException*
at
org.xerial.snappy.BufferRecycler.releaseInputBuffer(BufferRecycler.java:153)
at org.xerial.snappy.SnappyOutputStream.close(SnappyOutputStream.java:317)
at java.io.FilterOutputStream.close(FilterOutputStream.java:160)
at org.apache.kafka.common.record.Compressor.close(Compressor.java:94)
at
org.apache.kafka.common.record.MemoryRecords.close(MemoryRecords.java:119)
at
org.apache.kafka.clients.producer.internals.RecordAccumulator.drain(RecordAccumulator.java:285)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:162)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:115)
at java.lang.Thread.run(Thread.java:744)


Here is the code for Snappy (BufferRecycler.java, line 153):
http://grepcode.com/file/repo1.maven.org/maven2/org.xerial.snappy/snappy-java/1.1.1.3/org/xerial/snappy/BufferRecycler.java#153

153  if (inputBuffer == null || (buffer != null && buffer.length < inputBuffer.length)) {


Thanks,

Bhavesh


[jira] [Comment Edited] (KAFKA-1718) Message Size Too Large error when only small messages produced with Snappy

2014-10-20 Thread Evan Huus (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1718?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14177351#comment-14177351
 ] 

Evan Huus edited comment on KAFKA-1718 at 10/20/14 7:28 PM:


That additional data is only 26 bytes, and is already included in the numbers I 
put in my original report.


was (Author: eapache):
The numbers I put in my original report do take all of that additional data 
into consideration already.



[jira] [Commented] (KAFKA-1481) Stop using dashes AND underscores as separators in MBean names

2014-10-20 Thread Vladimir Tretyakov (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1481?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14177423#comment-14177423
 ] 

Vladimir Tretyakov commented on KAFKA-1481:
---

Hi, thanks for the quick reply. I did a rebase and added new patches (1 was 
created by git, 1 by the IDEA IDE).



[jira] [Updated] (KAFKA-1555) provide strong consistency with reasonable availability

2014-10-20 Thread Gwen Shapira (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gwen Shapira updated KAFKA-1555:

Attachment: KAFKA-1555-DOCS.0.patch

Adding documentation patch.

Note that this patch is huge since it adds the all-new 0.8.2 documentation.

The relevant parts are:
1. Added a sub-section on Availability and Durability Guarantees under the 4.7
Replication section, explaining the different knobs and trade-offs involved.
2. Added min.isr to the topic configs (see the sketch below).
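
For readers of the new section, a hedged example of the settings involved,
assuming the 0.8.2 names min.insync.replicas (topic level) and acks (new java
producer); treat it as an illustration of the trade-off, not as part of the
patch.

{code}
// Illustration of the knobs the new docs section describes (not part of the patch).
// Topic level (e.g. set via the topic tools): min.insync.replicas=2 on a topic
// with replication factor 3, so an acks=all write needs at least 2 in-sync replicas.
import java.util.Properties

object DurableProducerConfigSketch {
  def producerProps(brokers: String): Properties = {
    val props = new Properties()
    props.put("bootstrap.servers", brokers) // new (java) producer bootstrap list
    props.put("acks", "all")                // wait for the full in-sync replica set
    props
  }
}
{code}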

 provide strong consistency with reasonable availability
 ---

 Key: KAFKA-1555
 URL: https://issues.apache.org/jira/browse/KAFKA-1555
 Project: Kafka
  Issue Type: Improvement
  Components: controller
Affects Versions: 0.8.1.1
Reporter: Jiang Wu
Assignee: Gwen Shapira
 Fix For: 0.8.2

 Attachments: KAFKA-1555-DOCS.0.patch, KAFKA-1555.0.patch, 
 KAFKA-1555.1.patch, KAFKA-1555.2.patch, KAFKA-1555.3.patch, 
 KAFKA-1555.4.patch, KAFKA-1555.5.patch, KAFKA-1555.5.patch, 
 KAFKA-1555.6.patch, KAFKA-1555.8.patch, KAFKA-1555.9.patch


 In a mission critical application, we expect a kafka cluster with 3 brokers 
 can satisfy two requirements:
 1. When 1 broker is down, no message loss or service blocking happens.
 2. In worse cases such as two brokers are down, service can be blocked, but 
 no message loss happens.
 We found that the current kafka version (0.8.1.1) cannot achieve these requirements 
 due to three of its behaviors:
 1. when choosing a new leader from 2 followers in ISR, the one with fewer 
 messages may be chosen as the leader.
 2. even when replica.lag.max.messages=0, a follower can stay in ISR when it 
 has fewer messages than the leader.
 3. the ISR can contain only 1 broker, therefore acknowledged messages may be 
 stored in only 1 broker.
 The following is an analytical proof. 
 We consider a cluster with 3 brokers and a topic with 3 replicas, and assume 
 that at the beginning, all 3 replicas, leader A, followers B and C, are in 
 sync, i.e., they have the same messages and are all in ISR.
 According to the value of request.required.acks (acks for short), there are 
 the following cases.
 1. acks=0, 1, 3. Obviously these settings do not satisfy the requirement.
 2. acks=2. Producer sends a message m. It's acknowledged by A and B. At this 
 time, although C hasn't received m, C is still in ISR. If A is killed, C can 
 be elected as the new leader, and consumers will miss m.
 3. acks=-1. B and C restart and are removed from ISR. Producer sends a 
 message m to A, and receives an acknowledgement. Disk failure happens in A 
 before B and C replicate m. Message m is lost.
 In summary, no existing configuration can satisfy the requirements.





[jira] [Updated] (KAFKA-1481) Stop using dashes AND underscores as separators in MBean names

2014-10-20 Thread Vladimir Tretyakov (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1481?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vladimir Tretyakov updated KAFKA-1481:
--
Attachment: KAFKA-1481_2014-10-20_23-14-35.patch
KAFKA-1481_IDEA_IDE_2014-10-20_23-14-35.patch



[jira] [Commented] (KAFKA-1718) Message Size Too Large error when only small messages produced with Snappy

2014-10-20 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1718?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14177478#comment-14177478
 ] 

Jun Rao commented on KAFKA-1718:


Another thing that we do on the broker is to assign a new offset to each 
(uncompressed) message and recompress those messages again. It may be possible 
that because of the newly assigned offsets, the new message set doesn't 
compress as well as before and thus exceeds the limit.
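
A small sketch of the path being described, using snappy-java directly; it only
shows the decompress/re-compress round trip, and on the broker the per-message
offsets are rewritten in between, which is what can change the compressed size.

{code}
// Illustrative sketch of the path described above (not broker code). The broker
// decompresses the set, assigns new offsets to the inner messages, and compresses
// again; because the bytes change, the recompressed size can differ from what the
// producer measured. This toy round trip leaves the payload untouched, so the two
// sizes happen to match here.
import org.xerial.snappy.Snappy

object RecompressionSketch {
  def main(args: Array[String]): Unit = {
    val uncompressed = Array.fill[Byte](999600)(0)     // payload size from the report above
    val fromProducer = Snappy.compress(uncompressed)   // size the producer checked against its limit

    val roundTripped = Snappy.uncompress(fromProducer) // broker side: decompress, rewrite offsets, ...
    val fromBroker   = Snappy.compress(roundTripped)   // ... then compress again

    println(s"producer-side compressed size: ${fromProducer.length}")
    println(s"broker-side recompressed size: ${fromBroker.length}")
  }
}
{code}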



[jira] [Commented] (KAFKA-1718) Message Size Too Large error when only small messages produced with Snappy

2014-10-20 Thread Evan Huus (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1718?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14177487#comment-14177487
 ] 

Evan Huus commented on KAFKA-1718:
--

That sounds plausible.

1. How do I verify if that is/isn't the problem I'm seeing? Is there some piece 
of backtrace or breakpoint I can check or something?
2. If that is the problem, what is a client supposed to do about it? Leave a 
few KiB spare and hope that that's enough? Is there no way for a client using 
compression to be sure that the broker will actually accept the payload (unless 
presumably the uncompressed payload is already small enough)?



[jira] [Comment Edited] (KAFKA-1718) Message Size Too Large error when only small messages produced with Snappy

2014-10-20 Thread Evan Huus (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1718?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14177487#comment-14177487
 ] 

Evan Huus edited comment on KAFKA-1718 at 10/20/14 9:23 PM:


That sounds plausible.

1. How do I verify if that is/isn't the problem I'm seeing? Is there some piece 
of backtrace or breakpoint I can check or something?
2. If that is the problem, what is a client supposed to do about it? Leave a 
few KiB spare and hope that that's enough? Is there no way for a client using 
compression to be sure that the broker will actually accept the payload (unless 
presumably the uncompressed payload is already small enough)?

Edit: actually, that can't be it. From my original report: "When uncompressed, 
each message contains a message set of 999600 bytes." So unless the 
recompression on the broker's end *added* a substantial amount of data (which 
is improbable; the messages were all 0s)...


was (Author: eapache):
That sounds plausible.

1. How do I verify if that is/isn't the problem I'm seeing? Is there some piece 
of backtrace or breakpoint I can check or something?
2. If that is the problem, what is a client supposed to do about it? Leave a 
few KiB spare and hope that that's enough? Is there no way for a client using 
compression to be sure that the broker will actually accept the payload (unless 
presumably the uncompressed payload is already small enough)?


[jira] [Commented] (KAFKA-1712) Excessive storage usage on newly added node

2014-10-20 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1712?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14177500#comment-14177500
 ] 

Jun Rao commented on KAFKA-1712:


This is being discussed in 
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Enriched+Message+Metadata

 Excessive storage usage on newly added node
 ---

 Key: KAFKA-1712
 URL: https://issues.apache.org/jira/browse/KAFKA-1712
 Project: Kafka
  Issue Type: Bug
Reporter: Oleg Golovin

 When a new node is added to the cluster, data starts replicating into it. The 
 mtime of the newly created segments will be set by the last message written to 
 them. Though the replication is a prolonged process, let's assume (for 
 simplicity of explanation) that their mtime is very close to the time when 
 the new node was added.
 After the replication is done, new data will start to flow into this new 
 node. After `log.retention.hours` the amount of data will be 2 * 
 daily_amount_of_data_in_kafka_node (first one is the replicated data from 
 other nodes when the node was added (let us call it `t1`) and the second is 
 the amount of replicated data from other nodes which happened from `t1` to 
 `t1 + log.retention.hours`). So by that time the node will have twice as much 
 data as the other nodes.
 This poses a big problem to us as our storage is chosen to fit normal amount 
 of data (not twice this amount).
 In our particular case it poses another problem. We have an emergency segment 
 cleaner which runs in case storage is nearly full (90%). We try to balance 
 the amount of data for it not to run to rely solely on kafka internal log 
 deletion, but sometimes emergency cleaner runs.
 It works this way:
 - it gets all kafka segments for the volume
 - it filters out last segments of each partition (just to avoid unnecessary 
 recreation of last small-size segments)
 - it sorts them by segment mtime
 - it changes the mtime of the first N segments (with the lowest mtime) to 1, so 
 they become really, really old. N is chosen to free a specified 
 percentage of the volume (3% in our case).  Kafka deletes these segments later 
 (as they are very old).
 The emergency cleaner works very well, except when data has just been 
 replicated to a newly added node.
 In that case the segment mtime is the time the segment was replicated and does 
 not reflect the real creation time of the original data stored in the segment.
 So the emergency cleaner will delete the segments with the lowest mtime, which 
 may hold data that is much more recent than the data in other segments.
 This is not a big problem unless we delete data that hasn't been fully 
 consumed. In that case we lose data, and that makes it a big problem.
 Is it possible to retain segment mtime during the initial replication to a new 
 node?
 That would keep the new node from being loaded with twice as much data as the 
 other nodes have.
 Or maybe there are other ways to sort segments by data creation time (or 
 something close to it)? For example, if 
 https://issues.apache.org/jira/browse/KAFKA-1403 is implemented, we could take 
 the time of the first message from the .index file. In our case that would let 
 the emergency cleaner delete the truly oldest data.
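A minimal sketch of a cleaner along the lines described above (this is not the 
reporter's actual tool): it assumes one directory per partition under the log 
dir and `.log` segment files, skips the newest segment of each partition, and 
ages out the oldest segments by mtime until the requested number of bytes has 
been covered. The paths, the bytes-to-free argument and the file-name filter 
are assumptions.

{code}
// Hypothetical standalone cleaner, not part of Kafka.
// Usage (illustrative): java EmergencyCleaner /var/kafka-logs 30000000000
import java.io.File;
import java.util.*;

public class EmergencyCleaner {
    public static void main(String[] args) {
        File logDir = new File(args[0]);              // Kafka log directory
        long bytesToFree = Long.parseLong(args[1]);   // e.g. ~3% of the volume

        // Collect every segment file except the newest one of each partition.
        List<File> candidates = new ArrayList<>();
        for (File partitionDir : Objects.requireNonNull(logDir.listFiles(File::isDirectory))) {
            File[] segments = partitionDir.listFiles(f -> f.getName().endsWith(".log"));
            if (segments == null || segments.length <= 1) continue;
            Arrays.sort(segments, Comparator.comparingLong(File::lastModified));
            candidates.addAll(Arrays.asList(segments).subList(0, segments.length - 1));
        }

        // Oldest first by mtime; make them "really old" so retention deletes them.
        candidates.sort(Comparator.comparingLong(File::lastModified));
        long freed = 0;
        for (File segment : candidates) {
            if (freed >= bytesToFree) break;
            segment.setLastModified(1000L);   // mtime near the epoch, i.e. ancient
            freed += segment.length();
        }
    }
}
{code}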



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (KAFKA-1514) Update Kafka trunk version number

2014-10-20 Thread Jun Rao (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1514?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao resolved KAFKA-1514.

Resolution: Fixed

 Update Kafka trunk version number
 -

 Key: KAFKA-1514
 URL: https://issues.apache.org/jira/browse/KAFKA-1514
 Project: Kafka
  Issue Type: Bug
Reporter: Jakob Homan

 Right now the 8.1.1 branch has 8.1.1 as its version, while the trunk has 8.1. 
  Trunk should be 9.0, generally, or at the very least 8.2.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-1720) [Renaming / Comments] Delayed Operations

2014-10-20 Thread Guozhang Wang (JIRA)
Guozhang Wang created KAFKA-1720:


 Summary: [Renaming / Comments] Delayed Operations
 Key: KAFKA-1720
 URL: https://issues.apache.org/jira/browse/KAFKA-1720
 Project: Kafka
  Issue Type: Bug
Reporter: Guozhang Wang
Assignee: Guozhang Wang
 Fix For: 0.9.0


After KAFKA-1583 is checked in, we had better rename the delayed requests to 
delayed operations.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1718) Message Size Too Large error when only small messages produced with Snappy

2014-10-20 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1718?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14177558#comment-14177558
 ] 

Jun Rao commented on KAFKA-1718:


In trunk, we actually do a size check before and after recompression. You can 
probably set a breakpoint in Log.append() and see where the size limit is 
violated.
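For illustration only, the arithmetic from the report quoted below: each 
produced message is small, but if the broker re-wraps a partition's data into a 
single compressed wrapper message while recompressing, that one message can 
trip the per-message check. The limit used here is an assumption, not the 
reporter's actual broker configuration.

{code}
// Illustrative numbers taken from the report; the 1,000,000-byte limit is assumed.
public class WrapperSizeCheck {
    public static void main(String[] args) {
        int maxMessageBytes = 1_000_000;     // assumed per-message limit
        int producedMessageSize = 46_899;    // each snappy-compressed message on the wire
        int recompressedWrapper = 1_070_127; // single wrapper message size from the exception

        // The per-message check passes for what the client actually sent...
        System.out.println(producedMessageSize <= maxMessageBytes);   // true
        // ...but can fail after the broker recompresses the set into one wrapper message.
        System.out.println(recompressedWrapper <= maxMessageBytes);   // false
    }
}
{code}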

 Message Size Too Large error when only small messages produced with Snappy
 

 Key: KAFKA-1718
 URL: https://issues.apache.org/jira/browse/KAFKA-1718
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 0.8.1.1
Reporter: Evan Huus
Priority: Critical

 I'm the primary author of the Go bindings, and while I originally received 
 this as a bug against my bindings, I'm coming to the conclusion that it's a 
 bug in the broker somehow.
 Specifically, take a look at the last two kafka packets in the following 
 packet capture: https://dl.dropboxusercontent.com/u/171647/kafka.pcapng (you 
 will need a trunk build of Wireshark to fully decode the kafka part of the 
 packets).
 The produce request contains two partitions on one topic. Each partition has 
 one message set (sizes 977205 bytes and 967362 bytes respectively). Each 
 message set is a sequential collection of snappy-compressed messages, each 
 message of size 46899. When uncompressed, each message contains a message set 
 of 999600 bytes, containing a sequence of uncompressed 1024-byte messages.
 However, the broker responds to this with a MessageSizeTooLarge error, full 
 stacktrace from the broker logs being:
 kafka.common.MessageSizeTooLargeException: Message size is 1070127 bytes 
 which exceeds the maximum configured message size of 112.
   at kafka.log.Log$$anonfun$append$1.apply(Log.scala:267)
   at kafka.log.Log$$anonfun$append$1.apply(Log.scala:265)
   at scala.collection.Iterator$class.foreach(Iterator.scala:727)
   at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:32)
   at kafka.log.Log.append(Log.scala:265)
   at kafka.cluster.Partition.appendMessagesToLeader(Partition.scala:354)
   at 
 kafka.server.KafkaApis$$anonfun$appendToLocalLog$2.apply(KafkaApis.scala:376)
   at 
 kafka.server.KafkaApis$$anonfun$appendToLocalLog$2.apply(KafkaApis.scala:366)
   at 
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
   at 
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
   at 
 scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
   at 
 scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
   at 
 scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
   at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
   at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
   at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
   at scala.collection.AbstractTraversable.map(Traversable.scala:105)
   at kafka.server.KafkaApis.appendToLocalLog(KafkaApis.scala:366)
   at kafka.server.KafkaApis.handleProducerRequest(KafkaApis.scala:292)
   at kafka.server.KafkaApis.handle(KafkaApis.scala:185)
   at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:42)
   at java.lang.Thread.run(Thread.java:695)
 Since as far as I can tell none of the sizes in the actual produced packet 
 exceed the defined maximum, I can only assume that the broker is 
 miscalculating something somewhere and throwing the exception improperly.
 ---
 This issue can be reliably reproduced using an out-of-the-box binary download 
 of 0.8.1.1 and the following gist: 
 https://gist.github.com/eapache/ce0f15311c605a165ce7 (you will need to use 
 the `producer-ng` branch of the Sarama library).
 ---
 I am happy to provide any more information you might need, or to do relevant 
 experiments etc.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 26373: Patch for KAFKA-1647

2014-10-20 Thread Jun Rao

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/26373/#review57490
---


Thanks for the patch. Some comments below.


core/src/main/scala/kafka/server/ReplicaManager.scala
https://reviews.apache.org/r/26373/#comment98208

This doesn't quite fix the original problem though. The original problem is 
that if the leader is not alive, we won't call partition.makeFollower(), in 
which the local replica is created. If a local replica is not created, the 
partition will be ignored when checkpointing the HW and we lose the last 
checkpointed HW.

So, we have to call partition.makeFollower() for every follower, whether its 
leader is live or not. After that, we can proceed with the rest of the steps 
for only those partitions with a live leader. We can log a warning for those 
partitions without a live leader.
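A rough sketch of the suggested control flow; Partition, the live-broker set 
and the logging are stand-ins, not Kafka's actual ReplicaManager code.

{code}
import java.util.*;

class Partition {
    final String topicPartition;
    Partition(String topicPartition) { this.topicPartition = topicPartition; }
    void makeFollower(int leaderId) { /* create/update the local replica here */ }
}

class BecomeFollowerSketch {
    static List<Partition> makeFollowers(Map<Partition, Integer> partitionToLeader,
                                         Set<Integer> liveBrokers) {
        List<Partition> withLiveLeader = new ArrayList<>();
        for (Map.Entry<Partition, Integer> e : partitionToLeader.entrySet()) {
            Partition partition = e.getKey();
            int leaderId = e.getValue();
            // Always create the local replica so HW checkpointing still covers it.
            partition.makeFollower(leaderId);
            if (liveBrokers.contains(leaderId)) {
                withLiveLeader.add(partition);
            } else {
                System.err.printf("warn: leader %d of %s is not alive; skipping fetch setup%n",
                        leaderId, partition.topicPartition);
            }
        }
        // Truncation and fetcher registration then proceed only for withLiveLeader.
        return withLiveLeader;
    }
}
{code}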


- Jun Rao


On Oct. 18, 2014, 7:26 a.m., Jiangjie Qin wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/26373/
 ---
 
 (Updated Oct. 18, 2014, 7:26 a.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1647
 https://issues.apache.org/jira/browse/KAFKA-1647
 
 
 Repository: kafka
 
 
 Description
 ---
 
 Addressed Joel's comments.
 
 
 the version 2 code seems to be submitted by mistake... This should be the 
 code for review that addressed Joel's comments.
 
 
 Diffs
 -
 
   core/src/main/scala/kafka/server/ReplicaManager.scala 
 78b7514cc109547c562e635824684fad581af653 
 
 Diff: https://reviews.apache.org/r/26373/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Jiangjie Qin
 




Re: Review Request 26885: Patch for KAFKA-1642

2014-10-20 Thread Ewen Cheslack-Postava

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/26885/
---

(Updated Oct. 21, 2014, 12:34 a.m.)


Review request for kafka.


Bugs: KAFKA-1642
https://issues.apache.org/jira/browse/KAFKA-1642


Repository: kafka


Description (updated)
---

Fixes two issues with the computation of ready nodes and poll timeouts in
Sender/RecordAccumulator:

1. The timeout was computed incorrectly because it took into account all nodes,
even if they had data to send such that their timeout would be 0. However, nodes
were then filtered based on whether it was possible to send (i.e. their
connection was still good) which could result in nothing to send and a 0
timeout, resulting in busy looping. Instead, the timeout needs to be computed
only using data that cannot be immediately sent, i.e. where the timeout will be
greater than 0. This timeout is only used if, after filtering by whether
connections are ready for sending, there is no data to be sent. Other events can
wake the thread up earlier, e.g. a client reconnects and becomes ready again.

2. One of the conditions indicating whether data is sendable is whether a
timeout has expired -- either the linger time or the retry backoff. This
condition wasn't accounting for both cases properly, always using the linger
time. This means the retry backoff was probably not being respected.

KAFKA-1642 Compute correct poll timeout when all nodes have sendable data but 
none can send data because they are in a connection backoff period.
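A condensed sketch of the timeout rule described above, with stand-in types 
rather than the actual RecordAccumulator/Sender classes; lingerMs and 
retryBackoffMs are folded into one per-batch wait threshold for brevity.

{code}
import java.util.List;

class PollTimeoutSketch {
    static class Batch {
        final long waitedMs;        // time the batch has already waited
        final long waitThresholdMs; // lingerMs normally, retryBackoffMs when retrying
        Batch(long waitedMs, long waitThresholdMs) {
            this.waitedMs = waitedMs;
            this.waitThresholdMs = waitThresholdMs;
        }
        boolean sendable() { return waitedMs >= waitThresholdMs; }
    }

    /** Only batches that are not yet sendable contribute to the delay; if anything
     *  was actually sent this round, poll with a zero timeout and re-evaluate. */
    static long pollTimeoutMs(List<Batch> batches, boolean sentSomething) {
        if (sentSomething) return 0;
        long next = Long.MAX_VALUE;
        for (Batch b : batches) {
            if (b.sendable()) continue;
            next = Math.min(next, b.waitThresholdMs - b.waitedMs);
        }
        return next;
    }
}
{code}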


Diffs (updated)
-

  clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java 
d304660f29246e9600efe3ddb28cfcc2b074bed3 
  clients/src/main/java/org/apache/kafka/clients/KafkaClient.java 
29658d4a15f112dc0af5ce517eaab93e6f00134b 
  clients/src/main/java/org/apache/kafka/clients/NetworkClient.java 
eea270abb16f40c9f3b47c4ea96be412fb4fdc8b 
  
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
 c5d470011d334318d5ee801021aadd0c000974a6 
  clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 
8ebe7ed82c9384b71ce0cc3ddbef2c2325363ab9 
  clients/src/test/java/org/apache/kafka/clients/MockClient.java 
aae8d4a1e98279470587d397cc779a9baf6fee6c 
  
clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java
 0762b35abba0551f23047348c5893bb8c9acff14 

Diff: https://reviews.apache.org/r/26885/diff/


Testing
---


Thanks,

Ewen Cheslack-Postava



[jira] [Updated] (KAFKA-1642) [Java New Producer Kafka Trunk] CPU Usage Spike to 100% when network connection is lost

2014-10-20 Thread Ewen Cheslack-Postava (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1642?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ewen Cheslack-Postava updated KAFKA-1642:
-
Attachment: KAFKA-1642_2014-10-20_17:33:57.patch

 [Java New Producer Kafka Trunk] CPU Usage Spike to 100% when network 
 connection is lost
 ---

 Key: KAFKA-1642
 URL: https://issues.apache.org/jira/browse/KAFKA-1642
 Project: Kafka
  Issue Type: Bug
  Components: producer 
Affects Versions: 0.8.2
Reporter: Bhavesh Mistry
Assignee: Ewen Cheslack-Postava
 Attachments: KAFKA-1642.patch, KAFKA-1642_2014-10-20_17:33:57.patch


 I see my CPU spike to 100% when the network connection is lost for a while. It 
 seems the network I/O threads are very busy logging the following error 
 message. Is this expected behavior?
 2014-09-17 14:06:16.830 [kafka-producer-network-thread] ERROR 
 org.apache.kafka.clients.producer.internals.Sender - Uncaught error in kafka 
 producer I/O thread: 
 java.lang.IllegalStateException: No entry found for node -2
 at 
 org.apache.kafka.clients.ClusterConnectionStates.nodeState(ClusterConnectionStates.java:110)
 at 
 org.apache.kafka.clients.ClusterConnectionStates.disconnected(ClusterConnectionStates.java:99)
 at 
 org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:394)
 at 
 org.apache.kafka.clients.NetworkClient.maybeUpdateMetadata(NetworkClient.java:380)
 at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:174)
 at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:175)
 at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:115)
 at java.lang.Thread.run(Thread.java:744)
 Thanks,
 Bhavesh



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1481) Stop using dashes AND underscores as separators in MBean names

2014-10-20 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1481?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14177717#comment-14177717
 ] 

Jun Rao commented on KAFKA-1481:


It still doesn't apply.

git apply -p0 ~/Downloads/KAFKA-1481_IDEA_IDE_2014-10-20_23-14-35.patch 
/Users/junrao/Downloads/KAFKA-1481_IDEA_IDE_2014-10-20_23-14-35.patch:311: 
trailing whitespace.
   })  
error: core/src/main/scala/kafka/common/TopicInfo.scala: No such file or 
directory

Does the patch apply on a fresh checkout for you?

 Stop using dashes AND underscores as separators in MBean names
 --

 Key: KAFKA-1481
 URL: https://issues.apache.org/jira/browse/KAFKA-1481
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 0.8.1.1
Reporter: Otis Gospodnetic
Priority: Critical
  Labels: patch
 Fix For: 0.8.2

 Attachments: KAFKA-1481_2014-06-06_13-06-35.patch, 
 KAFKA-1481_2014-10-13_18-23-35.patch, KAFKA-1481_2014-10-14_21-53-35.patch, 
 KAFKA-1481_2014-10-15_10-23-35.patch, KAFKA-1481_2014-10-20_23-14-35.patch, 
 KAFKA-1481_IDEA_IDE_2014-10-14_21-53-35.patch, 
 KAFKA-1481_IDEA_IDE_2014-10-15_10-23-35.patch, 
 KAFKA-1481_IDEA_IDE_2014-10-20_20-14-35.patch, 
 KAFKA-1481_IDEA_IDE_2014-10-20_23-14-35.patch


 MBeans should not use dashes or underscores as separators because these 
 characters are allowed in hostnames, topics, group and consumer IDs, etc., 
 and these are embedded in MBeans names making it impossible to parse out 
 individual bits from MBeans.
 Perhaps a pipe character should be used to avoid the conflict. 
 This looks like a major blocker because it means nobody can write Kafka 0.8.x 
 monitoring tools unless they are doing it for themselves AND do not use 
 dashes AND do not use underscores.
 See: http://search-hadoop.com/m/4TaT4lonIW
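A toy illustration of the parsing problem; the metric name below is made up, 
not an actual Kafka MBean name.

{code}
public class MBeanNameSplit {
    public static void main(String[] args) {
        // Is the host "my-host-1" and the topic "my-topic",
        // or the host "my-host" and the topic "1-my-topic"? Splitting can't tell.
        String name = "my-host-1-my-topic-BytesPerSec";
        for (String part : name.split("-"))
            System.out.println(part);
    }
}
{code}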



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1642) [Java New Producer Kafka Trunk] CPU Usage Spike to 100% when network connection is lost

2014-10-20 Thread Ewen Cheslack-Postava (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14177723#comment-14177723
 ] 

Ewen Cheslack-Postava commented on KAFKA-1642:
--

To summarize the issues fixed now:
* Fix the logic issue with the expired condition in RecordAccumulator.ready
* Don't include nodes that can send data when computing the delay until the 
next check for ready data. Including these doesn't make sense since their 
delays will change when we send data.
* To correctly account for nodes with sendable data, use a timeout of 0 if we 
send any. This guarantees any necessary delay is computed immediately in the 
next round after some current data has been removed.
* Properly account for nodes with sendable data under connection retry backoff. 
Since they weren't included in computing the next check delay when looking up 
ready nodes, we need to account for it later, but only if we conclude the node 
isn't ready. We need to incorporate the amount of backoff time still required 
before a retry will be performed (nothing else would wakeup at the right time, 
unlike other conditions like a full buffer which only change if data is 
received).

It might be possible to break this into smaller commits, one per fix, but the 
order in which they are applied needs care because some of them, applied alone, 
result in bad behavior -- the existing client worked because it often ended up 
with poll timeouts that were much more aggressive (i.e., often 0).

 [Java New Producer Kafka Trunk] CPU Usage Spike to 100% when network 
 connection is lost
 ---

 Key: KAFKA-1642
 URL: https://issues.apache.org/jira/browse/KAFKA-1642
 Project: Kafka
  Issue Type: Bug
  Components: producer 
Affects Versions: 0.8.2
Reporter: Bhavesh Mistry
Assignee: Ewen Cheslack-Postava
 Attachments: KAFKA-1642.patch, KAFKA-1642_2014-10-20_17:33:57.patch


 I see my CPU spike to 100% when the network connection is lost for a while. It 
 seems the network I/O threads are very busy logging the following error 
 message. Is this expected behavior?
 2014-09-17 14:06:16.830 [kafka-producer-network-thread] ERROR 
 org.apache.kafka.clients.producer.internals.Sender - Uncaught error in kafka 
 producer I/O thread: 
 java.lang.IllegalStateException: No entry found for node -2
 at 
 org.apache.kafka.clients.ClusterConnectionStates.nodeState(ClusterConnectionStates.java:110)
 at 
 org.apache.kafka.clients.ClusterConnectionStates.disconnected(ClusterConnectionStates.java:99)
 at 
 org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:394)
 at 
 org.apache.kafka.clients.NetworkClient.maybeUpdateMetadata(NetworkClient.java:380)
 at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:174)
 at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:175)
 at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:115)
 at java.lang.Thread.run(Thread.java:744)
 Thanks,
 Bhavesh



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-1698) Validator.ensureValid() only validates default config value

2014-10-20 Thread Jun Rao (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1698?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao updated KAFKA-1698:
---
Resolution: Fixed
Status: Resolved  (was: Patch Available)

Thanks for the patch. +1 and committed to trunk.

 Validator.ensureValid() only validates default config value
 ---

 Key: KAFKA-1698
 URL: https://issues.apache.org/jira/browse/KAFKA-1698
 Project: Kafka
  Issue Type: Bug
  Components: core
Reporter: Jun Rao
Assignee: Ewen Cheslack-Postava
  Labels: newbie++
 Fix For: 0.8.3

 Attachments: KAFKA-1698.patch


 We should use it to validate the actual configured value.
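A minimal sketch of the intended behavior, with stand-in types rather than 
Kafka's actual ConfigDef classes: the validator should run against the value 
that will actually be used, not only against the default supplied when the key 
was defined.

{code}
interface Validator {
    void ensureValid(String name, Object value);
}

class ConfigKeySketch {
    final String name;
    final Object defaultValue;
    final Validator validator;

    ConfigKeySketch(String name, Object defaultValue, Validator validator) {
        this.name = name;
        this.defaultValue = defaultValue;
        this.validator = validator;   // validating only the default here is the reported bug
    }

    Object parse(Object configuredValue) {
        Object value = (configuredValue != null) ? configuredValue : defaultValue;
        validator.ensureValid(name, value);   // validate what will actually be used
        return value;
    }
}
{code}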



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-1711) WARN Property topic is not valid when running console producer

2014-10-20 Thread Joe Crobak (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1711?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joe Crobak updated KAFKA-1711:
--
Attachment: KAFKA-1711.patch

Here's a patch that fixes the warning.

 WARN Property topic is not valid when running console producer
 --

 Key: KAFKA-1711
 URL: https://issues.apache.org/jira/browse/KAFKA-1711
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 0.8.2
Reporter: Jun Rao
  Labels: newbie
 Attachments: KAFKA-1711.patch


 bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test 
 [2014-10-17 09:54:23,984] WARN Property topic is not valid 
 (kafka.utils.VerifiableProperties)
 It would be good if we can get rid of the warning.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [Java New Producer] Snappy NPE Issue

2014-10-20 Thread Ewen Cheslack-Postava
I took a quick look at this since I noticed the same issue when testing
your code for the issues you filed. I think the issue is that you're
using all your producers across a thread pool and the snappy library
uses ThreadLocal BufferRecyclers. When new Snappy streams are allocated,
they may be allocated from the same thread (e.g. one of your MyProducer
classes calls Producer.send() on multiple producers from the same
thread) and therefore use the same BufferRecycler. Eventually you hit
the code in the stacktrace, and if two producer send threads hit it
concurrently they improperly share the unsynchronized BufferRecycler.
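A minimal sketch of that usage pattern; the broker address, topic name and 
serializer settings are illustrative, and the config keys are those of the 
current Java client rather than the 2014 trunk.

{code}
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class SnappySharedRecyclerRepro {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("compression.type", "snappy");
        props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");

        List<KafkaProducer<byte[], byte[]>> producers = new ArrayList<>();
        for (int i = 0; i < 4; i++)
            producers.add(new KafkaProducer<>(props));

        // One pool thread drives several producers, so snappy streams created from
        // that thread share its BufferRecycler, which the producers' sender threads
        // later touch concurrently when closing the streams.
        ExecutorService pool = Executors.newFixedThreadPool(8);
        for (int task = 0; task < 1_000; task++) {
            pool.submit(() -> {
                byte[] payload = new byte[1024];
                for (KafkaProducer<byte[], byte[]> p : producers)
                    p.send(new ProducerRecord<>("test-topic", payload));
            });
        }
        pool.shutdown();   // producers left open for brevity
    }
}
{code}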

This seems like a pain to fix -- it's really a deficiency of the snappy
library and as far as I can see there's no external control over
BufferRecycler in their API. One possibility is to record the thread ID
when we generate a new stream in Compressor and use that to synchronize
access to ensure no concurrent BufferRecycler access. That could be made
specific to snappy so it wouldn't impact other codecs. Not exactly
ideal, but it would work. Unfortunately I can't think of any way for you
to protect against this in your own code since the problem arises in the
producer send thread, which your code should never know about.

Another option would be to setup your producers differently to avoid the
possibility of unsynchronized access from multiple threads (i.e. don't
use the same thread pool approach), but whether you can do that will
depend on your use case.

-Ewen

On Mon, Oct 20, 2014, at 12:19 PM, Bhavesh Mistry wrote:
 Hi Kafka Dev,
 
 I am getting the following issue with the Snappy library. I checked the code 
 for the Snappy lib and it seems to be fine. Have you guys seen this issue?
 
 2014-10-20 18:55:21.841 [kafka-producer-network-thread] ERROR
 org.apache.kafka.clients.producer.internals.Sender - Uncaught error in
 kafka producer I/O thread:
 *java.lang.NullPointerException*
 at
 org.xerial.snappy.BufferRecycler.releaseInputBuffer(BufferRecycler.java:153)
 at
 org.xerial.snappy.SnappyOutputStream.close(SnappyOutputStream.java:317)
 at java.io.FilterOutputStream.close(FilterOutputStream.java:160)
 at org.apache.kafka.common.record.Compressor.close(Compressor.java:94)
 at
 org.apache.kafka.common.record.MemoryRecords.close(MemoryRecords.java:119)
 at
 org.apache.kafka.clients.producer.internals.RecordAccumulator.drain(RecordAccumulator.java:285)
 at
 org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:162)
 at
 org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:115)
 at java.lang.Thread.run(Thread.java:744)
 
 
 Here is the code for Snappy:
 http://grepcode.com/file/repo1.maven.org/maven2/org.xerial.snappy/snappy-java/1.1.1.3/org/xerial/snappy/BufferRecycler.java#153
 
 153    if (inputBuffer == null || (buffer != null && buffer.length > inputBuffer.length)) {
 
 
 Thanks,
 
 Bhavesh


[jira] [Commented] (KAFKA-1716) hang during shutdown of ZookeeperConsumerConnector

2014-10-20 Thread Sean Fay (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1716?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14177822#comment-14177822
 ] 

Sean Fay commented on KAFKA-1716:
-

This happens infrequently at system shutdown. I don't have steps to reliably 
reproduce it.

I can't attach the full thread dump, but I can answer questions about it. There 
are two other threads that have the exact same stack trace as the 
ConsumerFetcherThread above. There are no other threads that have kafka classes 
on the stack.

 hang during shutdown of ZookeeperConsumerConnector
 --

 Key: KAFKA-1716
 URL: https://issues.apache.org/jira/browse/KAFKA-1716
 Project: Kafka
  Issue Type: Bug
  Components: consumer
Affects Versions: 0.8.1.1
Reporter: Sean Fay
Assignee: Neha Narkhede

 It appears to be possible for {{ZookeeperConsumerConnector.shutdown()}} to 
 wedge in the case that some consumer fetcher threads receive messages during 
 the shutdown process.
 Shutdown thread:
 {code}-- Parking to wait for: 
 java/util/concurrent/CountDownLatch$Sync@0x2aaaf3ef06d0
 at jrockit/vm/Locks.park0(J)V(Native Method)
 at jrockit/vm/Locks.park(Locks.java:2230)
 at sun/misc/Unsafe.park(ZJ)V(Native Method)
 at java/util/concurrent/locks/LockSupport.park(LockSupport.java:156)
 at 
 java/util/concurrent/locks/AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:811)
 at 
 java/util/concurrent/locks/AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:969)
 at 
 java/util/concurrent/locks/AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1281)
 at java/util/concurrent/CountDownLatch.await(CountDownLatch.java:207)
 at kafka/utils/ShutdownableThread.shutdown(ShutdownableThread.scala:36)
 at 
 kafka/server/AbstractFetcherThread.shutdown(AbstractFetcherThread.scala:71)
 at 
 kafka/server/AbstractFetcherManager$$anonfun$closeAllFetchers$2.apply(AbstractFetcherManager.scala:121)
 at 
 kafka/server/AbstractFetcherManager$$anonfun$closeAllFetchers$2.apply(AbstractFetcherManager.scala:120)
 at 
 scala/collection/TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
 at 
 scala/collection/mutable/HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
 at 
 scala/collection/mutable/HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
 at 
 scala/collection/mutable/HashTable$class.foreachEntry(HashTable.scala:226)
 at scala/collection/mutable/HashMap.foreachEntry(HashMap.scala:39)
 at scala/collection/mutable/HashMap.foreach(HashMap.scala:98)
 at 
 scala/collection/TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
 at 
 kafka/server/AbstractFetcherManager.closeAllFetchers(AbstractFetcherManager.scala:120)
 ^-- Holding lock: java/lang/Object@0x2aaaebcc7318[thin lock]
 at 
 kafka/consumer/ConsumerFetcherManager.stopConnections(ConsumerFetcherManager.scala:148)
 at 
 kafka/consumer/ZookeeperConsumerConnector.liftedTree1$1(ZookeeperConsumerConnector.scala:171)
 at 
 kafka/consumer/ZookeeperConsumerConnector.shutdown(ZookeeperConsumerConnector.scala:167){code}
 ConsumerFetcherThread:
 {code}-- Parking to wait for: 
 java/util/concurrent/locks/AbstractQueuedSynchronizer$ConditionObject@0x2aaaebcc7568
 at jrockit/vm/Locks.park0(J)V(Native Method)
 at jrockit/vm/Locks.park(Locks.java:2230)
 at sun/misc/Unsafe.park(ZJ)V(Native Method)
 at java/util/concurrent/locks/LockSupport.park(LockSupport.java:156)
 at 
 java/util/concurrent/locks/AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
 at 
 java/util/concurrent/LinkedBlockingQueue.put(LinkedBlockingQueue.java:306)
 at kafka/consumer/PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:60)
 at 
 kafka/consumer/ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:49)
 at 
 kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:130)
 at 
 kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:111)
 at scala/collection/immutable/HashMap$HashMap1.foreach(HashMap.scala:224)
 at 
 scala/collection/immutable/HashMap$HashTrieMap.foreach(HashMap.scala:403)
 at 
 kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1.apply$mcV$sp(AbstractFetcherThread.scala:111)
 at 
 kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:111)
 at 
 kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:111)
 at kafka/utils/Utils$.inLock(Utils.scala:538)
 at 
 

[jira] [Created] (KAFKA-1721) Snappy compressor is not thread safe

2014-10-20 Thread Ewen Cheslack-Postava (JIRA)
Ewen Cheslack-Postava created KAFKA-1721:


 Summary: Snappy compressor is not thread safe
 Key: KAFKA-1721
 URL: https://issues.apache.org/jira/browse/KAFKA-1721
 Project: Kafka
  Issue Type: Bug
Reporter: Ewen Cheslack-Postava
Assignee: Ewen Cheslack-Postava


From the mailing list, it can generate this exception:

2014-10-20 18:55:21.841 [kafka-producer-network-thread] ERROR
org.apache.kafka.clients.producer.internals.Sender - Uncaught error in
kafka producer I/O thread:
*java.lang.NullPointerException*
at
org.xerial.snappy.BufferRecycler.releaseInputBuffer(BufferRecycler.java:153)
at org.xerial.snappy.SnappyOutputStream.close(SnappyOutputStream.java:317)
at java.io.FilterOutputStream.close(FilterOutputStream.java:160)
at org.apache.kafka.common.record.Compressor.close(Compressor.java:94)
at
org.apache.kafka.common.record.MemoryRecords.close(MemoryRecords.java:119)
at
org.apache.kafka.clients.producer.internals.RecordAccumulator.drain(RecordAccumulator.java:285)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:162)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:115)
at java.lang.Thread.run(Thread.java:744)

This appears to be an issue with the snappy-java library using ThreadLocal for 
an internal buffer recycling object which results in that object being shared 
unsafely across threads if one thread sends to multiple producers:

{quote}
I think the issue is that you're
using all your producers across a thread pool and the snappy library
uses ThreadLocal BufferRecyclers. When new Snappy streams are allocated,
they may be allocated from the same thread (e.g. one of your MyProducer
classes calls Producer.send() on multiple producers from the same
thread) and therefore use the same BufferRecycler. Eventually you hit
the code in the stacktrace, and if two producer send threads hit it
concurrently they improperly share the unsynchronized BufferRecycler.

This seems like a pain to fix -- it's really a deficiency of the snappy
library and as far as I can see there's no external control over
BufferRecycler in their API. One possibility is to record the thread ID
when we generate a new stream in Compressor and use that to synchronize
access to ensure no concurrent BufferRecycler access. That could be made
specific to snappy so it wouldn't impact other codecs. Not exactly
ideal, but it would work. Unfortunately I can't think of any way for you
to protect against this in your own code since the problem arises in the
producer send thread, which your code should never know about.

Another option would be to setup your producers differently to avoid the
possibility of unsynchronized access from multiple threads (i.e. don't
use the same thread pool approach), but whether you can do that will
depend on your use case.
{quote}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [Java New Producer] Snappy NPE Issue

2014-10-20 Thread Ewen Cheslack-Postava
Also, filed https://issues.apache.org/jira/browse/KAFKA-1721 for this
since it either requires an updated version of the upstream library, a
workaround by us, or at a bare minimum clear documentation of the issue.

On Mon, Oct 20, 2014, at 06:23 PM, Ewen Cheslack-Postava wrote:
 I took a quick look at this since I noticed the same issue when testing
 your code for the issues you filed. I think the issue is that you're
 using all your producers across a thread pool and the snappy library
 uses ThreadLocal BufferRecyclers. When new Snappy streams are allocated,
 they may be allocated from the same thread (e.g. one of your MyProducer
 classes calls Producer.send() on multiple producers from the same
 thread) and therefore use the same BufferRecycler. Eventually you hit
 the code in the stacktrace, and if two producer send threads hit it
 concurrently they improperly share the unsynchronized BufferRecycler.
 
 This seems like a pain to fix -- it's really a deficiency of the snappy
 library and as far as I can see there's no external control over
 BufferRecycler in their API. One possibility is to record the thread ID
 when we generate a new stream in Compressor and use that to synchronize
 access to ensure no concurrent BufferRecycler access. That could be made
 specific to snappy so it wouldn't impact other codecs. Not exactly
 ideal, but it would work. Unfortunately I can't think of any way for you
 to protect against this in your own code since the problem arises in the
 producer send thread, which your code should never know about.
 
 Another option would be to setup your producers differently to avoid the
 possibility of unsynchronized access from multiple threads (i.e. don't
 use the same thread pool approach), but whether you can do that will
 depend on your use case.
 
 -Ewen
 
 On Mon, Oct 20, 2014, at 12:19 PM, Bhavesh Mistry wrote:
  Hi Kafka Dev,
  
  I am getting the following issue with the Snappy library. I checked the code 
  for the Snappy lib and it seems to be fine. Have you guys seen this issue?
  
  2014-10-20 18:55:21.841 [kafka-producer-network-thread] ERROR
  org.apache.kafka.clients.producer.internals.Sender - Uncaught error in
  kafka producer I/O thread:
  *java.lang.NullPointerException*
  at
  org.xerial.snappy.BufferRecycler.releaseInputBuffer(BufferRecycler.java:153)
  at
  org.xerial.snappy.SnappyOutputStream.close(SnappyOutputStream.java:317)
  at java.io.FilterOutputStream.close(FilterOutputStream.java:160)
  at org.apache.kafka.common.record.Compressor.close(Compressor.java:94)
  at
  org.apache.kafka.common.record.MemoryRecords.close(MemoryRecords.java:119)
  at
  org.apache.kafka.clients.producer.internals.RecordAccumulator.drain(RecordAccumulator.java:285)
  at
  org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:162)
  at
  org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:115)
  at java.lang.Thread.run(Thread.java:744)
  
  
  Here is the code for Snappy:
  http://grepcode.com/file/repo1.maven.org/maven2/org.xerial.snappy/snappy-java/1.1.1.3/org/xerial/snappy/BufferRecycler.java#153
  
  153    if (inputBuffer == null || (buffer != null && buffer.length > inputBuffer.length)) {
  
  
  Thanks,
  
  Bhavesh


[jira] [Updated] (KAFKA-1642) [Java New Producer Kafka Trunk] CPU Usage Spike to 100% when network connection is lost

2014-10-20 Thread Neha Narkhede (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1642?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Neha Narkhede updated KAFKA-1642:
-
Reviewer: Jay Kreps

 [Java New Producer Kafka Trunk] CPU Usage Spike to 100% when network 
 connection is lost
 ---

 Key: KAFKA-1642
 URL: https://issues.apache.org/jira/browse/KAFKA-1642
 Project: Kafka
  Issue Type: Bug
  Components: producer 
Affects Versions: 0.8.2
Reporter: Bhavesh Mistry
Assignee: Ewen Cheslack-Postava
 Attachments: KAFKA-1642.patch, KAFKA-1642_2014-10-20_17:33:57.patch


 I see my CPU spike to 100% when the network connection is lost for a while. It 
 seems the network I/O threads are very busy logging the following error 
 message. Is this expected behavior?
 2014-09-17 14:06:16.830 [kafka-producer-network-thread] ERROR 
 org.apache.kafka.clients.producer.internals.Sender - Uncaught error in kafka 
 producer I/O thread: 
 java.lang.IllegalStateException: No entry found for node -2
 at 
 org.apache.kafka.clients.ClusterConnectionStates.nodeState(ClusterConnectionStates.java:110)
 at 
 org.apache.kafka.clients.ClusterConnectionStates.disconnected(ClusterConnectionStates.java:99)
 at 
 org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:394)
 at 
 org.apache.kafka.clients.NetworkClient.maybeUpdateMetadata(NetworkClient.java:380)
 at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:174)
 at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:175)
 at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:115)
 at java.lang.Thread.run(Thread.java:744)
 Thanks,
 Bhavesh



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-1711) WARN Property topic is not valid when running console producer

2014-10-20 Thread Neha Narkhede (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1711?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Neha Narkhede updated KAFKA-1711:
-
Reviewer: Jun Rao

 WARN Property topic is not valid when running console producer
 --

 Key: KAFKA-1711
 URL: https://issues.apache.org/jira/browse/KAFKA-1711
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 0.8.2
Reporter: Jun Rao
  Labels: newbie
 Attachments: KAFKA-1711.patch


 bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test 
 [2014-10-17 09:54:23,984] WARN Property topic is not valid 
 (kafka.utils.VerifiableProperties)
 It would be good if we can get rid of the warning.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Sending Same Message to Two Topics on Same Broker Cluster

2014-10-20 Thread Bhavesh Mistry
Hi Kafka Team,


I would like to send a single message to multiple topics (two for now)
without re-transmitting the message from producer to brokers.  Is this
possible?

Neither the Scala nor the Java producer allows this. I do not need to do this 
all the time, only under certain application conditions.


Thanks in advance for your help!


Thanks,


Bhavesh


[jira] [Updated] (KAFKA-1634) Improve semantics of timestamp in OffsetCommitRequests and update documentation

2014-10-20 Thread Jun Rao (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1634?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao updated KAFKA-1634:
---
Fix Version/s: (was: 0.8.2)
   0.8.3

Moving this to 0.8.3 since it's easier to fix after KAFKA-1583 is done.

 Improve semantics of timestamp in OffsetCommitRequests and update 
 documentation
 ---

 Key: KAFKA-1634
 URL: https://issues.apache.org/jira/browse/KAFKA-1634
 Project: Kafka
  Issue Type: Bug
Reporter: Neha Narkhede
Assignee: Guozhang Wang
Priority: Blocker
 Fix For: 0.8.3


 From the mailing list -
 following up on this -- I think the online API docs for OffsetCommitRequest
 still incorrectly refer to client-side timestamps:
 https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetCommitRequest
 Wasn't that removed, and isn't it now always handled server-side? Would one of
 the devs mind updating the API spec wiki?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Sending Same Message to Two Topics on Same Broker Cluster

2014-10-20 Thread Neha Narkhede
Not really. You need producers to send data to Kafka.
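A minimal sketch of the usual workaround; the broker address, topic names and 
serializer settings are illustrative. The client issues one send() per topic, 
but it can at least reuse the same payload and producer instance.

{code}
import java.nio.charset.StandardCharsets;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class DualTopicSend {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");

        try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props)) {
            byte[] payload = "same message".getBytes(StandardCharsets.UTF_8);
            // The produce API addresses topic-partitions individually, so the
            // message goes out once per topic.
            producer.send(new ProducerRecord<>("topic-a", payload));
            producer.send(new ProducerRecord<>("topic-b", payload));
        }
    }
}
{code}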

On Mon, Oct 20, 2014 at 9:05 PM, Bhavesh Mistry mistry.p.bhav...@gmail.com
wrote:

 Hi Kafka Team,


 I would like to send a single message to multiple topics (two for now)
 without re-transmitting the message from producer to brokers.  Is this
 possible?

 Neither the Scala nor the Java producer allows this. I do not need to do this 
 all the time, only under certain application conditions.


 Thanks in advance for your help!


 Thanks,


 Bhavesh