[jira] [Resolved] (KAFKA-1551) Configuration example errors

2014-07-20 Thread Jay Kreps (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1551?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jay Kreps resolved KAFKA-1551.
--

Resolution: Fixed

Fixed.

 Configuration example errors
 

 Key: KAFKA-1551
 URL: https://issues.apache.org/jira/browse/KAFKA-1551
 Project: Kafka
  Issue Type: Bug
  Components: website
Reporter: Alexey Ozeritskiy
Assignee: Alexey Ozeritskiy

 A Production Server Config 
 (http://kafka.apache.org/documentation.html#prodconfig) contains an error:
 {code}
 # ZK configuration
 zk.connection.timeout.ms=6000
 zk.sync.time.ms=2000
 {code}
 Should be
 {code}
 # ZK configuration
 zookeeper.connection.timeout.ms=6000
 zookeeper.sync.time.ms=2000
 {code}



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Updated] (KAFKA-1552) Quickstart refers to wrong port in pastable commands

2014-07-20 Thread Jay Kreps (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1552?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jay Kreps updated KAFKA-1552:
-

Assignee: Olle Jonsson

 Quickstart refers to wrong port in pastable commands
 

 Key: KAFKA-1552
 URL: https://issues.apache.org/jira/browse/KAFKA-1552
 Project: Kafka
  Issue Type: Bug
  Components: website
Reporter: Olle Jonsson
Assignee: Olle Jonsson
  Labels: documentation, quickstart
 Attachments: quickstart.patch


 The file http://svn.apache.org/repos/asf/kafka/site/081/quickstart.html 
 mentions the port localhost:218192, which should be localhost:2181.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Updated] (KAFKA-1552) Quickstart refers to wrong port in pastable commands

2014-07-20 Thread Jay Kreps (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1552?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jay Kreps updated KAFKA-1552:
-

Resolution: Fixed
Status: Resolved  (was: Patch Available)

Fixed, thanks!

 Quickstart refers to wrong port in pastable commands
 

 Key: KAFKA-1552
 URL: https://issues.apache.org/jira/browse/KAFKA-1552
 Project: Kafka
  Issue Type: Bug
  Components: website
Reporter: Olle Jonsson
Assignee: Olle Jonsson
  Labels: documentation, quickstart
 Attachments: quickstart.patch


 The file http://svn.apache.org/repos/asf/kafka/site/081/quickstart.html 
 mentions the port localhost:218192, which should be localhost:2181.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Updated] (KAFKA-1551) Configuration example errors

2014-07-20 Thread Jay Kreps (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1551?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jay Kreps updated KAFKA-1551:
-

Assignee: Alexey Ozeritskiy

 Configuration example errors
 

 Key: KAFKA-1551
 URL: https://issues.apache.org/jira/browse/KAFKA-1551
 Project: Kafka
  Issue Type: Bug
  Components: website
Reporter: Alexey Ozeritskiy
Assignee: Alexey Ozeritskiy

 A Production Server Config 
 (http://kafka.apache.org/documentation.html#prodconfig) contains an error:
 {code}
 # ZK configuration
 zk.connection.timeout.ms=6000
 zk.sync.time.ms=2000
 {code}
 Should be
 {code}
 # ZK configuration
 zookeeper.connection.timeout.ms=6000
 zookeeper.sync.time.ms=2000
 {code}



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (KAFKA-1414) Speedup broker startup after hard reset

2014-07-20 Thread Jay Kreps (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14068021#comment-14068021
 ] 

Jay Kreps commented on KAFKA-1414:
--

We should check that having multiple threads accessing the same directory will 
actually have a negative impact.

First, I don't think anyone has actually checked that recovery is I/O 
bound. Recovery iterates over the log doing a bunch of small message reads, 
decompressing any compressed messages, and checking the CRC of all the data. 
This may actually be CPU bound.

Secondly, the OS will do normal readahead, which should help protect somewhat 
against random access for two interspersed linear accesses.

This should be easy to test. Run the perf test on a single-node broker with 
multiple partitions on a single drive, then kill -9 it. Then run 
{code}
  echo 1 > /proc/sys/vm/drop_caches
{code}
and restart. Do this twice, once with 1 thread and once with 2. The 
prediction is that the 2-thread case will be slower than the 1-thread case, 
but it may actually not be.

 Speedup broker startup after hard reset
 ---

 Key: KAFKA-1414
 URL: https://issues.apache.org/jira/browse/KAFKA-1414
 Project: Kafka
  Issue Type: Improvement
  Components: log
Affects Versions: 0.8.2, 0.9.0, 0.8.1.1
Reporter: Dmitry Bugaychenko
Assignee: Jay Kreps
 Attachments: 
 0001-KAFKA-1414-Speedup-broker-startup-after-hard-reset-a.patch, 
 KAFKA-1414-rev1.patch, KAFKA-1414-rev2.fixed.patch, KAFKA-1414-rev2.patch, 
 KAFKA-1414-rev3.patch, freebie.patch, parallel-dir-loading-0.8.patch, 
 parallel-dir-loading-trunk-fixed-threadpool.patch, 
 parallel-dir-loading-trunk-threadpool.patch, parallel-dir-loading-trunk.patch


 After a hard reset due to power failure the broker takes way too much time 
 recovering unflushed segments in a single thread. This could be easily 
 improved by launching multiple threads (one per data directory, assuming that 
 typically each data directory is on a dedicated drive). Locally we tried this 
 simple patch to LogManager.loadLogs and it seems to work; however, I'm too new 
 to Scala, so do not take it literally:
 {code}
   /**
    * Recover and load all logs in the given data directories
    */
   private def loadLogs(dirs: Seq[File]) {
     val threads: Array[Thread] = new Array[Thread](dirs.size)
     var i: Int = 0
     val me = this
     for (dir <- dirs) {
       val thread = new Thread(new Runnable {
         def run() {
           val recoveryPoints = me.recoveryPointCheckpoints(dir).read
           /* load the logs */
           val subDirs = dir.listFiles()
           if (subDirs != null) {
             val cleanShutDownFile = new File(dir, Log.CleanShutdownFile)
             if (cleanShutDownFile.exists())
               info("Found clean shutdown file. Skipping recovery for all logs in data directory '%s'".format(dir.getAbsolutePath))
             for (dir <- subDirs) {
               if (dir.isDirectory) {
                 info("Loading log '" + dir.getName + "'")
                 val topicPartition = Log.parseTopicPartitionName(dir.getName)
                 val config = topicConfigs.getOrElse(topicPartition.topic, defaultConfig)
                 val log = new Log(dir,
                                   config,
                                   recoveryPoints.getOrElse(topicPartition, 0L),
                                   scheduler,
                                   time)
                 val previous = addLogWithLock(topicPartition, log)
                 if (previous != null)
                   throw new IllegalArgumentException("Duplicate log directories found: %s, %s!".format(log.dir.getAbsolutePath, previous.dir.getAbsolutePath))
               }
             }
             cleanShutDownFile.delete()
           }
         }
       })
       thread.start()
       threads(i) = thread
       i = i + 1
     }
     for (thread <- threads) {
       thread.join()
     }
   }

   def addLogWithLock(topicPartition: TopicAndPartition, log: Log): Log = {
     logCreationOrDeletionLock synchronized {
       this.logs.put(topicPartition, log)
     }
   }
 {code}



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (KAFKA-1414) Speedup broker startup after hard reset

2014-07-20 Thread Jay Kreps (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14068173#comment-14068173
 ] 

Jay Kreps commented on KAFKA-1414:
--

I did a test of single-threaded recovery on my laptop where most data is in 
memory and the disk is an SSD. This should remove any I/O bottleneck. I see log 
recovery working at about 275MB/sec with 100% CPU load on one core. This 
indicates that on a non-SSD drive (most Kafka machines, I would imagine), I/O 
would be the bottleneck, since a spinning disk can't sustain that rate. 

 Speedup broker startup after hard reset
 ---

 Key: KAFKA-1414
 URL: https://issues.apache.org/jira/browse/KAFKA-1414
 Project: Kafka
  Issue Type: Improvement
  Components: log
Affects Versions: 0.8.2, 0.9.0, 0.8.1.1
Reporter: Dmitry Bugaychenko
Assignee: Jay Kreps
 Attachments: 
 0001-KAFKA-1414-Speedup-broker-startup-after-hard-reset-a.patch, 
 KAFKA-1414-rev1.patch, KAFKA-1414-rev2.fixed.patch, KAFKA-1414-rev2.patch, 
 KAFKA-1414-rev3-interleaving.patch, KAFKA-1414-rev3.patch, freebie.patch, 
 parallel-dir-loading-0.8.patch, 
 parallel-dir-loading-trunk-fixed-threadpool.patch, 
 parallel-dir-loading-trunk-threadpool.patch, parallel-dir-loading-trunk.patch


 After a hard reset due to power failure the broker takes way too much time 
 recovering unflushed segments in a single thread. This could be easily 
 improved by launching multiple threads (one per data directory, assuming that 
 typically each data directory is on a dedicated drive). Locally we tried this 
 simple patch to LogManager.loadLogs and it seems to work; however, I'm too new 
 to Scala, so do not take it literally:
 {code}
   /**
    * Recover and load all logs in the given data directories
    */
   private def loadLogs(dirs: Seq[File]) {
     val threads: Array[Thread] = new Array[Thread](dirs.size)
     var i: Int = 0
     val me = this
     for (dir <- dirs) {
       val thread = new Thread(new Runnable {
         def run() {
           val recoveryPoints = me.recoveryPointCheckpoints(dir).read
           /* load the logs */
           val subDirs = dir.listFiles()
           if (subDirs != null) {
             val cleanShutDownFile = new File(dir, Log.CleanShutdownFile)
             if (cleanShutDownFile.exists())
               info("Found clean shutdown file. Skipping recovery for all logs in data directory '%s'".format(dir.getAbsolutePath))
             for (dir <- subDirs) {
               if (dir.isDirectory) {
                 info("Loading log '" + dir.getName + "'")
                 val topicPartition = Log.parseTopicPartitionName(dir.getName)
                 val config = topicConfigs.getOrElse(topicPartition.topic, defaultConfig)
                 val log = new Log(dir,
                                   config,
                                   recoveryPoints.getOrElse(topicPartition, 0L),
                                   scheduler,
                                   time)
                 val previous = addLogWithLock(topicPartition, log)
                 if (previous != null)
                   throw new IllegalArgumentException("Duplicate log directories found: %s, %s!".format(log.dir.getAbsolutePath, previous.dir.getAbsolutePath))
               }
             }
             cleanShutDownFile.delete()
           }
         }
       })
       thread.start()
       threads(i) = thread
       i = i + 1
     }
     for (thread <- threads) {
       thread.join()
     }
   }

   def addLogWithLock(topicPartition: TopicAndPartition, log: Log): Log = {
     logCreationOrDeletionLock synchronized {
       this.logs.put(topicPartition, log)
     }
   }
 {code}



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (KAFKA-1414) Speedup broker startup after hard reset

2014-07-20 Thread Jay Kreps (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14068177#comment-14068177
 ] 

Jay Kreps commented on KAFKA-1414:
--

[~ataraxer] I agree that just throwing all the logs at a thread pool is simpler 
to configure and probably also to implement. To see if that will work we need 
data on whether multiple threads running recovery on a single drive have a 
significant negative perf impact. Want to try that one out and see? If the 
impact is minor I think we are probably better off with the simpler strategy, 
but if it tanks performance we may need to be more explicit about the data 
directories. 

 Speedup broker startup after hard reset
 ---

 Key: KAFKA-1414
 URL: https://issues.apache.org/jira/browse/KAFKA-1414
 Project: Kafka
  Issue Type: Improvement
  Components: log
Affects Versions: 0.8.2, 0.9.0, 0.8.1.1
Reporter: Dmitry Bugaychenko
Assignee: Jay Kreps
 Attachments: 
 0001-KAFKA-1414-Speedup-broker-startup-after-hard-reset-a.patch, 
 KAFKA-1414-rev1.patch, KAFKA-1414-rev2.fixed.patch, KAFKA-1414-rev2.patch, 
 KAFKA-1414-rev3-interleaving.patch, KAFKA-1414-rev3.patch, freebie.patch, 
 parallel-dir-loading-0.8.patch, 
 parallel-dir-loading-trunk-fixed-threadpool.patch, 
 parallel-dir-loading-trunk-threadpool.patch, parallel-dir-loading-trunk.patch


 After a hard reset due to power failure the broker takes way too much time 
 recovering unflushed segments in a single thread. This could be easily 
 improved by launching multiple threads (one per data directory, assuming that 
 typically each data directory is on a dedicated drive). Locally we tried this 
 simple patch to LogManager.loadLogs and it seems to work; however, I'm too new 
 to Scala, so do not take it literally:
 {code}
   /**
    * Recover and load all logs in the given data directories
    */
   private def loadLogs(dirs: Seq[File]) {
     val threads: Array[Thread] = new Array[Thread](dirs.size)
     var i: Int = 0
     val me = this
     for (dir <- dirs) {
       val thread = new Thread(new Runnable {
         def run() {
           val recoveryPoints = me.recoveryPointCheckpoints(dir).read
           /* load the logs */
           val subDirs = dir.listFiles()
           if (subDirs != null) {
             val cleanShutDownFile = new File(dir, Log.CleanShutdownFile)
             if (cleanShutDownFile.exists())
               info("Found clean shutdown file. Skipping recovery for all logs in data directory '%s'".format(dir.getAbsolutePath))
             for (dir <- subDirs) {
               if (dir.isDirectory) {
                 info("Loading log '" + dir.getName + "'")
                 val topicPartition = Log.parseTopicPartitionName(dir.getName)
                 val config = topicConfigs.getOrElse(topicPartition.topic, defaultConfig)
                 val log = new Log(dir,
                                   config,
                                   recoveryPoints.getOrElse(topicPartition, 0L),
                                   scheduler,
                                   time)
                 val previous = addLogWithLock(topicPartition, log)
                 if (previous != null)
                   throw new IllegalArgumentException("Duplicate log directories found: %s, %s!".format(log.dir.getAbsolutePath, previous.dir.getAbsolutePath))
               }
             }
             cleanShutDownFile.delete()
           }
         }
       })
       thread.start()
       threads(i) = thread
       i = i + 1
     }
     for (thread <- threads) {
       thread.join()
     }
   }

   def addLogWithLock(topicPartition: TopicAndPartition, log: Log): Log = {
     logCreationOrDeletionLock synchronized {
       this.logs.put(topicPartition, log)
     }
   }
 {code}



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (KAFKA-1535) return all live brokers in TopicMetadataResponse

2014-07-18 Thread Jay Kreps (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1535?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14066439#comment-14066439
 ] 

Jay Kreps commented on KAFKA-1535:
--

To date we have not really used pull requests. Apache has something about JIRA 
being a way to manage copyright assignment. I think recently they may have 
started supporting pull requests, so we should probably document and better 
understand that workflow. But at the moment I think we are still just doing 
patches and JIRA like it is the last century. :-)

 return all live brokers in TopicMetadataResponse
 

 Key: KAFKA-1535
 URL: https://issues.apache.org/jira/browse/KAFKA-1535
 Project: Kafka
  Issue Type: Improvement
  Components: core
Affects Versions: 0.8.2
Reporter: Jun Rao
Assignee: Jay Kreps
  Labels: newbie
 Attachments: 
 KAFKA-1535__return_all_live_brokers_in_TopicMetadataResponse_.patch


 Currently, we only return the brokers that have assigned replicas for a topic 
 in TopicMetadataResponse. The new producer will use those brokers for 
 refreshing metadata. Now suppose that we stop all those brokers, copy all 
 local data to some new hosts and then restart those hosts (with the original 
 broker id). There is no way for the new producer to automatically get the 
 information about the new brokers since all old brokers are gone.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (KAFKA-1150) Fetch on a replicated topic does not return as soon as possible

2014-07-18 Thread Jay Kreps (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1150?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14066440#comment-14066440
 ] 

Jay Kreps commented on KAFKA-1150:
--

Yeah, [~thecoop1984], can you verify that the patch on that ticket actually 
fixes the problem you saw?

 Fetch on a replicated topic does not return as soon as possible
 ---

 Key: KAFKA-1150
 URL: https://issues.apache.org/jira/browse/KAFKA-1150
 Project: Kafka
  Issue Type: Bug
  Components: core, replication
Affects Versions: 0.8.0
Reporter: Andrey Balmin
Assignee: Neha Narkhede
 Attachments: Test.java


 I see a huge performance difference between replicated and non-replicated 
 topics. On my laptop, running two brokers, I see producer-to-consumer latency 
 of under 1ms for topics with one replica. 
 However, with two replicas the same latency equals the max fetch delay. 
 Here is a simple test I just did:
 one producer thread in a loop sending one message and sleeping for 2500ms, 
 and one consumer thread looping on the long poll with a max fetch delay of 
 1000 ms.
 Here is what happens with no replication:
 Produced 1 key: key1 at time: 15:33:52.822
 Consumed up to 1 at time: 15:33:52.822
 Consumed up to 1 at time: 15:33:53.823
 Consumed up to 1 at time: 15:33:54.825
 Produced 2 key: key2 at time: 15:33:55.324
 Consumed up to 2 at time: 15:33:55.324
 Consumed up to 2 at time: 15:33:56.326
 Consumed up to 2 at time: 15:33:57.328
 Produced 3 key: key3 at time: 15:33:57.827
 Consumed up to 3 at time: 15:33:57.827
 There are no delays between the message being produced and consumed -- this 
 is the behavior I expected. 
 Here is the same test, but for a topic with two replicas:
 Consumed up to 0 at time: 15:50:29.575
 Produced 1 key: key1 at time: 15:50:29.575
 Consumed up to 1 at time: 15:50:30.577
 Consumed up to 1 at time: 15:50:31.579
 Consumed up to 1 at time: 15:50:32.078
 Produced 2 key: key2 at time: 15:50:32.078
 Consumed up to 2 at time: 15:50:33.081
 Consumed up to 2 at time: 15:50:34.081
 Consumed up to 2 at time: 15:50:34.581
 Produced 3 key: key3 at time: 15:50:34.581
 Consumed up to 3 at time: 15:50:35.584
 Notice how the fetch always returns as soon as the produce request is issued, 
 but without the new message, which consistently arrives ~1002 ms later.
 Below is the request log snippet for this part:
 Produced 2 key: key2 at time: 15:50:32.078
 Consumed up to 2 at time: 15:50:33.081
 You can see the first FetchRequest returns at the same time as the replica 
 FetchRequest, but this fetch response is *empty* -- the message is not 
 committed yet, so it cannot be returned. The message is committed at 
 15:50:32,079. However, the next FetchRequest (that does return the message) 
 comes in at 15:50:32,078, but completes only at 15:50:33,081. Why is it 
 waiting for the full 1000 ms, instead of returning right away?
 [2013-11-25 15:50:32,077] TRACE Processor 1 received request : Name: 
 ProducerRequest; Version: 0; CorrelationId: 5; ClientId: pro; RequiredAcks: 
 1; AckTimeoutMs: 20 ms; TopicAndPartition: [test_topic,0] -> 2078 
 (kafka.network.RequestChannel$)
 [2013-11-25 15:50:32,078] TRACE Completed request:Name: FetchRequest; 
 Version: 0; CorrelationId: 7; ClientId: con; ReplicaId: -1; MaxWait: 1000 ms; 
 MinBytes: 1 bytes; RequestInfo: [test_topic,0] -> 
 PartitionFetchInfo(129,1024000) from client 
 /0:0:0:0:0:0:0:1%0:63264;totalTime:499,queueTime:0,localTime:0,remoteTime:499,sendTime:0
  (kafka.request.logger)
 [2013-11-25 15:50:32,078] TRACE Completed request:Name: FetchRequest; 
 Version: 0; CorrelationId: 3463; ClientId: ReplicaFetcherThread-0-0; 
 ReplicaId: 1; MaxWait: 500 ms; MinBytes: 1 bytes; RequestInfo: [test_topic,0] 
 -> PartitionFetchInfo(129,1048576) from client 
 /127.0.0.1:63056;totalTime:499,queueTime:1,localTime:0,remoteTime:498,sendTime:0
  (kafka.request.logger)
 [2013-11-25 15:50:32,078] TRACE Processor 1 received request : Name: 
 FetchRequest; Version: 0; CorrelationId: 8; ClientId: con; ReplicaId: -1; 
 MaxWait: 1000 ms; MinBytes: 1 bytes; RequestInfo: [test_topic,0] -> 
 PartitionFetchInfo(129,1024000) (kafka.network.RequestChannel$)
 [2013-11-25 15:50:32,078] TRACE Completed request:Name: ProducerRequest; 
 Version: 0; CorrelationId: 5; ClientId: pro; RequiredAcks: 1; AckTimeoutMs: 
 20 ms; TopicAndPartition: [test_topic,0] -> 2078 from client 
 /0:0:0:0:0:0:0:1%0:63266;totalTime:1,queueTime:0,localTime:1,remoteTime:0,sendTime:0
  (kafka.request.logger)
 [2013-11-25 15:50:32,079] TRACE Processor 0 received request : Name: 
 FetchRequest; Version: 0; CorrelationId: 3464; ClientId: 
 ReplicaFetcherThread-0-0; ReplicaId: 1; MaxWait: 500 ms; MinBytes: 1 bytes; 
 RequestInfo: [test_topic,0] -> PartitionFetchInfo(130,1048576) 

[jira] [Commented] (KAFKA-1282) Disconnect idle socket connection in Selector

2014-07-18 Thread Jay Kreps (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1282?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14066461#comment-14066461
 ] 

Jay Kreps commented on KAFKA-1282:
--

The goal is just to reduce server connection count. In our environment there 
might be a single Kafka producer in each process we run publishing to a small 
Kafka cluster (say ~20 servers). However there are tens of thousands of client 
processes. Connections can end up going unused when leadership migrates and we 
should eventually close these out rather than retaining them indefinitely.

As you say it is not critical as the server seems to do a good job of dealing 
with high connection counts, but it seems like a good thing to do.

I agree that doing this on the server might be better. This does mean it is 
possible that the server will attempt to close the socket while the client is 
attempting to send something. But if the timeout is 10 mins, it is unlikely 
that this will happen often (i.e. if nothing was sent in the last 10 mins, it 
will not likely happen in the 0.5 ms it takes to do the close). The advantage 
of doing it on the server is that it will work for all clients.

This change would be in core/.../kafka/network/SocketServer.scala.

The only gotcha is that we likely need to avoid iterating over all connections 
to avoid latency impact (there could be 100k connections). One way to do this 
would be to use java.util.LinkedHashMap to implement an LRU hash map of the 
SelectionKeys, and access this every time the selection key comes up in a 
select operation. (There are a ton of details in LinkedHashMap--needs to be 
access order, etc). Then every 5-10 select loop iterations we would iterate 
the map expiring connections until we come to a connection that doesn't need 
expiring, then stop.
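
As a rough sketch of the LinkedHashMap idea (the class and method names here 
are hypothetical, not actual SocketServer code):
{code}
import java.nio.channels.SelectionKey
import java.util.{LinkedHashMap => JLinkedHashMap}

// Access-ordered map from SelectionKey to last-used time. Touching a key on
// every select moves it to the tail, so the head is always the least
// recently used connection.
class IdleConnectionReaper(idleTimeoutMs: Long) {
  private val lru = new JLinkedHashMap[SelectionKey, java.lang.Long](16, 0.75f, true)

  // Record activity whenever a key comes up in a select operation.
  def touch(key: SelectionKey, nowMs: Long): Unit = lru.put(key, nowMs)

  // Walk from the eldest entry, closing expired connections, and stop at the
  // first one that isn't expired -- access order guarantees the rest are newer.
  def expireIdle(nowMs: Long): Unit = {
    val it = lru.entrySet().iterator()
    var done = false
    while (it.hasNext && !done) {
      val entry = it.next()
      if (nowMs - entry.getValue > idleTimeoutMs) {
        entry.getKey.channel().close()
        entry.getKey.cancel()
        it.remove()
      } else {
        done = true
      }
    }
  }
}
{code}
Run expireIdle every 5-10 select loop iterations; it only ever scans the 
expired prefix of the map, so it never iterates over all 100k connections.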

 Disconnect idle socket connection in Selector
 -

 Key: KAFKA-1282
 URL: https://issues.apache.org/jira/browse/KAFKA-1282
 Project: Kafka
  Issue Type: Bug
  Components: producer 
Affects Versions: 0.8.2
Reporter: Jun Rao
  Labels: newbie++
 Fix For: 0.9.0


 To reduce # socket connections, it would be useful for the new producer to 
 close socket connections that are idle. We can introduce a new producer 
 config for the idle time.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (KAFKA-1476) Get a list of consumer groups

2014-07-18 Thread Jay Kreps (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1476?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14066471#comment-14066471
 ] 

Jay Kreps commented on KAFKA-1476:
--

This looks good to me.

One thing we should think about: does this make sense as a standalone command? 
We have something of a history of making a new command for every possible thing 
you could do. Often it is a little more usable if we group these together a bit 
into logically related items. For example, I could imagine having a command
   bin/kafka-consumers.sh --list-groups
or something like that that also performed other consumer-related commands. 
This sort of thing makes the tools a little more usable for users.

[~guozhang], [~toddpalino], [~nehanarkhede] what do you guys think? As we do 
the consumer co-ordinator what admin operations are we going to want tooling 
for? How does this relate to the consumer offset checker, should that be 
combined? Let's think this through and then just name this thing appropriately 
even if it only has 10% of the functionality we envision at the moment.

 Get a list of consumer groups
 -

 Key: KAFKA-1476
 URL: https://issues.apache.org/jira/browse/KAFKA-1476
 Project: Kafka
  Issue Type: Wish
  Components: tools
Affects Versions: 0.8.1.1
Reporter: Ryan Williams
  Labels: newbie
 Fix For: 0.9.0

 Attachments: KAFKA-1476.patch


 It would be useful to have a way to get a list of consumer groups currently 
 active via some tool/script that ships with kafka. This would be helpful so 
 that the system tools can be explored more easily.
 For example, when running the ConsumerOffsetChecker, it requires a group 
 option
 bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --topic test --group 
 ?
 But, when just getting started with Kafka, using the console producer and 
 consumer, it is not clear what value to use for the group option. If the 
 consumer groups could be listed, then it would be clear what value to use.
 Background:
 http://mail-archives.apache.org/mod_mbox/kafka-users/201405.mbox/%3cCAOq_b1w=slze5jrnakxvak0gu9ctdkpazak1g4dygvqzbsg...@mail.gmail.com%3e



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (KAFKA-1476) Get a list of consumer groups

2014-07-18 Thread Jay Kreps (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1476?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14066583#comment-14066583
 ] 

Jay Kreps commented on KAFKA-1476:
--

Cool, so [~balaji.seshadri] would you be willing to revamp this a bit? Two 
approaches:
1. Just change the class name to ConsumerCommand and shell script to 
kafka-consumers.sh even though it only implements the list groups functionality 
at the moment. We can file a follow-up bug to implement some of the additional 
features which anyone can pick up.
2. Implement some of the proposed features now on this ticket. Most of them 
should be pretty straightforward...

Either way is fine.

 Get a list of consumer groups
 -

 Key: KAFKA-1476
 URL: https://issues.apache.org/jira/browse/KAFKA-1476
 Project: Kafka
  Issue Type: Wish
  Components: tools
Affects Versions: 0.8.1.1
Reporter: Ryan Williams
  Labels: newbie
 Fix For: 0.9.0

 Attachments: KAFKA-1476.patch


 It would be useful to have a way to get a list of consumer groups currently 
 active via some tool/script that ships with kafka. This would be helpful so 
 that the system tools can be explored more easily.
 For example, when running the ConsumerOffsetChecker, it requires a group 
 option
 bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --topic test --group 
 ?
 But, when just getting started with Kafka, using the console producer and 
 consumer, it is not clear what value to use for the group option. If the 
 consumer groups could be listed, then it would be clear what value to use.
 Background:
 http://mail-archives.apache.org/mod_mbox/kafka-users/201405.mbox/%3cCAOq_b1w=slze5jrnakxvak0gu9ctdkpazak1g4dygvqzbsg...@mail.gmail.com%3e



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (KAFKA-1535) return all live brokers in TopicMetadataResponse

2014-07-18 Thread Jay Kreps (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1535?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14067074#comment-14067074
 ] 

Jay Kreps commented on KAFKA-1535:
--

[~nmarasoi] Totally up to you.

 return all live brokers in TopicMetadataResponse
 

 Key: KAFKA-1535
 URL: https://issues.apache.org/jira/browse/KAFKA-1535
 Project: Kafka
  Issue Type: Improvement
  Components: core
Affects Versions: 0.8.2
Reporter: Jun Rao
Assignee: nicu marasoiu
  Labels: newbie
 Attachments: 
 KAFKA-1535__return_all_live_brokers_in_TopicMetadataResponse_.patch


 Currently, we only return the brokers that have assigned replicas for a topic 
 in TopicMetadataResponse. The new producer will use those brokers for 
 refreshing metadata. Now suppose that we stop all those brokers, copy all 
 local data to some new hosts and then restart those hosts (with the original 
 broker id). There is no way for the new producer to automatically get the 
 information about the new brokers since all old brokers are gone.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (KAFKA-1543) Changing replication factor

2014-07-17 Thread Jay Kreps (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1543?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14065616#comment-14065616
 ] 

Jay Kreps commented on KAFKA-1543:
--

I wonder if it would make more sense to have the alter topic command do this. 
Something like:
{code}
  bin/kafka-topics.sh --zookeeper host:port --alter --topic name 
--replication-factor 3
{code}

 Changing replication factor
 ---

 Key: KAFKA-1543
 URL: https://issues.apache.org/jira/browse/KAFKA-1543
 Project: Kafka
  Issue Type: Improvement
Reporter: Alexey Ozeritskiy
 Attachments: can-change-replication.patch


 It is difficult to change the replication factor by manually editing the JSON 
 config.
 I propose to add a key to the kafka-reassign-partitions.sh command to 
 automatically create the JSON config.
 Example of usage:
 {code}
 kafka-reassign-partitions.sh --zookeeper <zk> --replicas <new-replication-factor> 
 --topics-to-move-json-file <topics-file> --broker-list 1,2,3,4 --generate > 
 output
 {code}



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Resolved] (KAFKA-169) Layering violations in Kafka code

2014-07-17 Thread Jay Kreps (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-169?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jay Kreps resolved KAFKA-169.
-

Resolution: Won't Fix

 Layering violations in Kafka code
 -

 Key: KAFKA-169
 URL: https://issues.apache.org/jira/browse/KAFKA-169
 Project: Kafka
  Issue Type: Bug
  Components: core
Reporter: Jay Kreps
Priority: Minor
 Attachments: draw_deps.py, kafka_deps.svg


 I am noticing a lot of layering violations creeping into the code.
 For example the log implementation depends on zookeeper code now, the network 
 server depends on the kafka api, etc. This stuff is messy and makes it hard 
 to test or reason about the pieces in isolation.
 I have run a quick analysis on the imports to look at problems and there are 
 a few. Let's try to keep this graph in good shape and think about the 
 layering in the code.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (KAFKA-1535) return all live brokers in TopicMetadataResponse

2014-07-17 Thread Jay Kreps (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1535?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14065646#comment-14065646
 ] 

Jay Kreps commented on KAFKA-1535:
--

Our current model of nodes is that they are permanent. That is, if there is a 
node 1 and it dies, it will come back or be replaced. It need not literally be 
the same machine, just that if a node dies you will eventually add a new node 
with id 1 which will take over the work node 1 used to do.

The metadata response is read by the producer and consumer clients. For example 
in the new java code it is in 
clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java.

 return all live brokers in TopicMetadataResponse
 

 Key: KAFKA-1535
 URL: https://issues.apache.org/jira/browse/KAFKA-1535
 Project: Kafka
  Issue Type: Improvement
  Components: core
Affects Versions: 0.8.2
Reporter: Jun Rao
  Labels: newbie
 Attachments: 
 KAFKA-1535__return_all_live_brokers_in_TopicMetadataResponse_.patch


 Currently, we only return the brokers that have assigned replicas for a topic 
 in TopicMetadataResponse. The new producer will use those brokers for 
 refreshing metadata. Now suppose that we stop all those brokers, copy all 
 local data to some new hosts and then restart those hosts (with the original 
 broker id). There is no way for the new producer to automatically get the 
 information about the new brokers since all old brokers are gone.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (KAFKA-1070) Auto-assign node id

2014-07-17 Thread Jay Kreps (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1070?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14065702#comment-14065702
 ] 

Jay Kreps commented on KAFKA-1070:
--

Hey [~harsha_ch], that is a great point.

I'd like to avoid changing the type of the id to a long or UUID so as to not 
have to bump up the protocol format for the metadata request which hands these 
out to clients (we would need a way to handle compatibility with older clients 
that don't expect the longer types).

I think we can get around the problem you point out by just defaulting the node 
id sequence to 1000. This could theoretically conflict but most people number 
from 0 or 1 and we can discuss this in the release notes. Our plan will be to 
release with support for both configured node ids and assigned node ids for 
compatibility. After a couple of releases we will remove the config.

So the behavior would be this (see the sketch below):
- If there is a node id in the config we will validate it against the node id 
in the data directory. If it matches, good: we'll use that. If it doesn't 
match, that is bad: we'll crash with an error.
- If there is a node id in the data directory but none in the config, we'll 
use whatever is in the data directory.
- If there is no node id in the data directory yet but there is one in the 
config, we'll write that to the data directory and use it.
- If there is neither a node id in the data directory nor in the config, we'll 
allocate a node id and write it to the data directory.
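
A minimal sketch of that decision procedure (the helper names are hypothetical):
{code}
// Resolve the broker's node id from the configured value (if any) and the
// value stored in the data directory (if any).
def resolveNodeId(configuredId: Option[Int], storedId: Option[Int]): Int =
  (configuredId, storedId) match {
    case (Some(c), Some(s)) if c == s => c      // config matches disk: use it
    case (Some(c), Some(s)) =>                  // mismatch: crash with an error
      throw new IllegalStateException("Configured node id " + c + " does not match stored node id " + s)
    case (None, Some(s)) => s                   // no config: use what's on disk
    case (Some(c), None) => persistNodeId(c); c // nothing on disk: persist the config value
    case (None, None) =>                        // neither: allocate and persist
      val id = allocateNodeId()
      persistNodeId(id)
      id
  }

// Hypothetical helpers: allocation would draw from a shared sequence (e.g. in
// ZooKeeper) defaulting to start at 1000; persistence writes to the data dir.
def allocateNodeId(): Int = ???
def persistNodeId(id: Int): Unit = ???
{code}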


 Auto-assign node id
 ---

 Key: KAFKA-1070
 URL: https://issues.apache.org/jira/browse/KAFKA-1070
 Project: Kafka
  Issue Type: Bug
Reporter: Jay Kreps
Assignee: Sriharsha Chintalapani
  Labels: usability

 It would be nice to have Kafka brokers auto-assign node ids rather than 
 having that be a configuration. Having a configuration is irritating because 
 (1) you have to generate a custom config for each broker and (2) even though 
 it is in configuration, changing the node id can cause all kinds of bad 
 things to happen.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (KAFKA-1414) Speedup broker startup after hard reset

2014-07-17 Thread Jay Kreps (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14065721#comment-14065721
 ] 

Jay Kreps commented on KAFKA-1414:
--

Hey [~aozeritsky], any before/after perf numbers for your setup?

[~ataraxer] How many threads were being used when that out of memory error 
occurred? My understanding is that that happens when Java requests memory from 
the OS and the OS is physically out of memory and not willing to give virtual 
memory. Can you confirm that this is a reproducible thing? If so we may need to 
kind of warn people about that... However, it is somewhat counterintuitive that 
on a machine with sufficient memory calling flush, say, from 4 threads would 
crash the process.

 Speedup broker startup after hard reset
 ---

 Key: KAFKA-1414
 URL: https://issues.apache.org/jira/browse/KAFKA-1414
 Project: Kafka
  Issue Type: Improvement
  Components: log
Affects Versions: 0.8.2, 0.9.0, 0.8.1.1
Reporter: Dmitry Bugaychenko
Assignee: Jay Kreps
 Attachments: 
 0001-KAFKA-1414-Speedup-broker-startup-after-hard-reset-a.patch, 
 KAFKA-1414-rev1.patch, parallel-dir-loading-0.8.patch, 
 parallel-dir-loading-trunk-fixed-threadpool.patch, 
 parallel-dir-loading-trunk-threadpool.patch, parallel-dir-loading-trunk.patch


 After a hard reset due to power failure the broker takes way too much time 
 recovering unflushed segments in a single thread. This could be easily 
 improved by launching multiple threads (one per data directory, assuming that 
 typically each data directory is on a dedicated drive). Locally we tried this 
 simple patch to LogManager.loadLogs and it seems to work; however, I'm too new 
 to Scala, so do not take it literally:
 {code}
   /**
    * Recover and load all logs in the given data directories
    */
   private def loadLogs(dirs: Seq[File]) {
     val threads: Array[Thread] = new Array[Thread](dirs.size)
     var i: Int = 0
     val me = this
     for (dir <- dirs) {
       val thread = new Thread(new Runnable {
         def run() {
           val recoveryPoints = me.recoveryPointCheckpoints(dir).read
           /* load the logs */
           val subDirs = dir.listFiles()
           if (subDirs != null) {
             val cleanShutDownFile = new File(dir, Log.CleanShutdownFile)
             if (cleanShutDownFile.exists())
               info("Found clean shutdown file. Skipping recovery for all logs in data directory '%s'".format(dir.getAbsolutePath))
             for (dir <- subDirs) {
               if (dir.isDirectory) {
                 info("Loading log '" + dir.getName + "'")
                 val topicPartition = Log.parseTopicPartitionName(dir.getName)
                 val config = topicConfigs.getOrElse(topicPartition.topic, defaultConfig)
                 val log = new Log(dir,
                                   config,
                                   recoveryPoints.getOrElse(topicPartition, 0L),
                                   scheduler,
                                   time)
                 val previous = addLogWithLock(topicPartition, log)
                 if (previous != null)
                   throw new IllegalArgumentException("Duplicate log directories found: %s, %s!".format(log.dir.getAbsolutePath, previous.dir.getAbsolutePath))
               }
             }
             cleanShutDownFile.delete()
           }
         }
       })
       thread.start()
       threads(i) = thread
       i = i + 1
     }
     for (thread <- threads) {
       thread.join()
     }
   }

   def addLogWithLock(topicPartition: TopicAndPartition, log: Log): Log = {
     logCreationOrDeletionLock synchronized {
       this.logs.put(topicPartition, log)
     }
   }
 {code}



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Updated] (KAFKA-1535) return all live brokers in TopicMetadataResponse

2014-07-17 Thread Jay Kreps (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1535?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jay Kreps updated KAFKA-1535:
-

Resolution: Fixed
  Assignee: Jay Kreps
Status: Resolved  (was: Patch Available)

Patch looks good to me!

 return all live brokers in TopicMetadataResponse
 

 Key: KAFKA-1535
 URL: https://issues.apache.org/jira/browse/KAFKA-1535
 Project: Kafka
  Issue Type: Improvement
  Components: core
Affects Versions: 0.8.2
Reporter: Jun Rao
Assignee: Jay Kreps
  Labels: newbie
 Attachments: 
 KAFKA-1535__return_all_live_brokers_in_TopicMetadataResponse_.patch


 Currently, we only return the brokers that have assigned replicas for a topic 
 in TopicMetadataResponse. The new producer will use those brokers for 
 refreshing metadata. Now suppose that we stop all those brokers, copy all 
 local data to some new hosts and then restart those hosts (with the original 
 broker id). There is no way for the new producer to automatically get the 
 information about the new brokers since all old brokers are gone.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Resolved] (KAFKA-183) Expose offset vector to the consumer

2014-07-17 Thread Jay Kreps (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-183?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jay Kreps resolved KAFKA-183.
-

Resolution: Fixed

This is being done in the new consumer.

 Expose offset vector to the consumer
 

 Key: KAFKA-183
 URL: https://issues.apache.org/jira/browse/KAFKA-183
 Project: Kafka
  Issue Type: Sub-task
Reporter: Jay Kreps
Assignee: Jay Kreps

 We should enable consumers to save their position themselves. This would be 
 useful for consumers that need to store consumed data: they can store the 
 data and the position together, which gives a poor man's transactionality, 
 since any data loss on the consumer will also rewind the position to the 
 previous position so the two are always in sync.
 Two ways to do this:
 1. Add an OffsetStorage interface and have the zk storage implement this. The 
 user can override this by providing an OffsetStorage implementation of their 
 own to change how values are stored.
 2. Make commit() return the position offset vector and add a 
 setPosition(List<Long>) method to initialize the position.
 Let's figure out any potential problems with this, and work out the best 
 approach.
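
 A minimal sketch of option 1 (the trait and method names are illustrative only):
 {code}
 import kafka.common.TopicAndPartition

 // Pluggable position storage: the default implementation would write to
 // ZooKeeper as today, while a user could instead store positions alongside
 // the consumed data for the poor man's transactionality described above.
 trait OffsetStorage {
   def commit(positions: Map[TopicAndPartition, Long]): Unit
   def fetch(partitions: Seq[TopicAndPartition]): Map[TopicAndPartition, Long]
 }
 {code}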



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (KAFKA-1546) Automate replica lag tuning

2014-07-17 Thread Jay Kreps (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1546?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14065806#comment-14065806
 ] 

Jay Kreps commented on KAFKA-1546:
--

I think this is actually a really important thing to get right to make 
replication reliable. There are some subtleties. It would be good to work out 
the basics of how this could work on this JIRA.

For example, the throughput on a partition might be 1 msg/sec. But that is only 
because 1 msg/sec is being written by the producer. However, if someone 
writes a batch of 1000 messages, that doesn't mean we are necessarily 1000 
seconds behind. 

We already track the time since the last fetch request. So if the fetcher stops 
entirely for too long it will be caught.

I think the other condition we want to be able to catch is one where the 
fetcher is still fetching but it is behind and likely won't catch up. One way 
to make "caught up" concrete is to say that the last fetch went to the end of 
the log. We could potentially reduce this to one config and just have 
replica.lag.time.ms, which would be both the maximum time since a fetch and 
the maximum amount of time without catching up to the leader. The 
implementation would be that every time a fetch didn't go to the logEndOffset 
we would set the lag clock, and it would only reset when a fetch request 
finally went all the way to the logEndOffset.
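
A minimal sketch of that lag clock (names hypothetical):
{code}
// Tracks whether a replica has been continuously behind for too long.
class ReplicaLagClock(maxLagMs: Long) {
  @volatile private var lagStartMs: Long = -1L  // -1 means caught up

  // Called on every fetch from the follower.
  def onFetch(fetchedUpTo: Long, logEndOffset: Long, nowMs: Long): Unit = {
    if (fetchedUpTo >= logEndOffset)
      lagStartMs = -1L    // fetch reached the log end: reset the clock
    else if (lagStartMs < 0)
      lagStartMs = nowMs  // fetch stopped short: start the clock
  }

  // Out of sync if behind for longer than replica.lag.time.ms. (The "no fetch
  // at all for too long" case is the separate check mentioned above.)
  def isOutOfSync(nowMs: Long): Boolean =
    lagStartMs >= 0 && (nowMs - lagStartMs) > maxLagMs
}
{code}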



 Automate replica lag tuning
 ---

 Key: KAFKA-1546
 URL: https://issues.apache.org/jira/browse/KAFKA-1546
 Project: Kafka
  Issue Type: Improvement
  Components: replication
Affects Versions: 0.8.0, 0.8.1, 0.8.1.1
Reporter: Neha Narkhede
  Labels: newbie++

 Currently, there is no good way to tune the replica lag configs to 
 automatically account for high and low volume topics on the same cluster. 
 For the low-volume topic it will take a very long time to detect a lagging
 replica, and for the high-volume topic it will have false-positives.
 One approach to making this easier would be to have the configuration
 be something like replica.lag.max.ms and translate this into a number
 of messages dynamically based on the throughput of the partition.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Resolved] (KAFKA-1512) Limit the maximum number of connections per ip address

2014-07-16 Thread Jay Kreps (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1512?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jay Kreps resolved KAFKA-1512.
--

Resolution: Fixed

Committed.

 Limit the maximum number of connections per ip address
 --

 Key: KAFKA-1512
 URL: https://issues.apache.org/jira/browse/KAFKA-1512
 Project: Kafka
  Issue Type: New Feature
Reporter: Jay Kreps
Assignee: Jay Kreps
 Attachments: KAFKA-1512.patch, KAFKA-1512.patch, 
 KAFKA-1512_2014-07-03_15:17:55.patch, KAFKA-1512_2014-07-14_13:28:15.patch


 To protect against client connection leaks add a new configuration
   max.connections.per.ip
 that causes the SocketServer to enforce a limit on the maximum number of 
 connections from each InetAddress instance. For backwards compatibility this 
 will default to 2 billion.
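
 A per-IP counter in the SocketServer accept path is enough to enforce this; a 
 minimal sketch (illustrative, not necessarily the committed patch):
 {code}
 import java.net.InetAddress
 import scala.collection.mutable

 // Counts open connections per client address: accept() increments and
 // rejects past the limit, close() decrements.
 class ConnectionQuotas(maxConnectionsPerIp: Int) {
   private val counts = mutable.Map[InetAddress, Int]()

   def inc(addr: InetAddress): Unit = synchronized {
     val count = counts.getOrElse(addr, 0)
     if (count >= maxConnectionsPerIp)
       throw new IllegalStateException("Too many connections from " + addr)
     counts.put(addr, count + 1)
   }

   def dec(addr: InetAddress): Unit = synchronized {
     counts.get(addr) match {
       case Some(1) => counts.remove(addr) // last connection from this address
       case Some(n) => counts.put(addr, n - 1)
       case None    => // nothing tracked; ignore
     }
   }
 }
 {code}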



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (KAFKA-1414) Speedup broker startup after hard reset

2014-07-16 Thread Jay Kreps (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14063742#comment-14063742
 ] 

Jay Kreps commented on KAFKA-1414:
--

Great patch!

I second Jun's comments.

Is there a way we can make this a little better out of the box? Our experience 
is that people don't set every config optimally. So a couple of ideas:

It may be that having more threads than disks doesn't hurt as the I/O scheduler 
can figure it out, in which case we could default to something higher than 1.

Is there really a need for two configurations, one for recovery and one for 
shutdown? It may be that the ideal setting is always just the number of disks. 
Or it may be that recovery is actually CPU bound whereas flush isn't. If 
the right answer is just the number of disks then we may be able to simplify 
things by just asking for "num.disks" or "io.parallelism" or something like 
that, and we set the number of threads appropriately for these cases. This 
would let us default num.disks to the number of data directories, but would let 
users with RAID arrays or other logical volume mgmt things override this. It 
would also allow us to use this config for other things related to I/O 
parallelism as they arise without creating too many configs.

This is obviously a minor thing but a little research here would give a nicer 
user experience.

Also, if you have a chance to run a basic before/after test on the speedup on a 
machine with multiple disks that would be great (presumably it is a linear 
speedup). This would let us report the improvement in the release in concrete 
terms as well as validating that it actually works as we expect.

 Speedup broker startup after hard reset
 ---

 Key: KAFKA-1414
 URL: https://issues.apache.org/jira/browse/KAFKA-1414
 Project: Kafka
  Issue Type: Improvement
  Components: log
Affects Versions: 0.8.2, 0.9.0, 0.8.1.1
Reporter: Dmitry Bugaychenko
Assignee: Jay Kreps
 Attachments: 
 0001-KAFKA-1414-Speedup-broker-startup-after-hard-reset-a.patch, 
 parallel-dir-loading-0.8.patch, 
 parallel-dir-loading-trunk-fixed-threadpool.patch, 
 parallel-dir-loading-trunk-threadpool.patch, parallel-dir-loading-trunk.patch


 After a hard reset due to power failure the broker takes way too much time 
 recovering unflushed segments in a single thread. This could be easily 
 improved by launching multiple threads (one per data directory, assuming that 
 typically each data directory is on a dedicated drive). Locally we tried this 
 simple patch to LogManager.loadLogs and it seems to work; however, I'm too new 
 to Scala, so do not take it literally:
 {code}
   /**
    * Recover and load all logs in the given data directories
    */
   private def loadLogs(dirs: Seq[File]) {
     val threads: Array[Thread] = new Array[Thread](dirs.size)
     var i: Int = 0
     val me = this
     for (dir <- dirs) {
       val thread = new Thread(new Runnable {
         def run() {
           val recoveryPoints = me.recoveryPointCheckpoints(dir).read
           /* load the logs */
           val subDirs = dir.listFiles()
           if (subDirs != null) {
             val cleanShutDownFile = new File(dir, Log.CleanShutdownFile)
             if (cleanShutDownFile.exists())
               info("Found clean shutdown file. Skipping recovery for all logs in data directory '%s'".format(dir.getAbsolutePath))
             for (dir <- subDirs) {
               if (dir.isDirectory) {
                 info("Loading log '" + dir.getName + "'")
                 val topicPartition = Log.parseTopicPartitionName(dir.getName)
                 val config = topicConfigs.getOrElse(topicPartition.topic, defaultConfig)
                 val log = new Log(dir,
                                   config,
                                   recoveryPoints.getOrElse(topicPartition, 0L),
                                   scheduler,
                                   time)
                 val previous = addLogWithLock(topicPartition, log)
                 if (previous != null)
                   throw new IllegalArgumentException("Duplicate log directories found: %s, %s!".format(log.dir.getAbsolutePath, previous.dir.getAbsolutePath))
               }
             }
             cleanShutDownFile.delete()
           }
         }
       })
       thread.start()
       threads(i) = thread
       i = i + 1
     }
     for (thread <- threads) {
       thread.join()
     }
   }

   def addLogWithLock(topicPartition: TopicAndPartition, log: Log): Log = {
     logCreationOrDeletionLock synchronized {
       this.logs.put(topicPartition, log)
     }
   }
 {code}



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (KAFKA-1535) return all live brokers in TopicMetadataResponse

2014-07-16 Thread Jay Kreps (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1535?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14064186#comment-14064186
 ] 

Jay Kreps commented on KAFKA-1535:
--

Hey Nicu, this looks good to me. Did you do any testing on this?

 return all live brokers in TopicMetadataResponse
 

 Key: KAFKA-1535
 URL: https://issues.apache.org/jira/browse/KAFKA-1535
 Project: Kafka
  Issue Type: Improvement
  Components: core
Affects Versions: 0.8.2
Reporter: Jun Rao
  Labels: newbie
 Attachments: 
 KAFKA-1535__return_all_live_brokers_in_TopicMetadataResponse1.patch


 Currently, we only return the brokers that have assigned replicas for a topic 
 in TopicMetadataResponse. The new producer will use those brokers for 
 refreshing metadata. Now suppose that we stop all those brokers, copy all 
 local data to some new hosts and then restart those hosts (with the original 
 broker id). There is no way for the new producer to automatically get the 
 information about the new brokers since all old brokers are gone.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (KAFKA-1535) return all live brokers in TopicMetadataResponse

2014-07-15 Thread Jay Kreps (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1535?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14062214#comment-14062214
 ] 

Jay Kreps commented on KAFKA-1535:
--

Yeah we do all development on trunk and just cut branches as a stable point for 
critical point fixes needed after the release.

 return all live brokers in TopicMetadataResponse
 

 Key: KAFKA-1535
 URL: https://issues.apache.org/jira/browse/KAFKA-1535
 Project: Kafka
  Issue Type: Improvement
  Components: core
Affects Versions: 0.8.2
Reporter: Jun Rao
  Labels: newbie

 Currently, we only return the brokers that have assigned replicas for a topic 
 in TopicMetadataResponse. The new producer will use those brokers for 
 refreshing metadata. Now suppose that we stop all those brokers, copy all 
 local data to some new hosts and then restart those hosts (with the original 
 broker id). There is no way for the new producer to automatically get the 
 information about the new brokers since all old brokers are gone.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Updated] (KAFKA-1512) Limit the maximum number of connections per ip address

2014-07-14 Thread Jay Kreps (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1512?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jay Kreps updated KAFKA-1512:
-

Attachment: KAFKA-1512_2014-07-14_13:28:15.patch

 Limit the maximum number of connections per ip address
 --

 Key: KAFKA-1512
 URL: https://issues.apache.org/jira/browse/KAFKA-1512
 Project: Kafka
  Issue Type: New Feature
Reporter: Jay Kreps
Assignee: Jay Kreps
 Attachments: KAFKA-1512.patch, KAFKA-1512.patch, 
 KAFKA-1512_2014-07-03_15:17:55.patch, KAFKA-1512_2014-07-14_13:28:15.patch


 To protect against client connection leaks add a new configuration
   max.connections.per.ip
 that causes the SocketServer to enforce a limit on the maximum number of 
 connections from each InetAddress instance. For backwards compatibility this 
 will default to 2 billion.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (KAFKA-1512) Limit the maximum number of connections per ip address

2014-07-14 Thread Jay Kreps (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1512?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14061178#comment-14061178
 ] 

Jay Kreps commented on KAFKA-1512:
--

Updated reviewboard https://reviews.apache.org/r/23208/
 against branch trunk

 Limit the maximum number of connections per ip address
 --

 Key: KAFKA-1512
 URL: https://issues.apache.org/jira/browse/KAFKA-1512
 Project: Kafka
  Issue Type: New Feature
Reporter: Jay Kreps
Assignee: Jay Kreps
 Attachments: KAFKA-1512.patch, KAFKA-1512.patch, 
 KAFKA-1512_2014-07-03_15:17:55.patch, KAFKA-1512_2014-07-14_13:28:15.patch


 To protect against client connection leaks add a new configuration
   max.connections.per.ip
 that causes the SocketServer to enforce a limit on the maximum number of 
 connections from each InetAddress instance. For backwards compatibility this 
 will default to 2 billion.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Updated] (KAFKA-1258) Delete temporary data directory after unit test finishes

2014-07-12 Thread Jay Kreps (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1258?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jay Kreps updated KAFKA-1258:
-

Resolution: Fixed
Status: Resolved  (was: Patch Available)

Committed. Thanks!

 Delete temporary data directory after unit test finishes
 

 Key: KAFKA-1258
 URL: https://issues.apache.org/jira/browse/KAFKA-1258
 Project: Kafka
  Issue Type: Bug
Reporter: Guozhang Wang
Assignee: Manikumar Reddy
  Labels: newbie
 Fix For: 0.9.0

 Attachments: KAFKA-1258.patch


 Today in unit testsuite most of the time when a test case is setup a 
 temporary directory will be created with a random int as suffix, and will not 
 be deleted after the test. After a few unit tests this will create tons of 
 directories in java.io.tmpdir (/tmp for Linux). Would be better to remove 
 them for clean unit tests.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (KAFKA-1330) Implement subscribe(TopicPartition...partitions) in the new consumer

2014-07-12 Thread Jay Kreps (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1330?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14059904#comment-14059904
 ] 

Jay Kreps commented on KAFKA-1330:
--

Created reviewboard https://reviews.apache.org/r/23442/
 against branch trunk

 Implement subscribe(TopicPartition...partitions) in the new consumer
 

 Key: KAFKA-1330
 URL: https://issues.apache.org/jira/browse/KAFKA-1330
 Project: Kafka
  Issue Type: Sub-task
  Components: consumer
Affects Versions: 0.9.0
Reporter: Neha Narkhede
 Attachments: KAFKA-1330.patch


 This involves adding basic fetch functionality (equivalent to SimpleConsumer) 
 to the new consumer. Effectively implementing 
 subscribe(TopicPartition...partitions) and 
 unsubscribe(TopicPartition...partitions).



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Updated] (KAFKA-1330) Implement subscribe(TopicPartition...partitions) in the new consumer

2014-07-12 Thread Jay Kreps (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1330?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jay Kreps updated KAFKA-1330:
-

Attachment: KAFKA-1330.patch

 Implement subscribe(TopicPartition...partitions) in the new consumer
 

 Key: KAFKA-1330
 URL: https://issues.apache.org/jira/browse/KAFKA-1330
 Project: Kafka
  Issue Type: Sub-task
  Components: consumer
Affects Versions: 0.9.0
Reporter: Neha Narkhede
 Attachments: KAFKA-1330.patch


 This involves adding basic fetch functionality (equivalent to SimpleConsumer) 
 to the new consumer. Effectively implementing 
 subscribe(TopicPartition...partitions) and 
 unsubscribe(TopicPartition...partitions).



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (KAFKA-1330) Implement subscribe(TopicPartition...partitions) in the new consumer

2014-07-12 Thread Jay Kreps (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1330?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14059905#comment-14059905
 ] 

Jay Kreps commented on KAFKA-1330:
--

Hey [~nmarasoi], sorry, I had actually been working on this for the last 
week... I should have assigned it to myself.

 Implement subscribe(TopicPartition...partitions) in the new consumer
 

 Key: KAFKA-1330
 URL: https://issues.apache.org/jira/browse/KAFKA-1330
 Project: Kafka
  Issue Type: Sub-task
  Components: consumer
Affects Versions: 0.9.0
Reporter: Neha Narkhede
Assignee: Jay Kreps
 Attachments: KAFKA-1330.patch


 This involves adding basic fetch functionality (equivalent to SimpleConsumer) 
 to the new consumer. Effectively implementing 
 subscribe(TopicPartition...partitions) and 
 unsubscribe(TopicPartition...partitions).



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (KAFKA-1330) Implement subscribe(TopicPartition...partitions) in the new consumer

2014-07-12 Thread Jay Kreps (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1330?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14059906#comment-14059906
 ] 

Jay Kreps commented on KAFKA-1330:
--

Guys, I posted a very rough draft that implements simple consumer 
functionality. I think implementing offset maintenance on top of this will be 
pretty easy and can be done once we have those new request definitions done. I'd 
like to start getting these changes in even though they aren't done, to avoid 
keeping large patches against trunk. Specifically, what I am hoping is that we 
can do a detailed review of the changes to common code, especially with respect 
to NetworkClient, and be a bit lax on KafkaConsumer until we get something more 
complete.

 Implement subscribe(TopicPartition...partitions) in the new consumer
 

 Key: KAFKA-1330
 URL: https://issues.apache.org/jira/browse/KAFKA-1330
 Project: Kafka
  Issue Type: Sub-task
  Components: consumer
Affects Versions: 0.9.0
Reporter: Neha Narkhede
Assignee: Jay Kreps
 Attachments: KAFKA-1330.patch


 This involves adding basic fetch functionality (equivalent to SimpleConsumer) 
 to the new consumer. Effectively implementing 
 subscribe(TopicPartition...partitions) and 
 unsubscribe(TopicPartition...partitions).



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (KAFKA-1026) Dynamically Adjust Batch Size Upon Receiving MessageSizeTooLargeException

2014-07-11 Thread Jay Kreps (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1026?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14059044#comment-14059044
 ] 

Jay Kreps commented on KAFKA-1026:
--

I believe this is fixed in the new producer, no? We will allocate up to the 
maximum memory size for messages larger than the batch size.

 Dynamically Adjust Batch Size Upon Receiving MessageSizeTooLargeException
 -

 Key: KAFKA-1026
 URL: https://issues.apache.org/jira/browse/KAFKA-1026
 Project: Kafka
  Issue Type: Bug
Reporter: Guozhang Wang
Assignee: Guozhang Wang
  Labels: newbie++
 Fix For: 0.9.0


 Among the exceptions that can possibly be received in Producer.send(), 
 MessageSizeTooLargeException is currently not recoverable since the producer 
 does not change the batch size but still retries the send. It would be better 
 to have a dynamic batch size adjustment mechanism based on 
 MessageSizeTooLargeException.
 This is related to KAFKA-998



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Resolved] (KAFKA-1026) Dynamically Adjust Batch Size Upon Receiving MessageSizeTooLargeException

2014-07-11 Thread Jay Kreps (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1026?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jay Kreps resolved KAFKA-1026.
--

   Resolution: Fixed
Fix Version/s: (was: 0.9.0)
   0.8.2

Cool, closing with fix as 0.8.2.

 Dynamically Adjust Batch Size Upon Receiving MessageSizeTooLargeException
 -

 Key: KAFKA-1026
 URL: https://issues.apache.org/jira/browse/KAFKA-1026
 Project: Kafka
  Issue Type: Bug
Reporter: Guozhang Wang
Assignee: Guozhang Wang
  Labels: newbie++
 Fix For: 0.8.2


 Among the exceptions that can possibly be received in Producer.send(), 
 MessageSizeTooLargeException is currently not recoverable since the producer 
 does not change the batch size but still retries the send. It would be better 
 to have a dynamic batch size adjustment mechanism based on 
 MessageSizeTooLargeException.
 This is related to KAFKA-998



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (KAFKA-1040) ConsumerConfig and ProducerConfig do work in the Constructor

2014-07-11 Thread Jay Kreps (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1040?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14059095#comment-14059095
 ] 

Jay Kreps commented on KAFKA-1040:
--

Hey guys, I'm not sure I agree with this principle. The goal of a constructor 
is to construct a valid instance of the object. So validating the constructor 
arguments should totally be done in the constructor. Could you give a concrete 
example of how adding a separate validate() method would make things better?
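
As a minimal sketch of the pattern being argued for (hypothetical class and 
property names, not the actual ConsumerConfig/ProducerConfig code), validating 
in the constructor means no instance can exist in an invalid state:
{code}
import java.util.Properties;

public class ExampleConfig {
    private final int fetchSize;

    // Validation happens at construction time, so any successfully
    // constructed instance is known to be valid.
    public ExampleConfig(Properties props) {
        this.fetchSize = Integer.parseInt(props.getProperty("fetch.size", "1048576"));
        if (this.fetchSize <= 0)
            throw new IllegalArgumentException("fetch.size must be positive, got " + fetchSize);
    }

    public int fetchSize() {
        return fetchSize;
    }
}
{code}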

 ConsumerConfig and ProducerConfig do work in the Constructor
 --

 Key: KAFKA-1040
 URL: https://issues.apache.org/jira/browse/KAFKA-1040
 Project: Kafka
  Issue Type: Improvement
  Components: config, consumer, producer 
Affects Versions: 0.8.0
 Environment: Java 1.7
 Linux Mint 14 (64bit)
Reporter: Sharmarke Aden
Assignee: Neha Narkhede
Priority: Minor
  Labels: config, newbie
 Fix For: 0.9.0


 It appears that validation of configuration properties is performed in the 
 ConsumerConfig and ProducerConfig constructors. This is generally bad 
 practice as it couples object construction and validation. It also makes it 
 difficult to mock these objects in unit tests. 
 Ideally validation of the configuration properties should be separated from 
 object construction and initiated by those that rely/use these config objects.
 http://misko.hevery.com/code-reviewers-guide/flaw-constructor-does-real-work/



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (KAFKA-1040) ConsumerConfig and ProducerConfig do work in the Constructor

2014-07-11 Thread Jay Kreps (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1040?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14059175#comment-14059175
 ] 

Jay Kreps commented on KAFKA-1040:
--

Not sure I get the analogy; can we be concrete about the problem this solves? 
Is this meant to help the end user of Kafka, the Kafka developers, or both? 
What problem do either of these two groups currently have that this change 
would fix? You mentioned mocking, but these classes are internal and we provide 
a mock of the client for the end user...

 ConsumerConfig and ProducerConfig do work in the Constructor
 --

 Key: KAFKA-1040
 URL: https://issues.apache.org/jira/browse/KAFKA-1040
 Project: Kafka
  Issue Type: Improvement
  Components: config, consumer, producer 
Affects Versions: 0.8.0
 Environment: Java 1.7
 Linux Mint 14 (64bit)
Reporter: Sharmarke Aden
Assignee: Neha Narkhede
Priority: Minor
  Labels: config, newbie
 Fix For: 0.9.0


 It appears that validation of configuration properties is performed in the 
 ConsumerConfig and ProducerConfig constructors. This is generally bad 
 practice as it couples object construction and validation. It also makes it 
 difficult to mock these objects in unit tests. 
 Ideally validation of the configuration properties should be separated from 
 object construction and initiated by those that rely/use these config objects.
 http://misko.hevery.com/code-reviewers-guide/flaw-constructor-does-real-work/



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (KAFKA-1535) return all live brokers in TopicMetadataResponse

2014-07-11 Thread Jay Kreps (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1535?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14059200#comment-14059200
 ] 

Jay Kreps commented on KAFKA-1535:
--

Cool, go for it. Just attach a patch to this issue and we will review and apply 
it.

 return all live brokers in TopicMetadataResponse
 

 Key: KAFKA-1535
 URL: https://issues.apache.org/jira/browse/KAFKA-1535
 Project: Kafka
  Issue Type: Improvement
  Components: core
Affects Versions: 0.8.2
Reporter: Jun Rao
  Labels: newbie

 Currently, we only return the brokers that have assigned replicas for a topic 
 in TopicMetadataResponse. The new producer will use those brokers for 
 refreshing metadata. Now suppose that we stop all those brokers, copy all 
 local data to some new hosts and then restart those hosts (with the original 
 broker id). There is no way for the new producer to automatically get the 
 information about the new brokers since all old brokers are gone.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (KAFKA-1515) Wake-up Sender upon blocked on fetching leader metadata

2014-07-11 Thread Jay Kreps (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1515?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14059269#comment-14059269
 ] 

Jay Kreps commented on KAFKA-1515:
--

I am getting transient hangs after ProducerFailureTest.testSendAfterClose, but 
I will assume that is due to KAFKA-1533.

 Wake-up Sender upon blocked on fetching leader metadata
 ---

 Key: KAFKA-1515
 URL: https://issues.apache.org/jira/browse/KAFKA-1515
 Project: Kafka
  Issue Type: Bug
Reporter: Guozhang Wang
Assignee: Guozhang Wang
 Fix For: 0.9.0

 Attachments: KAFKA-1515.patch, KAFKA-1515_2014-07-03_10:19:28.patch, 
 KAFKA-1515_2014-07-03_16:43:05.patch, KAFKA-1515_2014-07-07_10:55:58.patch, 
 KAFKA-1515_2014-07-08_11:35:59.patch


 Currently the new KafkaProducer will not wake up the sender thread upon 
 forcing a metadata fetch, and hence if the sender is polling with a long 
 timeout (e.g. the metadata.age period) this wait will usually time out and 
 fail.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (KAFKA-1515) Wake-up Sender upon blocked on fetching leader metadata

2014-07-11 Thread Jay Kreps (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1515?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14059286#comment-14059286
 ] 

Jay Kreps commented on KAFKA-1515:
--

Committed with a very minor change: I renamed the isReadyToSend method to 
isSendable simply because we had like four methods that were ready/isReady/etc 
and each meant something slightly different. This hopefully differentiates 
better: sendable is when you are connected and have room to send, ready is when 
you are sendable and there is nothing else going on that makes us want to block 
the request. 
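
Roughly, the distinction looks like this (a hypothetical sketch, not the 
committed NetworkClient code):
{code}
public class ReadinessSketch {
    private final int maxInFlightRequests = 5;
    private boolean connected;               // is the connection established?
    private int inFlightRequests;            // requests sent but not yet answered
    private boolean metadataFetchInProgress;

    // sendable: connected and there is room for another in-flight request.
    public boolean isSendable() {
        return connected && inFlightRequests < maxInFlightRequests;
    }

    // ready: sendable, and nothing else (e.g. a pending metadata fetch)
    // makes us want to hold the request back.
    public boolean isReady() {
        return isSendable() && !metadataFetchInProgress;
    }
}
{code}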

 Wake-up Sender upon blocked on fetching leader metadata
 ---

 Key: KAFKA-1515
 URL: https://issues.apache.org/jira/browse/KAFKA-1515
 Project: Kafka
  Issue Type: Bug
Reporter: Guozhang Wang
Assignee: Guozhang Wang
 Fix For: 0.9.0

 Attachments: KAFKA-1515.patch, KAFKA-1515_2014-07-03_10:19:28.patch, 
 KAFKA-1515_2014-07-03_16:43:05.patch, KAFKA-1515_2014-07-07_10:55:58.patch, 
 KAFKA-1515_2014-07-08_11:35:59.patch


 Currently the new KafkaProducer will not wake up the sender thread upon 
 forcing a metadata fetch, and hence if the sender is polling with a long 
 timeout (e.g. the metadata.age period) this wait will usually time out and 
 fail.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (KAFKA-1535) return all live brokers in TopicMetadataResponse

2014-07-11 Thread Jay Kreps (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1535?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14059287#comment-14059287
 ] 

Jay Kreps commented on KAFKA-1535:
--

Yes, I think that is correct. The current behavior is basically an optimization 
to avoid sending all the nodes. However, this means that if you are only 
sending to a single partition you may only have the host/port info for one 
machine; if that machine goes down you don't have another broker to go to. 
Since there shouldn't be more than a few hundred brokers, sending the full 
host/port for all of them should be okay, and it fixes this scenario.

 return all live brokers in TopicMetadataResponse
 

 Key: KAFKA-1535
 URL: https://issues.apache.org/jira/browse/KAFKA-1535
 Project: Kafka
  Issue Type: Improvement
  Components: core
Affects Versions: 0.8.2
Reporter: Jun Rao
  Labels: newbie

 Currently, we only return the brokers that have assigned replicas for a topic 
 in TopicMetadataResponse. The new producer will use those brokers for 
 refreshing metadata. Now suppose that we stop all those brokers, copy all 
 local data to some new hosts and then restart those hosts (with the original 
 broker id). There is no way for the new producer to automatically get the 
 information about the new brokers since all old brokers are gone.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (KAFKA-1537) add request.required.acks=-2 to require acks from all replicas

2014-07-11 Thread Jay Kreps (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1537?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14059451#comment-14059451
 ] 

Jay Kreps commented on KAFKA-1537:
--

Wouldn't it be somewhat odd to actually require N/N replicas to acknowledge? 
It seems more likely that if you have replication factor 3 you would want 2 
acks, which we already support today. Otherwise you have no fault-tolerance.
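
Concretely, with the producer config as it exists today (a minimal properties 
sketch):
{code}
# With replication factor 3, require the leader plus one follower to
# acknowledge each request; positive ack counts are already supported.
request.required.acks=2
{code}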

 add request.required.acks=-2 to require acks from all replicas
 --

 Key: KAFKA-1537
 URL: https://issues.apache.org/jira/browse/KAFKA-1537
 Project: Kafka
  Issue Type: Improvement
  Components: core
Affects Versions: 0.8.1.1
Reporter: Jiang Wu
Priority: Minor

 The current options for the producer parameter request.required.acks include 
 0, positive numbers and -1, where -1 means acks from all replicas in the ISR.
 In a stress test we found that, if request.required.acks=-1 and the leader 
 is receiving at high speed, the followers will fall out of the ISR. In this 
 case, request.required.acks=-1 is equivalent to request.required.acks=1 
 because only the leader is in the ISR.
 It would be desirable to add request.required.acks=-2 to require acks from 
 all replicas.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (KAFKA-1325) Fix inconsistent per topic log configs

2014-07-10 Thread Jay Kreps (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1325?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14057521#comment-14057521
 ] 

Jay Kreps commented on KAFKA-1325:
--

Regardless of which path we go down, let's think about compatibility. If I 
currently have a bunch of configs in ZooKeeper, how do they get changed if we 
rename them?

 Fix inconsistent per topic log configs
 --

 Key: KAFKA-1325
 URL: https://issues.apache.org/jira/browse/KAFKA-1325
 Project: Kafka
  Issue Type: Improvement
  Components: log
Affects Versions: 0.8.1
Reporter: Neha Narkhede
  Labels: usability
 Attachments: KAFKA-1325.patch, KAFKA-1325.patch


 Related thread from the user mailing list - 
 http://mail-archives.apache.org/mod_mbox/kafka-users/201403.mbox/%3Cd8f6f3857b82c4ccd8725aba6fd68cb8%40cweb01.nmdf.nhnsystem.com%3E
 Our documentation is a little confusing on the log configs. 
 The per-topic property retention.ms is in milliseconds, but the server default 
 it maps to is in minutes. The same is true for segment.ms. We could either 
 improve the docs or change the per-topic configs to be consistent with the 
 server defaults.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Resolved] (KAFKA-1515) Wake-up Sender upon blocked on fetching leader metadata

2014-07-09 Thread Jay Kreps (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1515?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jay Kreps resolved KAFKA-1515.
--

Resolution: Fixed

 Wake-up Sender upon blocked on fetching leader metadata
 ---

 Key: KAFKA-1515
 URL: https://issues.apache.org/jira/browse/KAFKA-1515
 Project: Kafka
  Issue Type: Bug
Reporter: Guozhang Wang
Assignee: Guozhang Wang
 Fix For: 0.9.0

 Attachments: KAFKA-1515_2014-07-03_10:19:28.patch, 
 KAFKA-1515_2014-07-03_16:43:05.patch, KAFKA-1515_2014-07-07_10:55:58.patch, 
 KAFKA-1515_2014-07-08_11:35:59.patch


 Currently the new KafkaProducer will not wake up the sender thread upon 
 forcing a metadata fetch, and hence if the sender is polling with a long 
 timeout (e.g. the metadata.age period) this wait will usually time out and 
 fail.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (KAFKA-1532) Move CRC32 to AFTER the payload

2014-07-09 Thread Jay Kreps (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1532?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14056441#comment-14056441
 ] 

Jay Kreps commented on KAFKA-1532:
--

This is a good idea, but unfortunately would break compatibility with existing 
clients. It would be good to save up enough of these fixes and do them all at 
once to minimize breakage.

 Move CRC32 to AFTER the payload
 ---

 Key: KAFKA-1532
 URL: https://issues.apache.org/jira/browse/KAFKA-1532
 Project: Kafka
  Issue Type: Improvement
  Components: core, producer 
Reporter: Julian Morrison
Assignee: Jun Rao
Priority: Minor

 To support streaming a message of known length but unknown content, take the 
 CRC32 out of the message header and make it a message trailer. Then client 
 libraries can calculate it after streaming the message to Kafka, without 
 materializing the whole message in RAM.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (KAFKA-1532) Move CRC32 to AFTER the payload

2014-07-09 Thread Jay Kreps (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1532?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14056444#comment-14056444
 ] 

Jay Kreps commented on KAFKA-1532:
--

Also since Kafka itself fully materializes messages in memory, fixing this 
won't actually let clients send arbitrarily large messages, as the broker will 
still choke.

 Move CRC32 to AFTER the payload
 ---

 Key: KAFKA-1532
 URL: https://issues.apache.org/jira/browse/KAFKA-1532
 Project: Kafka
  Issue Type: Improvement
  Components: core, producer 
Reporter: Julian Morrison
Assignee: Jun Rao
Priority: Minor

 To support streaming a message of known length but unknown content, take the 
 CRC32 out of the message header and make it a message trailer. Then client 
 libraries can calculate it after streaming the message to Kafka, without 
 materializing the whole message in RAM.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (KAFKA-1532) Move CRC32 to AFTER the payload

2014-07-09 Thread Jay Kreps (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1532?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14056802#comment-14056802
 ] 

Jay Kreps commented on KAFKA-1532:
--

It's not unavoidable, but avoiding it is pretty hard. To make this really work 
we would have to be able to stream partial requests from the network layer, to 
the api/processing layer, and to the log layer. This is even more complicated 
because additional appends to the log have to be blocked until the message is 
completely written.

 Move CRC32 to AFTER the payload
 ---

 Key: KAFKA-1532
 URL: https://issues.apache.org/jira/browse/KAFKA-1532
 Project: Kafka
  Issue Type: Improvement
  Components: core, producer 
Reporter: Julian Morrison
Assignee: Jun Rao
Priority: Minor

 To support streaming a message of known length but unknown content, take the 
 CRC32 out of the message header and make it a message trailer. Then client 
 libraries can calculate it after streaming the message to Kafka, without 
 materializing the whole message in RAM.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (KAFKA-1532) Move CRC32 to AFTER the payload

2014-07-09 Thread Jay Kreps (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1532?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14056806#comment-14056806
 ] 

Jay Kreps commented on KAFKA-1532:
--

Your approach, where you buffer the message in a backing file and then do the 
full write to the log, would also work and might be easier. But it is still a 
pretty big change.

 Move CRC32 to AFTER the payload
 ---

 Key: KAFKA-1532
 URL: https://issues.apache.org/jira/browse/KAFKA-1532
 Project: Kafka
  Issue Type: Improvement
  Components: core, producer 
Reporter: Julian Morrison
Assignee: Jun Rao
Priority: Minor

 To support streaming a message of known length but unknown content, take the 
 CRC32 out of the message header and make it a message trailer. Then client 
 libraries can calculate it after streaming the message to Kafka, without 
 materializing the whole message in RAM.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (KAFKA-1532) Move CRC32 to AFTER the payload

2014-07-09 Thread Jay Kreps (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1532?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14056833#comment-14056833
 ] 

Jay Kreps commented on KAFKA-1532:
--

Yeah agreed. I think that is doable.

 Move CRC32 to AFTER the payload
 ---

 Key: KAFKA-1532
 URL: https://issues.apache.org/jira/browse/KAFKA-1532
 Project: Kafka
  Issue Type: Improvement
  Components: core, producer 
Reporter: Julian Morrison
Assignee: Jun Rao
Priority: Minor

 To support streaming a message of known length but unknown content, take the 
 CRC32 out of the message header and make it a message trailer. Then client 
 libraries can calculate it after streaming the message to Kafka, without 
 materializing the whole message in RAM.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (KAFKA-1512) Limit the maximum number of connections per ip address

2014-07-03 Thread Jay Kreps (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1512?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14051899#comment-14051899
 ] 

Jay Kreps commented on KAFKA-1512:
--

A proposal from the LI ops team is to also add an override for this so you can 
have custom limits for ips if you want 
  max.connections.per.ip.overrides=192.168.1.1:5, 192.168.1.2:, 192.168.1.3:45
If no objections I will implement this too.

 Limit the maximum number of connections per ip address
 --

 Key: KAFKA-1512
 URL: https://issues.apache.org/jira/browse/KAFKA-1512
 Project: Kafka
  Issue Type: New Feature
Reporter: Jay Kreps
Assignee: Jay Kreps
 Attachments: KAFKA-1512.patch, KAFKA-1512.patch


 To protect against client connection leaks add a new configuration
   max.connections.per.ip
 that causes the SocketServer to enforce a limit on the maximum number of 
 connections from each InetAddress instance. For backwards compatibility this 
 will default to 2 billion.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Comment Edited] (KAFKA-1512) Limit the maximum number of connections per ip address

2014-07-03 Thread Jay Kreps (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1512?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14051899#comment-14051899
 ] 

Jay Kreps edited comment on KAFKA-1512 at 7/3/14 8:46 PM:
--

A proposal from the LI ops team is to also add an override for this so you can 
have custom limits for ips if you want:
{code}
  max.connections.per.ip.overrides=192.168.1.1:5, 192.168.1.2:, 192.168.1.3:45
{code}
If no objections I will implement this too.


was (Author: jkreps):
A proposal from the LI ops team is to also add an override for this so you can 
have custom limits for ips if you want 
  max.connections.per.ip.overrides=192.168.1.1:5, 192.168.1.2:, 192.168.1.3:45
If no objections I will implement this too.

 Limit the maximum number of connections per ip address
 --

 Key: KAFKA-1512
 URL: https://issues.apache.org/jira/browse/KAFKA-1512
 Project: Kafka
  Issue Type: New Feature
Reporter: Jay Kreps
Assignee: Jay Kreps
 Attachments: KAFKA-1512.patch, KAFKA-1512.patch


 To protect against client connection leaks add a new configuration
   max.connections.per.ip
 that causes the SocketServer to enforce a limit on the maximum number of 
 connections from each InetAddress instance. For backwards compatibility this 
 will default to 2 billion.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (KAFKA-1519) Console consumer: expose configuration option to enable/disable writing the line separator

2014-07-03 Thread Jay Kreps (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1519?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14051909#comment-14051909
 ] 

Jay Kreps commented on KAFKA-1519:
--

I would also vote for supporting an empty separator using the existing 
argument. It would be great to add kafka.utils.CommandLineUtilsTest with a test 
to cover that method.

I suspect that there would not be any backwards compatibility issues since that 
argument is not currently accepted and it would be unlikely anyone would depend 
on the tool rejecting that argument.
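
A minimal sketch of what accepting an empty value could look like, together 
with the test's expectation; the names here are illustrative, not the actual 
kafka.utils.CommandLineUtils API:
{code}
import java.util.Properties;

public class KeyValueArgsSketch {

    // Parse "key=value" pairs, allowing an empty value such as "line.separator=".
    public static Properties parse(String[] args) {
        Properties props = new Properties();
        for (String arg : args) {
            int eq = arg.indexOf('=');
            if (eq < 0)
                throw new IllegalArgumentException("Invalid parser arguments: " + arg);
            props.put(arg.substring(0, eq), arg.substring(eq + 1)); // value may be ""
        }
        return props;
    }

    public static void main(String[] args) {
        Properties props = parse(new String[] { "line.separator=" });
        // The empty value is accepted rather than rejected.
        System.out.println(props.getProperty("line.separator").isEmpty()); // prints true
    }
}
{code}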

 Console consumer: expose configuration option to enable/disable writing the 
 line separator
 --

 Key: KAFKA-1519
 URL: https://issues.apache.org/jira/browse/KAFKA-1519
 Project: Kafka
  Issue Type: Improvement
  Components: consumer
Affects Versions: 0.8.1.1
Reporter: Michael Noll
Assignee: Neha Narkhede
Priority: Minor

 The current console consumer includes a {{DefaultMessageFormatter}}, which 
 exposes a few user-configurable options which can be set on the command line 
 via --property, e.g. --property line.separator=XYZ.
 Unfortunately, the current implementation does not allow the user to 
 completely disable writing any such line separator.  However, this 
 functionality would be helpful to enable users to capture data as-is from a 
 Kafka topic to a snapshot file. Capturing data as-is -- without an 
 artificial line separator -- is particularly nice for data in a binary format 
 (including Avro).
 *No workaround*
 A potential workaround would be to pass an empty string as the property value 
 of line.separator, but this doesn't work in the current implementation.
 The following variants throw an Invalid parser arguments exception:
 {code}
 --property line.separator=   # nothing
 --property line.separator="" # double quotes
 --property line.separator='' # single quotes
 {code}
 Escape tricks via a backslash don't work either.
 If there actually is a workaround please let me know.
 *How to fix*
 We can introduce a print.line option to enable/disable writing 
 line.separator similar to how the code already uses print.key to 
 enable/disable writing key.separator.
 This change is trivial.  To preserve backwards compatibility, the 
 print.line option would be set to true by default (unlike the print.key 
 option, which defaults to false).
 *Alternatives*
 Apart from modifying the built-in {{DefaultMessageFormatter}}, users could of 
 course implement their own custom {{MessageFormatter}}.  But given that it's 
 a) a trivial change to the {{DefaultMessageFormatter}} and b) a nice user 
 feature I'd say changing the built-in {{DefaultMessageFormatter}} would be 
 the better approach.  This way, Kafka would support writing data as-is to a 
 file out of the box.
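 A minimal sketch of the proposed option (hypothetical code, not the actual 
 DefaultMessageFormatter implementation):
 {code}
 import java.io.IOException;
 import java.io.PrintStream;
 import java.util.Properties;
 
 public class LineOptionSketch {
     private boolean printLine = true;                 // default preserves current behavior
     private byte[] lineSeparator = "\n".getBytes();
 
     public void init(Properties props) {
         if (props.containsKey("print.line"))
             printLine = Boolean.parseBoolean(props.getProperty("print.line"));
         if (props.containsKey("line.separator"))
             lineSeparator = props.getProperty("line.separator").getBytes();
     }
 
     public void writeTo(byte[] value, PrintStream output) throws IOException {
         output.write(value);           // the message payload, as-is
         if (printLine)                 // skip the separator when print.line=false
             output.write(lineSeparator);
     }
 }
 {code}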



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (KAFKA-1512) Limit the maximum number of connections per ip address

2014-07-03 Thread Jay Kreps (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1512?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14051951#comment-14051951
 ] 

Jay Kreps commented on KAFKA-1512:
--

Yes, I hadn't thought of that. Disabling connections could potentially be 
useful. The intended use was actually the other way around: basically, default 
most things to something reasonable like 10, but have a way to whitelist some 
IPs to have unlimited connections.

The background here is that we were previously having clients bootstrap 
metadata through a VIP (which appears to the Kafka nodes as a single IP). We 
just had an issue where a 200 node cluster that uses Kafka started creating and 
leaking connections through the VIP, which brought down a big shared cluster. 
So we thought we should have some limits. The hope was to change the VIP to DNS 
round-robin and gradually migrate the clients to that. In the meantime we 
thought it would be useful to be able to enforce the limit but whitelist the 
VIP with unlimited connections.

Thinking about this, maybe it is a little crazy to hard-code IP/host names in 
config?

 Limit the maximum number of connections per ip address
 --

 Key: KAFKA-1512
 URL: https://issues.apache.org/jira/browse/KAFKA-1512
 Project: Kafka
  Issue Type: New Feature
Reporter: Jay Kreps
Assignee: Jay Kreps
 Attachments: KAFKA-1512.patch, KAFKA-1512.patch


 To protect against client connection leaks add a new configuration
   max.connections.per.ip
 that causes the SocketServer to enforce a limit on the maximum number of 
 connections from each InetAddress instance. For backwards compatibility this 
 will default to 2 billion.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (KAFKA-1512) Limit the maximum number of connections per ip address

2014-07-03 Thread Jay Kreps (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1512?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14051973#comment-14051973
 ] 

Jay Kreps commented on KAFKA-1512:
--

Updated reviewboard https://reviews.apache.org/r/23208/
 against branch trunk

 Limit the maximum number of connections per ip address
 --

 Key: KAFKA-1512
 URL: https://issues.apache.org/jira/browse/KAFKA-1512
 Project: Kafka
  Issue Type: New Feature
Reporter: Jay Kreps
Assignee: Jay Kreps
 Attachments: KAFKA-1512.patch, KAFKA-1512.patch, 
 KAFKA-1512_2014-07-03_15:17:55.patch


 To protect against client connection leaks add a new configuration
   max.connections.per.ip
 that causes the SocketServer to enforce a limit on the maximum number of 
 connections from each InetAddress instance. For backwards compatibility this 
 will default to 2 billion.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Updated] (KAFKA-1512) Limit the maximum number of connections per ip address

2014-07-03 Thread Jay Kreps (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1512?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jay Kreps updated KAFKA-1512:
-

Attachment: KAFKA-1512_2014-07-03_15:17:55.patch

 Limit the maximum number of connections per ip address
 --

 Key: KAFKA-1512
 URL: https://issues.apache.org/jira/browse/KAFKA-1512
 Project: Kafka
  Issue Type: New Feature
Reporter: Jay Kreps
Assignee: Jay Kreps
 Attachments: KAFKA-1512.patch, KAFKA-1512.patch, 
 KAFKA-1512_2014-07-03_15:17:55.patch


 To protect against client connection leaks add a new configuration
   max.connections.per.ip
 that causes the SocketServer to enforce a limit on the maximum number of 
 connections from each InetAddress instance. For backwards compatibility this 
 will default to 2 billion.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (KAFKA-1519) Console consumer: expose configuration option to enable/disable writing the line separator

2014-07-03 Thread Jay Kreps (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1519?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14052113#comment-14052113
 ] 

Jay Kreps commented on KAFKA-1519:
--

Looks good. I think this change might also interpret --property line.separator 
as the same as --property line.separator=

Not sure if that is good or confusing...

 Console consumer: expose configuration option to enable/disable writing the 
 line separator
 --

 Key: KAFKA-1519
 URL: https://issues.apache.org/jira/browse/KAFKA-1519
 Project: Kafka
  Issue Type: Improvement
  Components: consumer
Affects Versions: 0.8.1.1
Reporter: Michael Noll
Assignee: Neha Narkhede
Priority: Minor
 Attachments: KAFKA-1519.patch


 The current console consumer includes a {{DefaultMessageFormatter}}, which 
 exposes a few user-configurable options which can be set on the command line 
 via --property, e.g. --property line.separator=XYZ.
 Unfortunately, the current implementation does not allow the user to 
 completely disable writing any such line separator.  However, this 
 functionality would be helpful to enable users to capture data as-is from a 
 Kafka topic to a snapshot file. Capturing data as-is -- without an 
 artificial line separator -- is particularly nice for data in a binary format 
 (including Avro).
 *No workaround*
 A potential workaround would be to pass an empty string as the property value 
 of line.separator, but this doesn't work in the current implementation.
 The following variants throw an Invalid parser arguments exception:
 {code}
 --property line.separator=   # nothing
 --property line.separator="" # double quotes
 --property line.separator='' # single quotes
 {code}
 Escape tricks via a backslash don't work either.
 If there actually is a workaround please let me know.
 *How to fix*
 We can introduce a print.line option to enable/disable writing 
 line.separator similar to how the code already uses print.key to 
 enable/disable writing key.separator.
 This change is trivial.  To preserve backwards compatibility, the 
 print.line option would be set to true by default (unlike the print.key 
 option, which defaults to false).
 *Alternatives*
 Apart from modifying the built-in {{DefaultMessageFormatter}}, users could of 
 course implement their own custom {{MessageFormatter}}.  But given that it's 
 a) a trivial change to the {{DefaultMessageFormatter}} and b) a nice user 
 feature I'd say changing the built-in {{DefaultMessageFormatter}} would be 
 the better approach.  This way, Kafka would support writing data as-is to a 
 file out of the box.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (KAFKA-1512) Limit the maximum number of connections per ip address

2014-07-01 Thread Jay Kreps (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1512?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14049117#comment-14049117
 ] 

Jay Kreps commented on KAFKA-1512:
--

A couple things to review:
0. The new config is max.connections.per.ip.
1. I am using Socket.getInetAddress() as the key to limit on. I think an 
InetAddress is what we want...a socket address includes the port so it is 
always unique, but there is sort of a weird hierarchy of things there. This 
also depends on this address being properly hashable (which it seems to be).
2. I made an unrelated change to how we set the recv buffer. We were weirdly 
setting this over and over again on the server socket every time we accepted a 
connection. I think this was a mistake, so I changed it to set it once. But if 
anyone knows a reason for this odd code, that would make me more confident.
3. I don't know of a way to check the source address of a pending connection 
without actually accepting the connection. So as a result this patch accepts 
the connection and then, if we are over quota, closes it.
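
For point 3, a rough sketch of the accept-then-check approach (hypothetical 
code, not the actual SocketServer patch; decrementing the count on disconnect 
is omitted):
{code}
import java.io.IOException;
import java.net.InetAddress;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.HashMap;
import java.util.Map;

public class ConnectionQuotaSketch {
    private final int maxConnectionsPerIp;
    private final Map<InetAddress, Integer> counts = new HashMap<InetAddress, Integer>();

    public ConnectionQuotaSketch(int maxConnectionsPerIp) {
        this.maxConnectionsPerIp = maxConnectionsPerIp;
    }

    // Accept first (the only way to learn the peer address), then close
    // immediately if that address is already at its quota.
    public SocketChannel accept(ServerSocketChannel server) throws IOException {
        SocketChannel channel = server.accept();
        if (channel == null)
            return null;                              // non-blocking accept, nothing pending
        InetAddress ip = channel.socket().getInetAddress();
        int current = counts.containsKey(ip) ? counts.get(ip) : 0;
        if (current >= maxConnectionsPerIp) {
            channel.close();                          // over quota: reject by closing
            return null;
        }
        counts.put(ip, current + 1);
        return channel;
    }
}
{code}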

 Limit the maximum number of connections per ip address
 --

 Key: KAFKA-1512
 URL: https://issues.apache.org/jira/browse/KAFKA-1512
 Project: Kafka
  Issue Type: New Feature
Reporter: Jay Kreps
Assignee: Jay Kreps
 Attachments: KAFKA-1512.patch


 To protect against client connection leaks add a new configuration
   max.connections.per.ip
 that causes the SocketServer to enforce a limit on the maximum number of 
 connections from each InetAddress instance. For backwards compatibility this 
 will default to 2 billion.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Updated] (KAFKA-1512) Limit the maximum number of connections per ip address

2014-07-01 Thread Jay Kreps (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1512?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jay Kreps updated KAFKA-1512:
-

Attachment: KAFKA-1512.patch

 Limit the maximum number of connections per ip address
 --

 Key: KAFKA-1512
 URL: https://issues.apache.org/jira/browse/KAFKA-1512
 Project: Kafka
  Issue Type: New Feature
Reporter: Jay Kreps
Assignee: Jay Kreps
 Attachments: KAFKA-1512.patch, KAFKA-1512.patch


 To protect against client connection leaks add a new configuration
   max.connections.per.ip
 that causes the SocketServer to enforce a limit on the maximum number of 
 connections from each InetAddress instance. For backwards compatibility this 
 will default to 2 billion.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (KAFKA-1512) Limit the maximum number of connections per ip address

2014-07-01 Thread Jay Kreps (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1512?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14049235#comment-14049235
 ] 

Jay Kreps commented on KAFKA-1512:
--

Created reviewboard https://reviews.apache.org/r/23208/
 against branch trunk

 Limit the maximum number of connections per ip address
 --

 Key: KAFKA-1512
 URL: https://issues.apache.org/jira/browse/KAFKA-1512
 Project: Kafka
  Issue Type: New Feature
Reporter: Jay Kreps
Assignee: Jay Kreps
 Attachments: KAFKA-1512.patch, KAFKA-1512.patch


 To protect against client connection leaks add a new configuration
   max.connections.per.ip
 that causes the SocketServer to enforce a limit on the maximum number of 
 connections from each InetAddress instance. For backwards compatibility this 
 will default to 2 billion.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Created] (KAFKA-1512) Limit the maximum number of connections per ip address

2014-06-30 Thread Jay Kreps (JIRA)
Jay Kreps created KAFKA-1512:


 Summary: Limit the maximum number of connections per ip address
 Key: KAFKA-1512
 URL: https://issues.apache.org/jira/browse/KAFKA-1512
 Project: Kafka
  Issue Type: New Feature
Reporter: Jay Kreps
Assignee: Jay Kreps


To protect against client connection leaks add a new configuration
  max.connections.per.ip
that causes the SocketServer to enforce a limit on the maximum number of 
connections from each InetAddress instance. For backwards compatibility this 
will default to 2 billion.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Updated] (KAFKA-1512) Limit the maximum number of connections per ip address

2014-06-30 Thread Jay Kreps (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1512?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jay Kreps updated KAFKA-1512:
-

Attachment: KAFKA-1512.patch

 Limit the maximum number of connections per ip address
 --

 Key: KAFKA-1512
 URL: https://issues.apache.org/jira/browse/KAFKA-1512
 Project: Kafka
  Issue Type: New Feature
Reporter: Jay Kreps
Assignee: Jay Kreps
 Attachments: KAFKA-1512.patch


 To protect against client connection leaks add a new configuration
   max.connections.per.ip
 that causes the SocketServer to enforce a limit on the maximum number of 
 connections from each InetAddress instance. For backwards compatibility this 
 will default to 2 billion.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (KAFKA-1316) Refactor Sender

2014-06-19 Thread Jay Kreps (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1316?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14037878#comment-14037878
 ] 

Jay Kreps commented on KAFKA-1316:
--

Updated reviewboard https://reviews.apache.org/r/22762/
 against branch trunk

 Refactor Sender
 ---

 Key: KAFKA-1316
 URL: https://issues.apache.org/jira/browse/KAFKA-1316
 Project: Kafka
  Issue Type: Sub-task
  Components: producer 
Reporter: Jay Kreps
Assignee: Jay Kreps
 Attachments: KAFKA-1316.patch, KAFKA-1316.patch, KAFKA-1316.patch, 
 KAFKA-1316_2014-06-03_11:15:38.patch, KAFKA-1316_2014-06-03_14:33:33.patch, 
 KAFKA-1316_2014-06-07_11:20:38.patch, KAFKA-1316_2014-06-19_14:01:04.patch


 Currently most of the logic of the producer I/O thread is in Sender.java.
 However we will need to do a fair number of similar things in the new 
 consumer. Specifically:
  - Track in-flight requests
  - Fetch metadata
  - Manage connection lifecycle
 It may be possible to refactor some of this into a helper class that can be 
 shared with the consumer. This will require some detailed thought.
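 As a speculative sketch, such a shared helper might expose something like the 
 following (hypothetical interface, not the committed client network API):
 {code}
 public interface SharedNetworkClient {
     boolean isReady(int nodeId, long nowMs);     // manage connection lifecycle
     void send(int nodeId, byte[] request);       // track the request as in-flight
     void maybeUpdateMetadata(long nowMs);        // fetch metadata when it is stale
     void poll(long timeoutMs, long nowMs);       // drive the underlying I/O
 }
 {code}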



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Updated] (KAFKA-1316) Refactor Sender

2014-06-19 Thread Jay Kreps (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1316?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jay Kreps updated KAFKA-1316:
-

Attachment: KAFKA-1316_2014-06-19_14:01:04.patch

 Refactor Sender
 ---

 Key: KAFKA-1316
 URL: https://issues.apache.org/jira/browse/KAFKA-1316
 Project: Kafka
  Issue Type: Sub-task
  Components: producer 
Reporter: Jay Kreps
Assignee: Jay Kreps
 Attachments: KAFKA-1316.patch, KAFKA-1316.patch, KAFKA-1316.patch, 
 KAFKA-1316_2014-06-03_11:15:38.patch, KAFKA-1316_2014-06-03_14:33:33.patch, 
 KAFKA-1316_2014-06-07_11:20:38.patch, KAFKA-1316_2014-06-19_14:01:04.patch


 Currently most of the logic of the producer I/O thread is in Sender.java.
 However we will need to do a fair number of similar things in the new 
 consumer. Specifically:
  - Track in-flight requests
  - Fetch metadata
  - Manage connection lifecycle
 It may be possible to refactor some of this into a helper class that can be 
 shared with the consumer. This will require some detailed thought.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Updated] (KAFKA-1291) Make wrapper shell scripts for important tools

2014-06-19 Thread Jay Kreps (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1291?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jay Kreps updated KAFKA-1291:
-

Resolution: Fixed
Status: Resolved  (was: Patch Available)

[~nehanarkhede] I fixed the simple perf test issue you pointed out. I think I 
confused myself into thinking you had +1'd the patch by looking at the wrong 
issue, and I just committed it. If you have any additional comments let me know 
and I will follow up.

 Make wrapper shell scripts for important tools
 --

 Key: KAFKA-1291
 URL: https://issues.apache.org/jira/browse/KAFKA-1291
 Project: Kafka
  Issue Type: Improvement
Affects Versions: 0.8.1
Reporter: Jay Kreps
  Labels: newbie, usability
 Fix For: 0.8.2

 Attachments: KAFKA-1291.patch, KAFKA-1291.patch, KAFKA-1291.patch


 It is nice to have a proper command for the important tools just to help with 
 discoverability. I noticed that mirror maker doesn't have such a wrapper. 
 Neither does consumer offset checker. It would be good to do an audit and 
 think of any tools that should have a wrapper that don't.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (KAFKA-1316) Refactor Sender

2014-06-18 Thread Jay Kreps (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1316?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14035911#comment-14035911
 ] 

Jay Kreps commented on KAFKA-1316:
--

Ack, this is a dumb bug. Thanks! Will patch.

 Refactor Sender
 ---

 Key: KAFKA-1316
 URL: https://issues.apache.org/jira/browse/KAFKA-1316
 Project: Kafka
  Issue Type: Sub-task
  Components: producer 
Reporter: Jay Kreps
Assignee: Jay Kreps
 Attachments: KAFKA-1316.patch, KAFKA-1316.patch, 
 KAFKA-1316_2014-06-03_11:15:38.patch, KAFKA-1316_2014-06-03_14:33:33.patch, 
 KAFKA-1316_2014-06-07_11:20:38.patch


 Currently most of the logic of the producer I/O thread is in Sender.java.
 However we will need to do a fair number of similar things in the new 
 consumer. Specifically:
  - Track in-flight requests
  - Fetch metadata
  - Manage connection lifecycle
 It may be possible to refactor some of this into a helper class that can be 
 shared with the consumer. This will require some detailed thought.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (KAFKA-1291) Make wrapper shell scripts for important tools

2014-06-18 Thread Jay Kreps (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1291?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14036034#comment-14036034
 ] 

Jay Kreps commented on KAFKA-1291:
--

Created reviewboard https://reviews.apache.org/r/22744/
 against branch trunk

 Make wrapper shell scripts for important tools
 --

 Key: KAFKA-1291
 URL: https://issues.apache.org/jira/browse/KAFKA-1291
 Project: Kafka
  Issue Type: Improvement
Affects Versions: 0.8.1
Reporter: Jay Kreps
  Labels: newbie, usability
 Fix For: 0.8.2

 Attachments: KAFKA-1291.patch, KAFKA-1291.patch, KAFKA-1291.patch


 It is nice to have a proper command for the important tools just to help with 
 discoverability. I noticed that mirror maker doesn't have such a wrapper. 
 Neither does consumer offset checker. It would be good to do an audit and 
 think of any tools that should have a wrapper that don't.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Updated] (KAFKA-1291) Make wrapper shell scripts for important tools

2014-06-18 Thread Jay Kreps (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1291?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jay Kreps updated KAFKA-1291:
-

Attachment: KAFKA-1291.patch

 Make wrapper shell scripts for important tools
 --

 Key: KAFKA-1291
 URL: https://issues.apache.org/jira/browse/KAFKA-1291
 Project: Kafka
  Issue Type: Improvement
Affects Versions: 0.8.1
Reporter: Jay Kreps
  Labels: newbie, usability
 Fix For: 0.8.2

 Attachments: KAFKA-1291.patch, KAFKA-1291.patch, KAFKA-1291.patch


 It is nice to have a proper command for the important tools just to help with 
 discoverability. I noticed that mirror maker doesn't have such a wrapper. 
 Neither does consumer offset checker. It would be good to do an audit and 
 think of any tools that should have a wrapper that don't.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (KAFKA-1291) Make wrapper shell scripts for important tools

2014-06-18 Thread Jay Kreps (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1291?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14036041#comment-14036041
 ] 

Jay Kreps commented on KAFKA-1291:
--

Hey [~sgeller], this is great! I updated this patch a bit:
1. I removed the shell scripts for a couple of tools that seemed kind of 
esoteric and more likely to add confusion.
2. I made all commands print a one-line explanation of what they do when run 
without arguments. Let me know if you think this description seems reasonable.
3. I refactored a little bit of the cut-and-paste in our command line args 
definitions.

This should be an easy patch to review as most changes are trivial.

 Make wrapper shell scripts for important tools
 --

 Key: KAFKA-1291
 URL: https://issues.apache.org/jira/browse/KAFKA-1291
 Project: Kafka
  Issue Type: Improvement
Affects Versions: 0.8.1
Reporter: Jay Kreps
  Labels: newbie, usability
 Fix For: 0.8.2

 Attachments: KAFKA-1291.patch, KAFKA-1291.patch, KAFKA-1291.patch


 It is nice to have a proper command for the important tools just to help with 
 discoverability. I noticed that mirror maker doesn't have such a wrapper. 
 Neither does consumer offset checker. It would be good to do an audit and 
 think of any tools that should have a wrapper that don't.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (KAFKA-1316) Refactor Sender

2014-06-18 Thread Jay Kreps (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1316?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14036582#comment-14036582
 ] 

Jay Kreps commented on KAFKA-1316:
--

Created reviewboard https://reviews.apache.org/r/22762/
 against branch trunk

 Refactor Sender
 ---

 Key: KAFKA-1316
 URL: https://issues.apache.org/jira/browse/KAFKA-1316
 Project: Kafka
  Issue Type: Sub-task
  Components: producer 
Reporter: Jay Kreps
Assignee: Jay Kreps
 Attachments: KAFKA-1316.patch, KAFKA-1316.patch, KAFKA-1316.patch, 
 KAFKA-1316_2014-06-03_11:15:38.patch, KAFKA-1316_2014-06-03_14:33:33.patch, 
 KAFKA-1316_2014-06-07_11:20:38.patch


 Currently most of the logic of the producer I/O thread is in Sender.java.
 However we will need to do a fair number of similar things in the new 
 consumer. Specifically:
  - Track in-flight requests
  - Fetch metadata
  - Manage connection lifecycle
 It may be possible to refactor some of this into a helper class that can be 
 shared with the consumer. This will require some detailed thought.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Updated] (KAFKA-1316) Refactor Sender

2014-06-18 Thread Jay Kreps (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1316?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jay Kreps updated KAFKA-1316:
-

Attachment: KAFKA-1316.patch

 Refactor Sender
 ---

 Key: KAFKA-1316
 URL: https://issues.apache.org/jira/browse/KAFKA-1316
 Project: Kafka
  Issue Type: Sub-task
  Components: producer 
Reporter: Jay Kreps
Assignee: Jay Kreps
 Attachments: KAFKA-1316.patch, KAFKA-1316.patch, KAFKA-1316.patch, 
 KAFKA-1316_2014-06-03_11:15:38.patch, KAFKA-1316_2014-06-03_14:33:33.patch, 
 KAFKA-1316_2014-06-07_11:20:38.patch


 Currently most of the logic of the producer I/O thread is in Sender.java.
 However we will need to do a fair number of similar things in the new 
 consumer. Specifically:
  - Track in-flight requests
  - Fetch metadata
  - Manage connection lifecycle
 It may be possible to refactor some of this into a helper class that can be 
 shared with the consumer. This will require some detailed thought.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Resolved] (KAFKA-1316) Refactor Sender

2014-06-11 Thread Jay Kreps (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1316?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jay Kreps resolved KAFKA-1316.
--

Resolution: Fixed

 Refactor Sender
 ---

 Key: KAFKA-1316
 URL: https://issues.apache.org/jira/browse/KAFKA-1316
 Project: Kafka
  Issue Type: Sub-task
  Components: producer 
Reporter: Jay Kreps
Assignee: Jay Kreps
 Attachments: KAFKA-1316.patch, KAFKA-1316.patch, 
 KAFKA-1316_2014-06-03_11:15:38.patch, KAFKA-1316_2014-06-03_14:33:33.patch, 
 KAFKA-1316_2014-06-07_11:20:38.patch


 Currently most of the logic of the producer I/O thread is in Sender.java.
 However we will need to do a fair number of similar things in the new 
 consumer. Specifically:
  - Track in-flight requests
  - Fetch metadata
  - Manage connection lifecycle
 It may be possible to refactor some of this into a helper class that can be 
 shared with the consumer. This will require some detailed thought.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (KAFKA-1489) Global threshold on data retention size

2014-06-11 Thread Jay Kreps (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1489?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14028184#comment-14028184
 ] 

Jay Kreps commented on KAFKA-1489:
--

Go for it!

One slight oddity to consider: different nodes will have different partitions, 
so the amount of data retained for different replicas of the same partition may 
vary quite a lot. A replica on a node with lots of data will retain little, and 
one on an emptier broker will retain a lot. The current per-partition retention 
strategies are only approximately the same across nodes as well, but this will 
potentially be much more extreme.

In fact, in steady state any partition movement will simultaneously cause data 
to get purged to free up space.

I don't think this is necessarily a problem, but we will need to warn people.

 Global threshold on data retention size
 ---

 Key: KAFKA-1489
 URL: https://issues.apache.org/jira/browse/KAFKA-1489
 Project: Kafka
  Issue Type: Bug
  Components: log
Affects Versions: 0.8.1.1
Reporter: Andras Sereny
Assignee: Jay Kreps
  Labels: newbie

 Currently, Kafka has per-topic settings to control the size of one single log 
 (log.retention.bytes). With lots of topics of different volume, and as they 
 grow in number, it could become tedious to maintain topic-level settings 
 applying to a single log. 
 Often a chunk of disk space is dedicated to Kafka to host all the logs stored, 
 so it would make sense to have a configurable threshold to control how much 
 space *all* data in Kafka can take up.
 See also:
 http://mail-archives.apache.org/mod_mbox/kafka-users/201406.mbox/browser
 http://mail-archives.apache.org/mod_mbox/kafka-users/201311.mbox/%3c20131107015125.gc9...@jkoshy-ld.linkedin.biz%3E



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (KAFKA-1316) Refactor Sender

2014-06-10 Thread Jay Kreps (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1316?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14027273#comment-14027273
 ] 

Jay Kreps commented on KAFKA-1316:
--

Created reviewboard https://reviews.apache.org/r/22449/
 against branch trunk

 Refactor Sender
 ---

 Key: KAFKA-1316
 URL: https://issues.apache.org/jira/browse/KAFKA-1316
 Project: Kafka
  Issue Type: Sub-task
  Components: producer 
Reporter: Jay Kreps
Assignee: Jay Kreps
 Attachments: KAFKA-1316.patch, KAFKA-1316.patch, 
 KAFKA-1316_2014-06-03_11:15:38.patch, KAFKA-1316_2014-06-03_14:33:33.patch, 
 KAFKA-1316_2014-06-07_11:20:38.patch


 Currently most of the logic of the producer I/O thread is in Sender.java.
 However we will need to do a fair number of similar things in the new 
 consumer. Specifically:
  - Track in-flight requests
  - Fetch metadata
  - Manage connection lifecycle
 It may be possible to refactor some of this into a helper class that can be 
 shared with the consumer. This will require some detailed thought.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Updated] (KAFKA-1316) Refactor Sender

2014-06-10 Thread Jay Kreps (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1316?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jay Kreps updated KAFKA-1316:
-

Attachment: KAFKA-1316.patch

 Refactor Sender
 ---

 Key: KAFKA-1316
 URL: https://issues.apache.org/jira/browse/KAFKA-1316
 Project: Kafka
  Issue Type: Sub-task
  Components: producer 
Reporter: Jay Kreps
Assignee: Jay Kreps
 Attachments: KAFKA-1316.patch, KAFKA-1316.patch, 
 KAFKA-1316_2014-06-03_11:15:38.patch, KAFKA-1316_2014-06-03_14:33:33.patch, 
 KAFKA-1316_2014-06-07_11:20:38.patch


 Currently most of the logic of the producer I/O thread is in Sender.java.
 However we will need to do a fair number of similar things in the new 
 consumer. Specifically:
  - Track in-flight requests
  - Fetch metadata
  - Manage connection lifecycle
 It may be possible to refactor some of this into a helper class that can be 
 shared with the consumer. This will require some detailed thought.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (KAFKA-1316) Refactor Sender

2014-06-10 Thread Jay Kreps (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1316?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14027276#comment-14027276
 ] 

Jay Kreps commented on KAFKA-1316:
--

Uploaded patch that has those new APIs.

 Refactor Sender
 ---

 Key: KAFKA-1316
 URL: https://issues.apache.org/jira/browse/KAFKA-1316
 Project: Kafka
  Issue Type: Sub-task
  Components: producer 
Reporter: Jay Kreps
Assignee: Jay Kreps
 Attachments: KAFKA-1316.patch, KAFKA-1316.patch, 
 KAFKA-1316_2014-06-03_11:15:38.patch, KAFKA-1316_2014-06-03_14:33:33.patch, 
 KAFKA-1316_2014-06-07_11:20:38.patch


 Currently most of the logic of the producer I/O thread is in Sender.java.
 However we will need to do a fair number of similar things in the new 
 consumer. Specifically:
  - Track in-flight requests
  - Fetch metadata
  - Manage connection lifecycle
 It may be possible to refactor some of this into a helper class that can be 
 shared with the consumer. This will require some detailed thought.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (KAFKA-1316) Refactor Sender

2014-06-09 Thread Jay Kreps (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1316?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14025718#comment-14025718
 ] 

Jay Kreps commented on KAFKA-1316:
--

Thanks for the feedback, Neha:
1. Makes sense, we can definitely add that.
2. This is okay, though, right? The configs for the two should be independent 
even if some are similar, right?
3. Hmm, not so sure about this. That finds a target for the cluster metadata 
request. The logic might be the same as for another type of metadata request 
(e.g. to discover the co-ordinator), but it also might not. I would feel 
concerned about exposing it unless we can come up with a very general 
description of what it is doing...



 Refactor Sender
 ---

 Key: KAFKA-1316
 URL: https://issues.apache.org/jira/browse/KAFKA-1316
 Project: Kafka
  Issue Type: Sub-task
  Components: producer 
Reporter: Jay Kreps
Assignee: Jay Kreps
 Attachments: KAFKA-1316.patch, KAFKA-1316_2014-06-03_11:15:38.patch, 
 KAFKA-1316_2014-06-03_14:33:33.patch, KAFKA-1316_2014-06-07_11:20:38.patch


 Currently most of the logic of the producer I/O thread is in Sender.java.
 However we will need to do a fair number of similar things in the new 
 consumer. Specifically:
  - Track in-flight requests
  - Fetch metadata
  - Manage connection lifecycle
 It may be possible to refactor some of this into a helper class that can be 
 shared with the consumer. This will require some detailed thought.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (KAFKA-1316) Refactor Sender

2014-06-09 Thread Jay Kreps (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1316?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14025757#comment-14025757
 ] 

Jay Kreps commented on KAFKA-1316:
--

2. Sure. I guess I don't feel this is necessarily bad as long as the two 
configs are things that change independently.
3. Yeah totally, that is what I was responding to. My concern is just that this 
method is kind of insane (it returns a good node to make a metadata request to 
or sometimes null and assumes you will keep trying while calling poll in 
between). So maybe there is a way to refactor this into a general facility...
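For reference, the calling pattern that method forces today is roughly the 
following (a sketch only; leastLoadedNode and metadataRequestFor are 
placeholders for whatever the general facility would expose):
{code}
// Keep asking for a metadata target and polling until one becomes available.
Node target = null;
while (target == null) {
    target = client.leastLoadedNode(System.currentTimeMillis()); // may return null
    if (target == null)
        client.poll(Collections.<ClientRequest>emptyList(), 100); // make connection progress
}
client.poll(Collections.singletonList(metadataRequestFor(target)), 100);
{code}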

 Refactor Sender
 ---

 Key: KAFKA-1316
 URL: https://issues.apache.org/jira/browse/KAFKA-1316
 Project: Kafka
  Issue Type: Sub-task
  Components: producer 
Reporter: Jay Kreps
Assignee: Jay Kreps
 Attachments: KAFKA-1316.patch, KAFKA-1316_2014-06-03_11:15:38.patch, 
 KAFKA-1316_2014-06-03_14:33:33.patch, KAFKA-1316_2014-06-07_11:20:38.patch


 Currently most of the logic of the producer I/O thread is in Sender.java.
 However we will need to do a fair number of similar things in the new 
 consumer. Specifically:
  - Track in-flight requests
  - Fetch metadata
  - Manage connection lifecycle
 It may be possible to refactor some of this into a helper class that can be 
 shared with the consumer. This will require some detailed thought.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Updated] (KAFKA-1316) Refactor Sender

2014-06-07 Thread Jay Kreps (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1316?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jay Kreps updated KAFKA-1316:
-

Attachment: KAFKA-1316_2014-06-07_11:20:38.patch

 Refactor Sender
 ---

 Key: KAFKA-1316
 URL: https://issues.apache.org/jira/browse/KAFKA-1316
 Project: Kafka
  Issue Type: Sub-task
  Components: producer 
Reporter: Jay Kreps
Assignee: Jay Kreps
 Attachments: KAFKA-1316.patch, KAFKA-1316_2014-06-03_11:15:38.patch, 
 KAFKA-1316_2014-06-03_14:33:33.patch, KAFKA-1316_2014-06-07_11:20:38.patch


 Currently most of the logic of the producer I/O thread is in Sender.java.
 However we will need to do a fair number of similar things in the new 
 consumer. Specifically:
  - Track in-flight requests
  - Fetch metadata
  - Manage connection lifecycle
 It may be possible to refactor some of this into a helper class that can be 
 shared with the consumer. This will require some detailed thought.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (KAFKA-1316) Refactor Sender

2014-06-07 Thread Jay Kreps (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1316?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14020899#comment-14020899
 ] 

Jay Kreps commented on KAFKA-1316:
--

Updated reviewboard https://reviews.apache.org/r/21937/
 against branch trunk

 Refactor Sender
 ---

 Key: KAFKA-1316
 URL: https://issues.apache.org/jira/browse/KAFKA-1316
 Project: Kafka
  Issue Type: Sub-task
  Components: producer 
Reporter: Jay Kreps
Assignee: Jay Kreps
 Attachments: KAFKA-1316.patch, KAFKA-1316_2014-06-03_11:15:38.patch, 
 KAFKA-1316_2014-06-03_14:33:33.patch, KAFKA-1316_2014-06-07_11:20:38.patch


 Currently most of the logic of the producer I/O thread is in Sender.java.
 However we will need to do a fair number of similar things in the new 
 consumer. Specifically:
  - Track in-flight requests
  - Fetch metadata
  - Manage connection lifecycle
 It may be possible to refactor some of this into a helper class that can be 
 shared with the consumer. This will require some detailed thought.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (KAFKA-1316) Refactor Sender

2014-06-07 Thread Jay Kreps (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1316?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14020901#comment-14020901
 ] 

Jay Kreps commented on KAFKA-1316:
--

I updated this patch to
1. Address a number of comments.
2. Get all the unit tests passing (there were a couple of bugs that caused 
tests to sporadically hang)

I think overall there are two levels of feedback here. The first is to iron out 
whether the API actually makes sense and is convenient (i.e. is what we are 
trying to do worth doing); the second is to figure out any additional stylistic 
or correctness issues (i.e. have we done it well).

The current approach has two layers. The network layer is in 
org.apache.kafka.clients.network and has the selector and logic for sending and 
receiving size-delimited byte arrays across a bunch of connections.

The new NetworkClient/KafkaClient layer (the name could be improved) basically 
adds several concerns on top of this:
1. Serialization
2. Cluster metadata management
3. Connection management

So let's really put some thought into seeing if we have these layers right and 
have the right APIs.
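
To make the split concrete, here is roughly how the two layers divide the work 
(a sketch with illustrative names, not the exact classes in the patch):
{code}
// Layer 1 (org.apache.kafka.clients.network): connections and size-delimited
// byte transfer only; it has no notion of Kafka requests.
//   selector.poll(timeout, sends)    -> moves raw byte buffers
//   selector.completedReceives()     -> finished size-delimited receives
//
// Layer 2 (NetworkClient/KafkaClient): adds (1) serialization of requests and
// responses, (2) cluster metadata management, (3) connection management.
if (client.ready(node, now))                      // layer 2 decides readiness
    responses = client.poll(requests, timeoutMs); // serializes, sends, parses
{code}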

 Refactor Sender
 ---

 Key: KAFKA-1316
 URL: https://issues.apache.org/jira/browse/KAFKA-1316
 Project: Kafka
  Issue Type: Sub-task
  Components: producer 
Reporter: Jay Kreps
Assignee: Jay Kreps
 Attachments: KAFKA-1316.patch, KAFKA-1316_2014-06-03_11:15:38.patch, 
 KAFKA-1316_2014-06-03_14:33:33.patch, KAFKA-1316_2014-06-07_11:20:38.patch


 Currently most of the logic of the producer I/O thread is in Sender.java.
 However we will need to do a fair number of similar things in the new 
 consumer. Specifically:
  - Track in-flight requests
  - Fetch metadata
  - Manage connection lifecycle
 It may be possible to refactor some of this into a helper class that can be 
 shared with the consumer. This will require some detailed thought.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (KAFKA-1456) Add LZ4 and LZ4C as a compression codec

2014-06-03 Thread Jay Kreps (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14017133#comment-14017133
 ] 

Jay Kreps commented on KAFKA-1456:
--

Guys, does this add a runtime dependency on this jar for all clients?

 Add LZ4 and LZ4C as a compression codec
 ---

 Key: KAFKA-1456
 URL: https://issues.apache.org/jira/browse/KAFKA-1456
 Project: Kafka
  Issue Type: Improvement
Reporter: Joe Stein
  Labels: newbie
 Fix For: 0.8.2

 Attachments: KAFKA-1456.patch, KAFKA-1456_2014-05-19_15:01:10.patch, 
 KAFKA-1456_2014-05-19_16:39:01.patch, KAFKA-1456_2014-05-19_18:19:32.patch, 
 KAFKA-1456_2014-05-19_23:24:27.patch






--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (KAFKA-1316) Refactor Sender

2014-06-03 Thread Jay Kreps (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1316?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14017182#comment-14017182
 ] 

Jay Kreps commented on KAFKA-1316:
--

Updated reviewboard https://reviews.apache.org/r/21937/
 against branch trunk

 Refactor Sender
 ---

 Key: KAFKA-1316
 URL: https://issues.apache.org/jira/browse/KAFKA-1316
 Project: Kafka
  Issue Type: Sub-task
  Components: producer 
Reporter: Jay Kreps
Assignee: Jay Kreps
 Attachments: KAFKA-1316.patch, KAFKA-1316_2014-06-03_11:15:38.patch, 
 KAFKA-1316_2014-06-03_14:33:33.patch


 Currently most of the logic of the producer I/O thread is in Sender.java.
 However we will need to do a fair number of similar things in the new 
 consumer. Specifically:
  - Track in-flight requests
  - Fetch metadata
  - Manage connection lifecycle
 It may be possible to refactor some of this into a helper class that can be 
 shared with the consumer. This will require some detailed thought.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Updated] (KAFKA-1316) Refactor Sender

2014-06-03 Thread Jay Kreps (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1316?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jay Kreps updated KAFKA-1316:
-

Attachment: KAFKA-1316_2014-06-03_14:33:33.patch

 Refactor Sender
 ---

 Key: KAFKA-1316
 URL: https://issues.apache.org/jira/browse/KAFKA-1316
 Project: Kafka
  Issue Type: Sub-task
  Components: producer 
Reporter: Jay Kreps
Assignee: Jay Kreps
 Attachments: KAFKA-1316.patch, KAFKA-1316_2014-06-03_11:15:38.patch, 
 KAFKA-1316_2014-06-03_14:33:33.patch


 Currently most of the logic of the producer I/O thread is in Sender.java.
 However we will need to do a fair number of similar things in the new 
 consumer. Specifically:
  - Track in-flight requests
  - Fetch metadata
  - Manage connection lifecycle
 It may be possible to refactor some of this into a helper class that can be 
 shared with the consumer. This will require some detailed thought.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (KAFKA-1316) Refactor Sender

2014-05-28 Thread Jay Kreps (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1316?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14011735#comment-14011735
 ] 

Jay Kreps commented on KAFKA-1316:
--

[~guozhang] I think you are describing the case where the consumer 
failure-detects the co-ordinator due to lack of a heartbeat response, right? 
Presumably in that case we would go about rediscovering the co-ordinator? I 
don't think we need to reach into the request queue and attempt to selectively 
reorder or remove items; that is likely to not end well. I think in the case 
where we failure-detect the co-ordinator we will just want to reconnect, 
right? We can add a manual disconnect(Node) method to force that if that is 
more convenient...
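
Something like this, where disconnect(Node) is the hypothetical addition and 
discoverCoordinator is a placeholder for the rediscovery step:
{code}
// On failure-detecting the co-ordinator: drop the connection rather than
// editing the request queue, then rediscover and reconnect via ready().
client.disconnect(coordinator);                   // hypothetical manual disconnect
Node newCoordinator = discoverCoordinator();      // placeholder for rediscovery
while (!client.ready(newCoordinator, System.currentTimeMillis()))
    client.poll(Collections.<ClientRequest>emptyList(), 100);
{code}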

 Refactor Sender
 ---

 Key: KAFKA-1316
 URL: https://issues.apache.org/jira/browse/KAFKA-1316
 Project: Kafka
  Issue Type: Sub-task
  Components: producer 
Reporter: Jay Kreps
Assignee: Jay Kreps
 Attachments: KAFKA-1316.patch


 Currently most of the logic of the producer I/O thread is in Sender.java.
 However we will need to do a fair number of similar things in the new 
 consumer. Specifically:
  - Track in-flight requests
  - Fetch metadata
  - Manage connection lifecycle
 It may be possible to refactor some of this into a helper class that can be 
 shared with the consumer. This will require some detailed thought.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (KAFKA-1466) Kafka server is hung after throwing Attempt to swap the new high watermark file with the old one failed

2014-05-28 Thread Jay Kreps (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1466?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14011750#comment-14011750
 ] 

Jay Kreps commented on KAFKA-1466:
--

Is it possible you were out of disk space?

Here is the sequence of operations that occurs:

  // swap new offset checkpoint file with previous one
  if(!temp.renameTo(file)) {
    // renameTo() fails on Windows if the destination file exists.
    file.delete()
    if(!temp.renameTo(file))
      throw new IOException("File rename from %s to %s failed."
        .format(temp.getAbsolutePath, file.getAbsolutePath))
  }

We first try to just rename the new checkpoint to the old checkpoint. On Unix 
this is atomic, but on Windows it will fail if the destination exists. If it 
fails then we manually delete the current checkpoint and repeat the rename. 
This also failed, which is what threw the exception. Unfortunately Java doesn't 
give a lot of info when renameTo fails, so it is a bit hard to debug. Is it 
possible that the process somehow lost permission to write to the checkpoint 
file or something like that?

Or perhaps this was some kind of transient disk issue.
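
For illustration, the same swap written out in Java, which makes the two 
failure points explicit (a sketch, not the actual broker code):
{code}
// Either rename can fail for reasons other than Windows semantics:
// permissions, a full disk, or a transient filesystem error.
static void swapCheckpoint(File temp, File file) throws IOException {
    if (!temp.renameTo(file)) {      // atomic on POSIX; fails on Windows if target exists
        file.delete();               // drop the old checkpoint and retry
        if (!temp.renameTo(file))    // a second failure is what this report shows
            throw new IOException("File rename from " + temp.getAbsolutePath()
                    + " to " + file.getAbsolutePath() + " failed.");
    }
}
{code}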

Regardless, if this occurs the correct behavior would be for the IOException to 
be thrown and the server to kill itself. That seems to have happened here, but 
somehow the process didn't die?

I think we could try to reproduce this by removing permissions on the offset 
checkpoint file while the server is running. When this happens the expected 
behavior is that the server should shut itself down and another replica should 
be elected as leader.

If we can figure out anything that would cause the rename to fail that we 
aren't handling right then that will be a bug.

If we can reproduce the server not cleanly killing itself and fully exiting, 
then that would be another bug.

 Kafka server is hung after throwing Attempt to swap the new high watermark 
 file with the old one failed
 -

 Key: KAFKA-1466
 URL: https://issues.apache.org/jira/browse/KAFKA-1466
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8.0
Reporter: Arup Malakar
 Attachments: kafka.log.1


 We have a Kafka cluster of four nodes. The cluster went down after one of the 
 nodes threw the following error: 
 2014-05-21 23:19:44 FATAL [highwatermark-checkpoint-thread1]: 
 HighwaterMarkCheckpoint:109 - Attempt to swap the new high watermark file 
 with the old one failed. I saw the following messages in the log file of the 
 failed node:
 {code}
 2014-05-21 23:19:44 FATAL [highwatermark-checkpoint-thread1]: 
 HighwaterMarkCheckpoint:109 - Attempt to swap the new high watermark file 
 with the old one failed
 2014-05-21 23:19:44 INFO  [Thread-1]: KafkaServer:67 - [Kafka Server 4], 
 Shutting down
 2014-05-21 23:19:44 INFO  [Thread-1]: KafkaZooKeeper:67 - Closing zookeeper 
 client...
 2014-05-21 23:19:44 INFO  
 [ZkClient-EventThread-21-zoo-c2n1.us-east-1.ooyala.com,zoo-c2n2.us-east-1.ooyala.com,zoo-c2n3.us-east-1.ooyala.com,zoo-c2n4.us-east-1.ooyala.com,zoo-c2n5.u
 s-east-1.ooyala.com]: ZkEventThread:82 - Terminate ZkClient event thread.
 2014-05-21 23:19:44 INFO  [main-EventThread]: ClientCnxn:521 - EventThread 
 shut down
 2014-05-21 23:19:44 INFO  [Thread-1]: ZooKeeper:544 - Session: 
 0x1456b562865b172 closed
 2014-05-21 23:19:44 INFO  [kafka-processor-9092-0]: Processor:67 - Closing 
 socket connection to /10.245.173.136.
 2014-05-21 23:19:44 INFO  [Thread-1]: SocketServer:67 - [Socket Server on 
 Broker 4], Shutting down
 2014-05-21 23:19:44 INFO  [Thread-1]: SocketServer:67 - [Socket Server on 
 Broker 4], Shutdown completed
 2014-05-21 23:19:44 INFO  [Thread-1]: KafkaRequestHandlerPool:67 - [Kafka 
 Request Handler on Broker 4], shutting down
 2014-05-21 23:19:44 INFO  [Thread-1]: KafkaRequestHandlerPool:67 - [Kafka 
 Request Handler on Broker 4], shutted down completely
 2014-05-21 23:19:44 INFO  [Thread-1]: KafkaScheduler:67 - Shutdown Kafka 
 scheduler
 2014-05-21 23:19:45 INFO  [Thread-1]: ReplicaManager:67 - [Replica Manager on 
 Broker 4]: Shut down
 2014-05-21 23:19:45 INFO  [Thread-1]: ReplicaFetcherManager:67 - 
 [ReplicaFetcherManager on broker 4] shutting down
 2014-05-21 23:19:45 INFO  [Thread-1]: ReplicaFetcherThread:67 - 
 [ReplicaFetcherThread-0-3], Shutting down
 2014-05-21 23:19:45 INFO  [ReplicaFetcherThread-0-3]: ReplicaFetcherThread:67 
 - [ReplicaFetcherThread-0-3], Stopped
 2014-05-21 23:19:45 INFO  [Thread-1]: ReplicaFetcherThread:67 - 
 [ReplicaFetcherThread-0-3], Shutdown completed
 2014-05-21 23:19:45 INFO  [Thread-1]: ReplicaFetcherThread:67 - 
 [ReplicaFetcherThread-0-2], Shutting down
 2014-05-21 23:19:45 INFO  [ReplicaFetcherThread-0-2]: ReplicaFetcherThread:67 
 - [ReplicaFetcherThread-0-2], Stopped
 2014-05-21 23:19:45 INFO  [Thread-1]: 

[jira] [Commented] (KAFKA-1316) Refactor Sender

2014-05-27 Thread Jay Kreps (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1316?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14010153#comment-14010153
 ] 

Jay Kreps commented on KAFKA-1316:
--

Created reviewboard https://reviews.apache.org/r/21937/
 against branch trunk

 Refactor Sender
 ---

 Key: KAFKA-1316
 URL: https://issues.apache.org/jira/browse/KAFKA-1316
 Project: Kafka
  Issue Type: Sub-task
  Components: producer 
Reporter: Jay Kreps
Assignee: Jay Kreps
 Attachments: KAFKA-1316.patch


 Currently most of the logic of the producer I/O thread is in Sender.java.
 However we will need to do a fair number of similar things in the new 
 consumer. Specifically:
  - Track in-flight requests
  - Fetch metadata
  - Manage connection lifecycle
 It may be possible to refactor some of this into a helper class that can be 
 shared with the consumer. This will require some detailed thought.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Comment Edited] (KAFKA-1316) Refactor Sender

2014-05-27 Thread Jay Kreps (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1316?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14010498#comment-14010498
 ] 

Jay Kreps edited comment on KAFKA-1316 at 5/27/14 11:38 PM:


For (2) I think there are two solutions
1. Change the server semantics to allow processing multiple requests at the 
same time and out of order. 
2. Use two connections

Quick discussion:
1. Allowing out-of-order requests complicates things in the clients a bit as 
you can no longer reason that the Nth response is for the Nth request you made. 
It also isn't clear what we would even guarantee on the server side. The two 
things that we have to do are to handle produce requests in order and to apply 
back pressure when too much data is sent. Backpressure means the socket server 
needs
to stop reading requests, but to make that decision it needs to have parsed the 
request and know it is a produce request...

2. Using two connections might work. It is a bit hacky. The consumer would need 
to create a Node object for the host-port of the current co-ordinator and then 
things would work from there on (I think). The node id would need to be some 
negative number or something. I'm not really sure if there is a clean 
generalization of this.
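
A sketch of what the two-connections option might look like, assuming a 
synthetic negative node id for the co-ordinator (names illustrative):
{code}
// Wrap the co-ordinator's host/port in a synthetic Node whose id can never
// collide with a real broker id, then use the normal ready()/poll() machinery.
Node coordinator = new Node(-1, coordinatorHost, coordinatorPort);
if (client.ready(coordinator, System.currentTimeMillis()))
    client.poll(Collections.singletonList(heartbeatRequestFor(coordinator)), 100);
{code}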


was (Author: jkreps):
For (2) I think there are two solutions
1. Change the server semantics to allow processing multiple requests at the 
same time and out of order. 
2. Use two connections

Quick discussion:
1. Allowing out-of-order requests complicates things in the clients a bit as 
you can no longer reason that the Nth response is for the Nth request you made. 
It also isn't clear what we would even guarantee on the server side. The two 
things that we have to do are to handle produce requests in order and to apply 
back pressure when too much data is sent. Backpressure means the socket server 
needs
to stop reading requests, but to make that decision it needs to have parsed the 
request and know it is a produce request...

2. Using two connections might work. It is a bit hacky. The consumer would need 
to create a Node object for the host-port of the current co-ordinator and then 
things would work from there on (I think).

 Refactor Sender
 ---

 Key: KAFKA-1316
 URL: https://issues.apache.org/jira/browse/KAFKA-1316
 Project: Kafka
  Issue Type: Sub-task
  Components: producer 
Reporter: Jay Kreps
Assignee: Jay Kreps
 Attachments: KAFKA-1316.patch


 Currently most of the logic of the producer I/O thread is in Sender.java.
 However we will need to do a fair number of similar things in the new 
 consumer. Specifically:
  - Track in-flight requests
  - Fetch metadata
  - Manage connection lifecycle
 It may be possible to refactor some of this into a helper class that can be 
 shared with the consumer. This will require some detailed thought.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (KAFKA-1316) Refactor Sender

2014-05-27 Thread Jay Kreps (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1316?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14010498#comment-14010498
 ] 

Jay Kreps commented on KAFKA-1316:
--

For (2) I think there are two solutions
1. Change the server semantics to allow processing multiple requests at the 
same time and out of order. 
2. Use two connections

Quick discussion:
1. Allowing out-of-order requests complicates things in the clients a bit as 
you can no longer reason that the Nth response is for the Nth request you made. 
It also isn't clear what we would even guarantee on the server side. The two 
things that we have to do are to handle produce requests in order and to apply 
back pressure when too much data is sent. Backpressure means the socket server 
needs
to stop reading requests, but to make that decision it needs to have parsed the 
request and know it is a produce request...

2. Using two connections might work. It is a bit hacky. The consumer would need 
to create a Node object for the host-port of the current co-ordinator and then 
things would work from there on (I think).

 Refactor Sender
 ---

 Key: KAFKA-1316
 URL: https://issues.apache.org/jira/browse/KAFKA-1316
 Project: Kafka
  Issue Type: Sub-task
  Components: producer 
Reporter: Jay Kreps
Assignee: Jay Kreps
 Attachments: KAFKA-1316.patch


 Currently most of the logic of the producer I/O thread is in Sender.java.
 However we will need to do a fair number of similar things in the new 
 consumer. Specifically:
  - Track in-flight requests
  - Fetch metadata
  - Manage connection lifecycle
 It may be possible to refactor some of this into a helper class that can be 
 shared with the consumer. This will require some detailed thought.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Resolved] (KAFKA-1467) Fixes small documentation typos

2014-05-23 Thread Jay Kreps (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1467?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jay Kreps resolved KAFKA-1467.
--

Resolution: Fixed

Thanks for the patch! Committed.

 Fixes small documentation typos
 ---

 Key: KAFKA-1467
 URL: https://issues.apache.org/jira/browse/KAFKA-1467
 Project: Kafka
  Issue Type: Bug
  Components: website
Affects Versions: 0.8.1
Reporter: Brian Chhun
Priority: Trivial
  Labels: documentation
 Attachments: KAFKA-1467-consistent-zookeeper-letter-cases.diff, 
 KAFKA-1467-log-compaction-typo.diff


 Fixes a minor typo in the log compaction section of the documentation.
 Also includes a second patch that changes various letter cases used for 
 ZooKeeper (e.g., zookeeper and Zookeeper) into just ZooKeeper. There's 
 nothing to ensure that newer documentation will adhere to the letter case 
 convention, however.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Created] (KAFKA-1468) Improve perf tests

2014-05-23 Thread Jay Kreps (JIRA)
Jay Kreps created KAFKA-1468:


 Summary: Improve perf tests
 Key: KAFKA-1468
 URL: https://issues.apache.org/jira/browse/KAFKA-1468
 Project: Kafka
  Issue Type: Bug
Reporter: Jay Kreps


This issue is a placeholder for a bunch of improvements that came out of a 
round of benchmarking.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Updated] (KAFKA-1468) Improve perf tests

2014-05-23 Thread Jay Kreps (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1468?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jay Kreps updated KAFKA-1468:
-

Attachment: KAFKA-1468.patch

 Improve perf tests
 --

 Key: KAFKA-1468
 URL: https://issues.apache.org/jira/browse/KAFKA-1468
 Project: Kafka
  Issue Type: Bug
Reporter: Jay Kreps
 Attachments: KAFKA-1468.patch


 This issue is a placeholder for a bunch of improvements that came out of a 
 round of benchmarking.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (KAFKA-1468) Improve perf tests

2014-05-23 Thread Jay Kreps (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14007705#comment-14007705
 ] 

Jay Kreps commented on KAFKA-1468:
--

Created reviewboard https://reviews.apache.org/r/21878/
 against branch trunk

 Improve perf tests
 --

 Key: KAFKA-1468
 URL: https://issues.apache.org/jira/browse/KAFKA-1468
 Project: Kafka
  Issue Type: Bug
Reporter: Jay Kreps
 Attachments: KAFKA-1468.patch


 This issue is a placeholder for a bunch of improvements that came out of a 
 round of benchmarking.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (KAFKA-1316) Refactor Sender

2014-05-21 Thread Jay Kreps (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1316?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14005249#comment-14005249
 ] 

Jay Kreps commented on KAFKA-1316:
--

Neha, yeah I'll post something as I get it in shape.

Jun, ready is non-blocking. This is important for the producer, which needs to 
intersect the set of partitions hosted by nodes we are ready to send to 
with the set of nodes for which we have data. But for the consumer we should 
think this through. Implementing a blocking connect would be something like
{noformat}
while(!client.ready(node, System.currentTimeMillis()))
  client.poll(Collections.emptyList(), 100)
{noformat}
which is not terribly intuitive. We could add an api for this if it would help. 
But let's think through how we would implement the consumer state machine and 
then I think we will know what we need.
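
If we did end up wanting a blocking convenience, it would probably just be a 
thin wrapper like this (a hypothetical helper, not part of the patch):
{code}
// Block until the node is ready or the deadline passes; poll() drives the
// non-blocking connection handshake in the meantime.
void awaitReady(Node node, long timeoutMs) {
    long deadline = System.currentTimeMillis() + timeoutMs;
    while (!client.ready(node, System.currentTimeMillis())
            && System.currentTimeMillis() < deadline)
        client.poll(Collections.<ClientRequest>emptyList(), 100);
}
{code}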

 Refactor Sender
 ---

 Key: KAFKA-1316
 URL: https://issues.apache.org/jira/browse/KAFKA-1316
 Project: Kafka
  Issue Type: Sub-task
  Components: producer 
Reporter: Jay Kreps
Assignee: Jay Kreps

 Currently most of the logic of the producer I/O thread is in Sender.java.
 However we will need to do a fair number of similar things in the new 
 consumer. Specifically:
  - Track in-flight requests
  - Fetch metadata
  - Manage connection lifecycle
 It may be possible to refactor some of this into a helper class that can be 
 shared with the consumer. This will require some detailed thought.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (KAFKA-1316) Refactor Sender

2014-05-20 Thread Jay Kreps (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1316?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14004009#comment-14004009
 ] 

Jay Kreps commented on KAFKA-1316:
--

I started on this and I think I have a design that at least works for the 
producer, let's think if it would also work for the consumer.

The plan is that Sender remains, but most of the logic is moved into a new 
class Client which will be shared by the producer, consumer, and any additional 
clients (admin, for example).

The Client class will encapsulate the connection state and the management of 
metadata. The client will expose two methods:
{noformat}
class Client {
  /* Initiate a connection to the given node (if one doesn't already exist). 
Return true if we already have a ready connection. */
  boolean ready(Node node, long now);

  /* Send new requests and return any completed requests */
  List<ClientResponse> poll(List<ClientRequest> requests, long ms);
}
{noformat}

The poll method takes a list of requests for ready connections and attempts to 
send them, returning any completed requests. The responses returned will 
generally not be for the requests being sent but for previous requests.

ClientRequest is just a renaming and generalization of InFlightRequest. 
ClientResponse is a new class that will reference the original ClientRequest as 
well as maintain the response information received (which we currently handle 
inline in Sender).

The user of this class (e.g. Sender) has to use the ready() method to ensure it 
only initiates requests to ready connections.

What needs to be thought through is whether these APIs suffice for the consumer.
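
As a sanity check on the producer side, the Sender run loop might use the API 
roughly like this (nodesWithData, produceRequestFor, and handle are 
hypothetical helpers):
{code}
long now = System.currentTimeMillis();
List<ClientRequest> requests = new ArrayList<ClientRequest>();
for (Node node : nodesWithData)            // nodes that have accumulated batches
    if (client.ready(node, now))           // initiates the connection if absent
        requests.add(produceRequestFor(node));
// poll() sends the new requests; the responses it hands back are generally
// answers to requests sent on earlier iterations.
for (ClientResponse response : client.poll(requests, 100))
    handle(response);
{code}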

 Refactor Sender
 ---

 Key: KAFKA-1316
 URL: https://issues.apache.org/jira/browse/KAFKA-1316
 Project: Kafka
  Issue Type: Sub-task
  Components: producer 
Reporter: Jay Kreps
Assignee: Jay Kreps

 Currently most of the logic of the producer I/O thread is in Sender.java.
 However we will need to do a fair number of similar things in the new 
 consumer. Specifically:
  - Track in-flight requests
  - Fetch metadata
  - Manage connection lifecycle
 It may be possible to refactor some of this into a helper class that can be 
 shared with the consumer. This will require some detailed thought.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Created] (KAFKA-1441) Purgatory purge causes latency spikes

2014-05-16 Thread Jay Kreps (JIRA)
Jay Kreps created KAFKA-1441:


 Summary: Purgatory purge causes latency spikes
 Key: KAFKA-1441
 URL: https://issues.apache.org/jira/browse/KAFKA-1441
 Project: Kafka
  Issue Type: Bug
Reporter: Jay Kreps


The request purgatory has a funky thing where it periodically loops over all 
watches and purges them. If you have a fair number of partitions you can 
accumulate lots of watches and purging them can take a long time. During this 
time all expiry is halted.

Here is an example log:
[2014-05-08 21:07:41,950] INFO ExpiredRequestReaper-2 Expired request after 
10ms: 5829 (kafka.server.RequestPurgatory$ExpiredRequestReaper)
[2014-05-08 21:07:41,952] INFO ExpiredRequestReaper-2 Expired request after 
10ms: 5882 (kafka.server.RequestPurgatory$ExpiredRequestReaper)
[2014-05-08 21:07:41,967] INFO ExpiredRequestReaper-2 Expired request after 
11ms: 5884 (kafka.server.RequestPurgatory$ExpiredRequestReaper)
[2014-05-08 21:07:41,968] INFO ExpiredRequestReaper-2 Purging purgatory 
(kafka.server.RequestPurgatory$ExpiredRequestReaper)
[2014-05-08 21:07:41,969] INFO ExpiredRequestReaper-2 Purged 0 requests from 
delay queue. (kafka.server.RequestPurgatory$ExpiredRequestReaper)
[2014-05-08 21:07:42,305] INFO ExpiredRequestReaper-2 Purged 340809 (watcher) 
requests. (kafka.server.RequestPurgatory$ExpiredRequestReaper)
[2014-05-08 21:07:42,305] INFO ExpiredRequestReaper-2 Expired request after 
106ms: 5847 (kafka.server.RequestPurgatory$ExpiredRequestReaper)
[2014-05-08 21:07:42,305] INFO ExpiredRequestReaper-2 Expired request after 
106ms: 5904 (kafka.server.RequestPurgatory$ExpiredRequestReaper)
[2014-05-08 21:07:42,328] INFO ExpiredRequestReaper-2 Expired request after 
10ms: 5908 (kafka.server.RequestPurgatory$ExpiredRequestReaper)
[2014-05-08 21:07:42,329] INFO ExpiredRequestReaper-2 Expired request after 
10ms: 5852 (kafka.server.RequestPurgatory$ExpiredRequestReaper)
[2014-05-08 21:07:42,343] INFO ExpiredRequestReaper-2 Expired request after 
11ms: 5854 (kafka.server.RequestPurgatory$ExpiredRequestReaper)

Combined with our buggy purgatory request implementations that can sometimes 
hit their expiration, this can lead to huge latency spikes.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Resolved] (KAFKA-1445) New Producer should send all partitions that have non-empty batches when one of them is ready

2014-05-16 Thread Jay Kreps (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1445?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jay Kreps resolved KAFKA-1445.
--

Resolution: Fixed

Applied.

 New Producer should send all partitions that have non-empty batches when one 
  of them is ready
 

 Key: KAFKA-1445
 URL: https://issues.apache.org/jira/browse/KAFKA-1445
 Project: Kafka
  Issue Type: Bug
Reporter: Guozhang Wang
 Fix For: 0.9.0

 Attachments: KAFKA-1445.patch, KAFKA-1445.patch, 
 KAFKA-1445_2014-05-13_11:25:13.patch, KAFKA-1445_2014-05-14_16:24:25.patch, 
 KAFKA-1445_2014-05-14_16:28:06.patch, KAFKA-1445_2014-05-15_15:15:37.patch, 
 KAFKA-1445_2014-05-15_15:19:10.patch


 One difference between the new producer and the old producer is that on the 
 new producer the linger time is per partition, instead of global. Therefore, 
 when the traffic is low, the sender will likely expire partitions one-by-one 
 and send lots of small requests, each containing only a few partitions with 
 little data, resulting in a largely increased request rate.
 One solution would be to let senders select all partitions that have 
 non-empty batches when one of them is ready.
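
 A minimal sketch of that idea, assuming an accumulator that can report 
 readiness and drain per partition (all names are placeholders):
 {code}
 // Once any single batch is ready, drain every partition that has data, so one
 // request carries many partitions instead of one request per expired batch.
 if (accumulator.anyBatchReady(now)) {
     for (TopicPartition tp : accumulator.partitionsWithData())
         batches.addAll(accumulator.drain(tp, maxRequestSize));
 }
 {code}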



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (KAFKA-1445) New Producer should send all partitions that have non-empty batches when one of them is ready

2014-05-16 Thread Jay Kreps (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1445?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13999328#comment-13999328
 ] 

Jay Kreps commented on KAFKA-1445:
--

Looks good to me, if no comments from others I will apply this.

 New Producer should send all partitions that have non-empty batches when one 
  of them is ready
 

 Key: KAFKA-1445
 URL: https://issues.apache.org/jira/browse/KAFKA-1445
 Project: Kafka
  Issue Type: Bug
Reporter: Guozhang Wang
 Fix For: 0.9.0

 Attachments: KAFKA-1445.patch, KAFKA-1445.patch, 
 KAFKA-1445_2014-05-13_11:25:13.patch, KAFKA-1445_2014-05-14_16:24:25.patch, 
 KAFKA-1445_2014-05-14_16:28:06.patch, KAFKA-1445_2014-05-15_15:15:37.patch, 
 KAFKA-1445_2014-05-15_15:19:10.patch


 One difference between the new producer and the old producer is that on the 
 new producer the linger time is per partition, instead of global. Therefore, 
 when the traffic is low, the sender will likely expire partitions one-by-one 
 and send lots of small requests, each containing only a few partitions with 
 little data, resulting in a largely increased request rate.
 One solution would be to let senders select all partitions that have 
 non-empty batches when one of them is ready.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (KAFKA-1430) Purgatory redesign

2014-05-06 Thread Jay Kreps (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1430?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13990819#comment-13990819
 ] 

Jay Kreps commented on KAFKA-1430:
--

The way we garbage collect obsolete entries was fundamentally a bit sketchy due 
to keeping both an LRU list and a hash.

 Purgatory redesign
 --

 Key: KAFKA-1430
 URL: https://issues.apache.org/jira/browse/KAFKA-1430
 Project: Kafka
  Issue Type: Improvement
  Components: core
Affects Versions: 0.8.2
Reporter: Jun Rao

 We have seen 2 main issues with the Purgatory.
 1. There is no atomic checkAndWatch functionality. So, a client typically 
 first checks whether a request is satisfied or not and then registers the 
 watcher. However, by the time the watcher is registered, the registered item 
 could already be satisfied. That item then won't be completed until the next 
 update happens or the delay expires, which means the watched item 
 could be delayed. 
 2. FetchRequestPurgatory doesn't quite work. This is because the current 
 design tries to incrementally maintain the accumulated bytes ready for fetch. 
 However, this is difficult since the right time to check whether a fetch (for 
 regular consumer) request is satisfied is when the high watermark moves. At 
 that point, it's hard to figure out how many bytes we should incrementally 
 add to each pending fetch request.
 The problem has been reported in KAFKA-1150 and KAFKA-703.
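
 For the first issue, the fix presumably needs the check and the registration 
 to happen under one lock, roughly like this (a sketch with hypothetical 
 names, not a proposed API):
 {code}
 // Atomic checkAndWatch: no update can slip in between the check and the
 // registration, because both happen while holding the watcher-list lock.
 List<DelayedOperation> watchers = watchersFor(key); // one list per watched key
 synchronized (watchers) {
     if (operation.isSatisfied())
         operation.complete();       // already satisfied, never registered
     else
         watchers.add(operation);    // registered before any later update runs
 }
 {code}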



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (KAFKA-1430) Purgatory redesign

2014-05-06 Thread Jay Kreps (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1430?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13990829#comment-13990829
 ] 

Jay Kreps commented on KAFKA-1430:
--

A couple of random thoughts.

- Not sure if I have fully grokked the implications of this.

- In general I feel we are not building on a firm foundation with the purgatory 
code and we should somehow find a way to refactor the APIs to make code that 
uses them more readable. I think in the absence of this it is impossible to write 
correct code because it is too hard to understand the flow of things. This is 
somewhat a separate thing from this proposal, and could be a separate ticket or 
effort.

- Do we actually need to do anything for requests not in the last segment? My 
concern is that trying to maintain the full set of physical sizes is going to 
be very error prone and will impact the recovery logic too. One thing about 
these APIs is that it is always acceptable to return early. So what if we 
maintain physical information only for the last segment, kept up to date 
through the append API, and for all previous segments just return immediately 
(after all, you will never wait on older segments)?

 Purgatory redesign
 --

 Key: KAFKA-1430
 URL: https://issues.apache.org/jira/browse/KAFKA-1430
 Project: Kafka
  Issue Type: Improvement
  Components: core
Affects Versions: 0.8.2
Reporter: Jun Rao

 We have seen 2 main issues with the Purgatory.
 1. There is no atomic checkAndWatch functionality. So, a client typically 
 first checks whether a request is satisfied or not and then register the 
 watcher. However, by the time the watcher is registered, the registered item 
 could already be satisfied. This item won't be satisfied until the next 
 update happens or the delayed time expires, which means the watched item 
 could be delayed. 
 2. FetchRequestPurgatory doesn't quite work. This is because the current 
 design tries to incrementally maintain the accumulated bytes ready for fetch. 
 However, this is difficult since the right time to check whether a fetch (for 
 regular consumer) request is satisfied is when the high watermark moves. At 
 that point, it's hard to figure out how many bytes we should incrementally 
 add to each pending fetch request.
 The problem has been reported in KAFKA-1150 and KAFKA-703.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (KAFKA-1316) Refactor Sender

2014-05-06 Thread Jay Kreps (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1316?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13991361#comment-13991361
 ] 

Jay Kreps commented on KAFKA-1316:
--

Okay so here is my proposal. I want to refactor out a couple of helper objects 
from Sender, specifically NodeState, InFlightRequest and InFlightRequests.

This will be packaged up inside a class that handles all of this and wraps the 
Selector. It will implement a partition aware request/response layer on top of 
the more general selector API. Not sure what to call this, maybe something like 
ClusterSelector or RequestSelector or something like that.

This class will model the state machine we currently go through, where a request 
is directed to a particular node and connections are set up in a non-blocking 
way before a request can be made. It will also handle the request/response 
correlation.

I think I may need to just take a stab at it to see exactly how this will work 
so I am just going to dive in.
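
The request/response correlation piece at least seems mechanical: on a single 
connection responses come back in request order, so matching can be a per-node 
deque (sketch with illustrative names):
{code}
// On send: record the request as in-flight for its node.
inFlightRequests.forNode(node).addFirst(request);

// On receive: the oldest in-flight request for that node is the one being
// answered, since a single connection processes requests in order.
InFlightRequest oldest = inFlightRequests.forNode(node).pollLast();
dispatch(oldest, receivedBody);  // hand the pair to the request's callback
{code}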

 Refactor Sender
 ---

 Key: KAFKA-1316
 URL: https://issues.apache.org/jira/browse/KAFKA-1316
 Project: Kafka
  Issue Type: Sub-task
  Components: producer 
Reporter: Jay Kreps
Assignee: Jay Kreps

 Currently most of the logic of the producer I/O thread is in Sender.java.
 However we will need to do a fair number of similar things in the new 
 consumer. Specifically:
  - Track in-flight requests
  - Fetch metadata
  - Manage connection lifecycle
 It may be possible to refactor some of this into a helper class that can be 
 shared with the consumer. This will require some detailed thought.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Resolved] (KAFKA-1428) FIS not closed after Properties.load

2014-04-28 Thread Jay Kreps (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1428?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jay Kreps resolved KAFKA-1428.
--

Resolution: Fixed

Applied. Nice catch.

 FIS not closed after Properties.load
 

 Key: KAFKA-1428
 URL: https://issues.apache.org/jira/browse/KAFKA-1428
 Project: Kafka
  Issue Type: Bug
Reporter: Jon Bringhurst
Priority: Minor
 Attachments: KAFKA-1428.patch


 A FileInputStream is not being closed after using it for a Properties.load.



--
This message was sent by Atlassian JIRA
(v6.2#6252)

