[jira] [Commented] (KAFKA-1414) Speedup broker startup after hard reset

2014-07-20 Thread Alexey Ozeritskiy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14067872#comment-14067872
 ] 

Alexey Ozeritskiy commented on KAFKA-1414:
--

Anton, your patch is not working for me. I set log.io.parallelism=24 but see 
only one active disk; after some period of time (about a minute) I see two 
active disks.

With my primitive patch (using parArray) Kafka uses all 24 disks at once at 
startup.
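For reference, the parArray approach can be sketched roughly as follows (a minimal illustration rather than the actual patch; `recoverLogsIn` is a hypothetical stand-in for the per-directory recovery work done in LogManager.loadLogs):

```scala
import java.io.File

object ParallelLoadSketch {
  // Hypothetical stand-in for the per-directory recovery work.
  def recoverLogsIn(dir: File): Unit =
    println("recovering logs in " + dir.getPath)

  def loadLogsInParallel(dirs: Seq[File]): Unit =
    // .par converts the sequence into a parallel collection, so each
    // data directory is recovered concurrently on the fork-join pool.
    dirs.par.foreach(recoverLogsIn)
}
```

With one data directory per physical drive, this keeps every drive busy from the start instead of recovering directories one after another.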

 Speedup broker startup after hard reset
 ---

 Key: KAFKA-1414
 URL: https://issues.apache.org/jira/browse/KAFKA-1414
 Project: Kafka
  Issue Type: Improvement
  Components: log
Affects Versions: 0.8.2, 0.9.0, 0.8.1.1
Reporter: Dmitry Bugaychenko
Assignee: Jay Kreps
 Attachments: 
 0001-KAFKA-1414-Speedup-broker-startup-after-hard-reset-a.patch, 
 KAFKA-1414-rev1.patch, KAFKA-1414-rev2.fixed.patch, KAFKA-1414-rev2.patch, 
 KAFKA-1414-rev3.patch, freebie.patch, parallel-dir-loading-0.8.patch, 
 parallel-dir-loading-trunk-fixed-threadpool.patch, 
 parallel-dir-loading-trunk-threadpool.patch, parallel-dir-loading-trunk.patch


 After a hard reset due to power failure, the broker takes way too much time 
 recovering unflushed segments in a single thread. This could be easily 
 improved by launching multiple threads (one per data directory, assuming that 
 typically each data directory is on a dedicated drive). Locally we tried this 
 simple patch to LogManager.loadLogs and it seems to work; however, I'm too new 
 to Scala, so do not take it literally:
 {code}
   /**
    * Recover and load all logs in the given data directories
    */
   private def loadLogs(dirs: Seq[File]) {
     val threads: Array[Thread] = new Array[Thread](dirs.size)
     var i: Int = 0
     val me = this
     for (dir <- dirs) {
       val thread = new Thread(new Runnable {
         def run() {
           val recoveryPoints = me.recoveryPointCheckpoints(dir).read
           /* load the logs */
           val subDirs = dir.listFiles()
           if (subDirs != null) {
             val cleanShutDownFile = new File(dir, Log.CleanShutdownFile)
             if (cleanShutDownFile.exists())
               info("Found clean shutdown file. Skipping recovery for all logs " +
                 "in data directory '%s'".format(dir.getAbsolutePath))
             for (subDir <- subDirs) {
               if (subDir.isDirectory) {
                 info("Loading log '" + subDir.getName + "'")
                 val topicPartition = Log.parseTopicPartitionName(subDir.getName)
                 val config = topicConfigs.getOrElse(topicPartition.topic, defaultConfig)
                 val log = new Log(subDir,
                                   config,
                                   recoveryPoints.getOrElse(topicPartition, 0L),
                                   scheduler,
                                   time)
                 val previous = addLogWithLock(topicPartition, log)
                 if (previous != null)
                   throw new IllegalArgumentException("Duplicate log directories found: %s, %s!".format(
                     log.dir.getAbsolutePath, previous.dir.getAbsolutePath))
               }
             }
             cleanShutDownFile.delete()
           }
         }
       })
       thread.start()
       threads(i) = thread
       i = i + 1
     }
     for (thread <- threads) {
       thread.join()
     }
   }

   def addLogWithLock(topicPartition: TopicAndPartition, log: Log): Log = {
     logCreationOrDeletionLock synchronized {
       this.logs.put(topicPartition, log)
     }
   }
 {code}



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (KAFKA-1414) Speedup broker startup after hard reset

2014-07-20 Thread Anton Karamanov (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14067884#comment-14067884
 ] 

Anton Karamanov commented on KAFKA-1414:


[~aozeritsky] are you testing shutdown or recovery process? Could you please 
attach a part of log which corresponds to it?






[jira] [Comment Edited] (KAFKA-1414) Speedup broker startup after hard reset

2014-07-20 Thread Anton Karamanov (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14067884#comment-14067884
 ] 

Anton Karamanov edited comment on KAFKA-1414 at 7/20/14 11:07 AM:
--

[~aozeritsky] could you please attach a piece of log which corresponds to the 
startup process?


was (Author: ataraxer):
[~aozeritsky] are you testing shutdown or recovery process? Could you please 
attach a part of log which corresponds to it?






[jira] [Commented] (KAFKA-1414) Speedup broker startup after hard reset

2014-07-20 Thread Alexey Ozeritskiy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14067917#comment-14067917
 ] 

Alexey Ozeritskiy commented on KAFKA-1414:
--

You must create an instance of Runnable for each directory (this.logDirs). You 
create an instance for each kafka topic-partition. Your approach will not work.




--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Comment Edited] (KAFKA-1414) Speedup broker startup after hard reset

2014-07-20 Thread Alexey Ozeritskiy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14067917#comment-14067917
 ] 

Alexey Ozeritskiy edited comment on KAFKA-1414 at 7/20/14 12:48 PM:


You must create an instance of Runnable for each directory (this.logDirs). You 
create an instance for each kafka topic-partition. Your approach will not work.


was (Author: aozeritsky):
You must create an instance of Runnable for each directory (this.logDirs). You 
create an instance for each kafka topic-partition. Yor aproach will not work.






[jira] [Comment Edited] (KAFKA-1414) Speedup broker startup after hard reset

2014-07-20 Thread Alexey Ozeritskiy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14067917#comment-14067917
 ] 

Alexey Ozeritskiy edited comment on KAFKA-1414 at 7/20/14 12:54 PM:


You must create an instance of Runnable for each directory (this.logDirs). You 
create an instance for each topic-partition. Your approach will not work.


was (Author: aozeritsky):
You must create an instance of Runnable for each directory (this.logDirs). You 
create an instance for each kafka topic-partition. Your aproach will not work.






Re: Review Request 23702: Patch for KAFKA-1070

2014-07-20 Thread Sriharsha Chintalapani


 On July 19, 2014, 11:28 p.m., Timothy Chen wrote:
  core/src/main/scala/kafka/server/KafkaConfig.scala, line 64
  https://reviews.apache.org/r/23702/diff/2/?file=636339#file636339line64
 
  Why reduce the range to 1000? 
 

As we move to auto-generated broker ids, we want backward compatibility: a user 
can continue to provide brokerId as part of server.properties, and we give 
preference to the user-provided brokerId, if present, instead of generating a 
new seqId from zookeeper. 
1000 here is more of a reserved ceiling for user-provided configs, to avoid 
conflicts with the zookeeper-generated seqId. For more details 
please check the comments by Jay Kreps here: 
https://issues.apache.org/jira/browse/KAFKA-1070. Thanks.
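The reservation scheme can be sketched like this (a simplified illustration with made-up names; `nextZkSeqId` stands in for the ZooKeeper sequence-node counter, and the actual logic lives elsewhere in the patch):

```scala
object BrokerIdSketch {
  // Ids below this ceiling are reserved for user-supplied broker.id values;
  // auto-generated ids start above it to avoid collisions.
  val MaxReservedBrokerId = 1000

  // Prefer the user-provided id; otherwise generate one above the
  // reserved range from the ZooKeeper sequence counter.
  def resolveBrokerId(configuredId: Option[Int], nextZkSeqId: () => Int): Int =
    configuredId.getOrElse(MaxReservedBrokerId + nextZkSeqId())
}
```

So a broker configured with broker.id=42 keeps 42, while a broker with no configured id gets 1000 plus the next ZooKeeper sequence number.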


- Sriharsha


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/23702/#review48184
---


On July 19, 2014, 11:06 p.m., Sriharsha Chintalapani wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/23702/
 ---
 
 (Updated July 19, 2014, 11:06 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1070
 https://issues.apache.org/jira/browse/KAFKA-1070
 
 
 Repository: kafka
 
 
 Description
 ---
 
 KAFKA-1070. Auto assign node id.
 
 
 Diffs
 -
 
   core/src/main/scala/kafka/common/InconsistentBrokerIdException.scala 
 PRE-CREATION 
   core/src/main/scala/kafka/server/KafkaConfig.scala 
 50b09edb73af1b45f88f919ac8c46ae056878c8e 
   core/src/main/scala/kafka/server/KafkaServer.scala 
 def1dc2a5818d45d9ee0881137ff989cec4eb9b1 
   core/src/main/scala/kafka/utils/ZkUtils.scala 
 dcdc1ce2b02c996294e19cf480736106aaf29511 
   core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala 
 PRE-CREATION 
   core/src/test/scala/unit/kafka/utils/TestUtils.scala 
 3faa884f8eb83c7c00baab416d0acfb488dc39c1 
 
 Diff: https://reviews.apache.org/r/23702/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Sriharsha Chintalapani
 




Re: Review Request 23702: Patch for KAFKA-1070

2014-07-20 Thread Timothy Chen


 On July 19, 2014, 11:28 p.m., Timothy Chen wrote:
  core/src/main/scala/kafka/server/KafkaConfig.scala, line 64
  https://reviews.apache.org/r/23702/diff/2/?file=636339#file636339line64
 
  Why reduce the range to 1000? 
 
 
 Sriharsha Chintalapani wrote:
 As we move to auto-generated broker ids, we want backward compatibility: a 
 user can continue to provide brokerId as part of server.properties, and we 
 give preference to the user-provided brokerId, if present, instead of 
 generating a new seqId from zookeeper. 
 1000 here is more of a reserved ceiling for user-provided configs, to 
 avoid conflicts with the zookeeper-generated seqId. For more details 
 please check the comments by Jay Kreps here 
 https://issues.apache.org/jira/browse/KAFKA-1070. Thanks.

I see. A named constant and a comment explaining what it is would be helpful.


- Timothy


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/23702/#review48184
---






[jira] [Commented] (KAFKA-1414) Speedup broker startup after hard reset

2014-07-20 Thread Oleg Golovin (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14067986#comment-14067986
 ] 

Oleg Golovin commented on KAFKA-1414:
-

[~ataraxer], you seem to run the whole thread pool on the first directory, and 
only then does it move consecutively to the next directories. 
You tried to parallelize IO operations within each directory for the case where 
they are on RAID volumes, but you lost inter-directory parallelization. I think 
we need to distribute the threads evenly among the directories.






[jira] [Resolved] (KAFKA-1551) Configuration example errors

2014-07-20 Thread Jay Kreps (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1551?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jay Kreps resolved KAFKA-1551.
--

Resolution: Fixed

Fixed.

 Configuration example errors
 

 Key: KAFKA-1551
 URL: https://issues.apache.org/jira/browse/KAFKA-1551
 Project: Kafka
  Issue Type: Bug
  Components: website
Reporter: Alexey Ozeritskiy
Assignee: Alexey Ozeritskiy

 A Production Server Config 
 (http://kafka.apache.org/documentation.html#prodconfig) contains an error:
 {code}
 # ZK configuration
 zk.connection.timeout.ms=6000
 zk.sync.time.ms=2000
 {code}
 Should be
 {code}
 # ZK configuration
 zookeeper.connection.timeout.ms=6000
 zookeeper.sync.time.ms=2000
 {code}





[jira] [Updated] (KAFKA-1552) Quickstart refers to wrong port in pastable commands

2014-07-20 Thread Jay Kreps (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1552?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jay Kreps updated KAFKA-1552:
-

Assignee: Olle Jonsson

 Quickstart refers to wrong port in pastable commands
 

 Key: KAFKA-1552
 URL: https://issues.apache.org/jira/browse/KAFKA-1552
 Project: Kafka
  Issue Type: Bug
  Components: website
Reporter: Olle Jonsson
Assignee: Olle Jonsson
  Labels: documentation, quickstart
 Attachments: quickstart.patch


 The file http://svn.apache.org/repos/asf/kafka/site/081/quickstart.html 
 mentions the port localhost:218192, which should be localhost:2181.





[jira] [Updated] (KAFKA-1552) Quickstart refers to wrong port in pastable commands

2014-07-20 Thread Jay Kreps (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1552?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jay Kreps updated KAFKA-1552:
-

Resolution: Fixed
Status: Resolved  (was: Patch Available)

Fixed, thanks!






[jira] [Updated] (KAFKA-1551) Configuration example errors

2014-07-20 Thread Jay Kreps (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1551?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jay Kreps updated KAFKA-1551:
-

Assignee: Alexey Ozeritskiy






[jira] [Commented] (KAFKA-1414) Speedup broker startup after hard reset

2014-07-20 Thread Anton Karamanov (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14068007#comment-14068007
 ] 

Anton Karamanov commented on KAFKA-1414:


I see the problem now. I still think we should keep parallelization 
granularity at the log level rather than the directory level. We just need to 
interleave the queues of per-directory jobs so that threads can run on 
all directories at the same time. It shouldn't be hard to implement.
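One way to sketch that interleaving (illustrative only, with hypothetical names): build one job queue per data directory, then merge them round-robin before handing the flat list to the pool, so the first jobs submitted touch every directory.

```scala
object InterleaveSketch {
  // Round-robin merge of per-directory job queues: the first job of every
  // directory, then the second of every directory, and so on. A stable
  // sort on the within-queue index preserves directory order.
  def interleave[A](queues: Seq[Seq[A]]): Seq[A] =
    queues.flatMap(_.zipWithIndex).sortBy(_._2).map(_._1)
}
```

For example, `interleave(Seq(Seq("d1/p0", "d1/p1", "d1/p2"), Seq("d2/p0", "d2/p1")))` yields d1/p0, d2/p0, d1/p1, d2/p1, d1/p2, so a fixed-size pool draining the list front-to-back keeps all disks busy from the start.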






[jira] [Commented] (KAFKA-1539) Due to OS caching Kafka might loose offset files which causes full reset of data

2014-07-20 Thread Dmitry Bugaychenko (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1539?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14068010#comment-14068010
 ] 

Dmitry Bugaychenko commented on KAFKA-1539:
---

Dug into the problem a bit more. It looks like calling flush on new 
BufferedWriter(new FileWriter(temp)) only forces the buffered writer to dump 
everything into the FileOutputStream underneath the FileWriter and call flush on it. 
However, according to 
http://grepcode.com/file/repository.grepcode.com/java/root/jdk/openjdk/7u40-b43/java/io/FileOutputStream.java#FileOutputStream
 that flush does nothing. To really force data to be written to disk you need 
to call fos.getFD().sync(). Given that, the patch could look like this:

{code}
  def write(offsets: Map[TopicAndPartition, Long]) {
    lock synchronized {
      // write to temp file and then swap with the existing file
      val temp = new File(file.getAbsolutePath + ".tmp")

      val fileOutputStream = new FileOutputStream(temp)
      val writer = new BufferedWriter(new OutputStreamWriter(fileOutputStream))
      try {
        // write the current version
        writer.write(0.toString)
        writer.newLine()

        // write the number of entries
        writer.write(offsets.size.toString)
        writer.newLine()

        // write the entries
        offsets.foreach { case (topicPart, offset) =>
          writer.write("%s %d %d".format(topicPart.topic, topicPart.partition, offset))
          writer.newLine()
        }

        // flush and overwrite old file
        writer.flush()

        // force fsync to disk
        fileOutputStream.getFD.sync()
      } finally {
        writer.close()
      }

      // swap new offset checkpoint file with previous one
      if (!temp.renameTo(file)) {
        // renameTo() fails on Windows if the destination file exists.
        file.delete()
        if (!temp.renameTo(file))
          throw new IOException("File rename from %s to %s failed.".format(temp.getAbsolutePath, file.getAbsolutePath))
      }
    }
  }
{code}

Note that the problem is easily reproducible only on XFS; ext3/ext4 seem to 
handle this case much better. We hope to try the patch later this 
week and check whether it helps.

 Due to OS caching Kafka might loose offset files which causes full reset of 
 data
 

 Key: KAFKA-1539
 URL: https://issues.apache.org/jira/browse/KAFKA-1539
 Project: Kafka
  Issue Type: Bug
  Components: log
Affects Versions: 0.8.1.1
Reporter: Dmitry Bugaychenko
Assignee: Jay Kreps

 Seen this while testing power failure and disk failures. Due to caching at the 
 OS level (e.g. XFS can cache data for 30 seconds), after a failure we got offset 
 files of zero length. This dramatically slows down broker startup (it has to 
 re-check all segments), and if the high watermark offsets are lost it simply erases 
 all data and starts recovering from other brokers (looks funny: first 
 spending 2-3 hours re-checking logs and then deleting them all due to the missing 
 high watermark).
 Proposal: introduce offset file rotation. Keep two versions of the offset file, 
 write to the oldest, read from the newest valid one. In this case we would be able to 
 configure the offset checkpoint time in a way that at least one file is always 
 flushed and valid.
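The rotation proposed above could be sketched as follows; this is a hedged illustration only (the two-file layout, `writeTarget`, and the non-empty-file validity check are assumptions, not an actual patch):

```scala
import java.io.File

// Two-file checkpoint rotation: always write to the OLDER copy so the newer
// one survives a crash mid-write; read back from the newest copy that is
// non-empty (a zero-length file is exactly what a lost OS cache flush
// leaves behind after power failure).
object CheckpointRotation {
  def writeTarget(a: File, b: File): File =
    if (a.lastModified <= b.lastModified) a else b

  def readSource(a: File, b: File): Option[File] =
    Seq(a, b)
      .filter(f => f.exists && f.length > 0)   // skip missing/truncated files
      .sortBy(f => -f.lastModified)            // newest valid copy first
      .headOption
}
```

With a checkpoint interval shorter than the OS flush window, at least one of the two copies is always fully flushed and valid.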





[jira] [Comment Edited] (KAFKA-1539) Due to OS caching Kafka might loose offset files which causes full reset of data

2014-07-20 Thread Dmitry Bugaychenko (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1539?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14068010#comment-14068010
 ] 

Dmitry Bugaychenko edited comment on KAFKA-1539 at 7/20/14 6:30 PM:



[jira] [Commented] (KAFKA-1414) Speedup broker startup after hard reset

2014-07-20 Thread Dmitry Bugaychenko (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14068013#comment-14068013
 ] 

Dmitry Bugaychenko commented on KAFKA-1414:
---

In our use case inter-directory parallelism is a must-have. Each directory is 
backed by its own HDD and we want them all to be utilized in parallel on 
startup AND shutdown. Log-level parallelism is a no-go: having multiple 
threads all targeting the same HDD will only degrade performance.



[jira] [Commented] (KAFKA-1414) Speedup broker startup after hard reset

2014-07-20 Thread Oleg Golovin (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14068014#comment-14068014
 ] 

Oleg Golovin commented on KAFKA-1414:
-

I see only one problem here. If we disperse threads among directories, there 
will be one directory at the end (presumably the biggest one) to which all the 
threads will go. When that happens, such a number of threads will only aggravate 
performance on spinning drives, as it will result in more disk head seeks 
(though the OS will try to optimize those reads, I think seeks will not be 
avoided).
Maybe we would rather have a separate thread pool for each directory, with the 
{{number of threads = log.io.parallelism / number_of_directories}}?
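That split could look roughly like the sketch below (hypothetical names; `threadsPerDir` and `makePools` are illustrative, and `logIoParallelism` stands in for the proposed log.io.parallelism setting):

```scala
import java.io.File
import java.util.concurrent.{ExecutorService, Executors}

// One fixed-size pool per data directory, dividing a global I/O budget
// evenly, with a floor of one thread so every directory makes progress.
def threadsPerDir(logIoParallelism: Int, numDirs: Int): Int =
  math.max(1, logIoParallelism / numDirs)

def makePools(dirs: Seq[File], logIoParallelism: Int): Map[File, ExecutorService] = {
  val n = threadsPerDir(logIoParallelism, dirs.size)
  dirs.map(dir => dir -> Executors.newFixedThreadPool(n)).toMap
}

// e.g. log.io.parallelism=24 over 8 directories -> 3 threads per directory
```

Because each pool only drains its own directory's queue, a large directory can no longer pull every worker onto one spindle.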



[jira] [Commented] (KAFKA-1414) Speedup broker startup after hard reset

2014-07-20 Thread Dmitry Bugaychenko (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14068015#comment-14068015
 ] 

Dmitry Bugaychenko commented on KAFKA-1414:
---

That is why in my initial proposal each thread gets its own dir and dies when 
done. For us it works perfectly, since IO is the bottleneck and it is not worth 
increasing the number of threads per directory beyond one.



[jira] [Commented] (KAFKA-1414) Speedup broker startup after hard reset

2014-07-20 Thread Oleg Golovin (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14068018#comment-14068018
 ] 

Oleg Golovin commented on KAFKA-1414:
-

[~junrao] wrote about parallelization over directories in 32.1 (see above). I 
think RAID is a legitimate case for some Kafka users, so it has to be accounted 
for too.



[jira] [Commented] (KAFKA-1414) Speedup broker startup after hard reset

2014-07-20 Thread Dmitry Bugaychenko (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14068020#comment-14068020
 ] 

Dmitry Bugaychenko commented on KAFKA-1414:
---

A separate thread pool per directory, with the ability to set the number of threads to 1 or 
higher as you proposed, would solve both cases and looks reasonably simple.



[jira] [Commented] (KAFKA-1414) Speedup broker startup after hard reset

2014-07-20 Thread Jay Kreps (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14068021#comment-14068021
 ] 

Jay Kreps commented on KAFKA-1414:
--

We should check whether having multiple threads accessing the same directory will 
actually have a negative impact.

First, I don't think anyone has actually verified that recovery is I/O 
bound. Recovery iterates over the log doing a bunch of small message reads, 
decompressing any compressed messages, and checking the CRC of all the data. 
This may actually be CPU bound.

Secondly, the OS will do normal readahead, which should help protect somewhat 
against random access for two interspersed linear accesses.

This should be easy to test. Run the perf test on a single-node broker with 
multiple partitions on a single drive, then kill -9 it. Then run 
{code}
  echo 1 > /proc/sys/vm/drop_caches
{code}
and restart. Do this twice, once with 1 thread and once with 2. The 
prediction is that the 2-thread case will be slower than the 1-thread case, 
but it may actually not be.



Improved site docs on how to contribute

2014-07-20 Thread Jay Kreps
Following up on the recent contributor thread I tried to improve the
docs on how to contribute to Kafka:

http://kafka.apache.org/contributing.html

Let me know if anyone has any additional suggestions on what should be there.

-Jay


[jira] [Updated] (KAFKA-1414) Speedup broker startup after hard reset

2014-07-20 Thread Anton Karamanov (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1414?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Anton Karamanov updated KAFKA-1414:
---

Attachment: KAFKA-1414-rev3-interleaving.patch

Log-level parallelization is better in any case, because it's more flexible. As 
I mentioned, dir-level parallelization can be achieved on top of log-level 
parallelization by interleaving the jobs of every dir. That will make it possible 
to load logs from all disks at the same time.

On the other hand, I second [~jkreps] on testing whether log loading is CPU- or 
IO-bound, and measuring the speed of startup and shutdown with my patch. After all 
-- it's not the number of disks spinning wildly that matters, but the 
overall time it takes to start/shut down the system.

(Oh, and if we decide to use interleaved jobs strategy -- here's a 
[patch|^KAFKA-1414-rev3-interleaving.patch] (: )

 Speedup broker startup after hard reset
 ---

 Key: KAFKA-1414
 URL: https://issues.apache.org/jira/browse/KAFKA-1414
 Project: Kafka
  Issue Type: Improvement
  Components: log
Affects Versions: 0.8.2, 0.9.0, 0.8.1.1
Reporter: Dmitry Bugaychenko
Assignee: Jay Kreps
 Attachments: 
 0001-KAFKA-1414-Speedup-broker-startup-after-hard-reset-a.patch, 
 KAFKA-1414-rev1.patch, KAFKA-1414-rev2.fixed.patch, KAFKA-1414-rev2.patch, 
 KAFKA-1414-rev3-interleaving.patch, KAFKA-1414-rev3.patch, freebie.patch, 
 parallel-dir-loading-0.8.patch, 
 parallel-dir-loading-trunk-fixed-threadpool.patch, 
 parallel-dir-loading-trunk-threadpool.patch, parallel-dir-loading-trunk.patch




[jira] [Comment Edited] (KAFKA-1414) Speedup broker startup after hard reset

2014-07-20 Thread Anton Karamanov (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14068047#comment-14068047
 ] 

Anton Karamanov edited comment on KAFKA-1414 at 7/20/14 9:14 PM:
-

Log-level parallelization is better in any case, because it's more flexible. As 
I mentioned, dir-level parallelization can be achieved on top of log-level 
parallelization by interleaving the jobs of every dir, which makes it possible 
to load logs from all disks at the same time.

I second [~jkreps] on testing whether log loading is CPU- or IO-bound and 
measuring the start and shutdown speed with my patch first. After all -- it's 
not the number of wildly spinning disks that matters, but the overall 
time it takes to start/shutdown the system.

(Oh, and if we decide to use the interleaved-jobs strategy -- here's a 
[patch|^KAFKA-1414-rev3-interleaving.patch] (: )


was (Author: ataraxer):
Log-level parallelization is better in any case, because it's more flexible. As 
I mentioned, dir-level parallelization can be achieved on top of log-level 
parallelization by interleaving the jobs of every dir, which makes it possible 
to load logs from all disks at the same time.

On the other hand, I second [~jkreps] on testing whether log loading is CPU- or 
IO-bound and measuring the start and shutdown speed with my patch. After all 
-- it's not the number of wildly spinning disks that matters, but the overall 
time it takes to start/shutdown the system.

(Oh, and if we decide to use the interleaved-jobs strategy -- here's a 
[patch|^KAFKA-1414-rev3-interleaving.patch] (: )



Re: Review Request 23697: Fix KAFKA-1533: Address Jun and Neha's comments

2014-07-20 Thread Jun Rao

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/23697/#review48194
---



clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
https://reviews.apache.org/r/23697/#comment84551

For clarity, could we extract the second part into a local val and give it 
a meaningful name?



clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
https://reviews.apache.org/r/23697/#comment84550

Thinking a bit more about this. There are two possibilities after adding the 
metadata request to sends: (1) The metadata fetch succeeds. In this case 
Metadata.lastRefreshMs will be updated and this line is not needed. (2) The 
metadata fetch fails due to a connection issue. In this case, we probably want 
to fetch the metadata from another node immediately, instead of backing 
off.

So, it seems that in NetworkClient, we only need to take care of the case 
when no node is available for metadata. In this case, we should back off. So, 
perhaps we should rename metadataLastUpdateAttemptMs to something like lastNoNodeMs.



core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
https://reviews.apache.org/r/23697/#comment84552

Indentation.


- Jun Rao


On July 18, 2014, 10:45 p.m., Guozhang Wang wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/23697/
 ---
 
 (Updated July 18, 2014, 10:45 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1533
 https://issues.apache.org/jira/browse/KAFKA-1533
 
 
 Repository: kafka
 
 
 Description
 ---
 
 1. Add the metadataRefreshAttemptMs in NetworkClient for backing off; 2. 
 Refactor Producer API tests using KafkaTestHarness; 3. Change default backoff 
 time to 100ms for test utils
 
 
 Diffs
 -
 
   clients/src/main/java/org/apache/kafka/clients/NetworkClient.java 
 d8f9ce663ee24d2b0852c974136741280c39f8f8 
   
 clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java
  4aa5b01d611631db72df47d50bbe30edb8c478db 
   core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala 
 15fd5bcbaf175a0f7d7cf0b142e63f705ca9b6ae 
   core/src/test/scala/integration/kafka/api/ProducerSendTest.scala 
 34a7db4b4ea2b720476c2b1f22a623a997faffbc 
   core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala 
 194dd70919a5f301d3131c56594e40a0ebb27311 
   core/src/test/scala/unit/kafka/utils/TestUtils.scala 
 3faa884f8eb83c7c00baab416d0acfb488dc39c1 
 
 Diff: https://reviews.apache.org/r/23697/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Guozhang Wang
 




[jira] [Commented] (KAFKA-1549) dead brokers coming in the TopicMetadataResponse

2014-07-20 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1549?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14068165#comment-14068165
 ] 

Jun Rao commented on KAFKA-1549:


Thanks for the patch. A couple of comments.

1. inReadLock seems like a generally useful util. We probably can add the 
following to the Utils file where inLock lives, and change all existing usages 
of ReadWriteLock to use those utils.
  def inReadLock[T](lock: ReadWriteLock)(fun: => T): T = {
  def inWriteLock[T](lock: ReadWriteLock)(fun: => T): T = {

2. Not sure what the purpose of the change in KafkaApis is. Is that to avoid 
the overhead of an extra readLock? It's better to limit the usage of the lock 
inside the MetadataCache.
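For completeness, the pair of helpers sketched above could be implemented along these lines (a sketch modeled on the existing inLock util; not the committed code):

```scala
import java.util.concurrent.locks.{Lock, ReadWriteLock, ReentrantReadWriteLock}

// Sketch only: run a by-name body while holding the given lock,
// releasing it in a finally block even if the body throws.
def inLock[T](lock: Lock)(fun: => T): T = {
  lock.lock()
  try fun
  finally lock.unlock()
}

// The proposed read/write variants simply delegate to inLock
// with the corresponding side of the ReadWriteLock.
def inReadLock[T](lock: ReadWriteLock)(fun: => T): T =
  inLock(lock.readLock())(fun)

def inWriteLock[T](lock: ReadWriteLock)(fun: => T): T =
  inLock(lock.writeLock())(fun)
```

Because the body is a by-name parameter, call sites read like a control structure: `inReadLock(rwLock) { cache.get(topic) }`.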


 dead brokers coming in the TopicMetadataResponse
 

 Key: KAFKA-1549
 URL: https://issues.apache.org/jira/browse/KAFKA-1549
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8.2
 Environment: trunk
Reporter: nicu marasoiu
Assignee: Jun Rao
 Attachments: 
 KAFKA-1549__dead_brokers_coming_in_the_TopicMetadataResponse_.patch


 Jun Rao confirming my observation that brokers are only added to the 
 metadataCache, never removed: "The way that we update liveBrokers in 
 MetadataCache.updateCache() doesn't seem right. We only add newly received 
 live brokers to the list. However, there could be existing brokers in that 
 list that are now dead. Those dead brokers shouldn't be returned to the 
 clients. We should probably just take the new live broker list and cache it."





Re: Review Request 23705: Patch for KAFKA-1192

2014-07-20 Thread Jun Rao

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/23705/#review48195
---


Thanks for the patch. A couple of comments below.


core/src/main/scala/kafka/tools/DumpLogSegments.scala
https://reviews.apache.org/r/23705/#comment84553

Shouldn't Decoder be kafka.serializer.Decoder?

Also, capitalize custom.



core/src/main/scala/kafka/tools/DumpLogSegments.scala
https://reviews.apache.org/r/23705/#comment84554

Could we add a decoder for the key too?


- Jun Rao


On July 19, 2014, 8:28 a.m., Manikumar Reddy O wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/23705/
 ---
 
 (Updated July 19, 2014, 8:28 a.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1192
 https://issues.apache.org/jira/browse/KAFKA-1192
 
 
 Repository: kafka
 
 
 Description
 ---
 
 Support given for custom deserialization of messages
 
 
 Diffs
 -
 
   core/src/main/scala/kafka/tools/DumpLogSegments.scala 
 6daf87b25a48a51aafb7dbe8d0c0371e0ea7501f 
 
 Diff: https://reviews.apache.org/r/23705/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Manikumar Reddy O
 




[jira] [Commented] (KAFKA-1414) Speedup broker startup after hard reset

2014-07-20 Thread Jay Kreps (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14068173#comment-14068173
 ] 

Jay Kreps commented on KAFKA-1414:
--

I did a test of single-threaded recovery on my laptop, where most data is in 
memory and the disk is an SSD. This should remove any I/O bottleneck. I see log 
recovery working at about 275 MB/sec with 100% CPU load on one core. This 
indicates that on a non-SSD drive (most Kafka machines, I would imagine), I/O 
would be the bottleneck. 



[jira] [Commented] (KAFKA-1414) Speedup broker startup after hard reset

2014-07-20 Thread Jay Kreps (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14068177#comment-14068177
 ] 

Jay Kreps commented on KAFKA-1414:
--

[~ataraxer] I agree that just throwing all the logs at a thread pool is simpler 
to configure and probably also to implement. To see if that will work, we need 
data on whether multiple threads running recovery on a single drive have a 
significant negative perf impact. Want to try that one out and see? If the 
impact is minor I think we are probably better off with the simpler strategy, 
but if it tanks performance we may need to be more explicit about the data 
directories. 
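The simpler strategy could be as small as the following (a hypothetical sketch, not one of the attached patches): every log dir, whichever disk it lives on, becomes one task in a single fixed-size pool.

```scala
import java.util.concurrent.{Callable, Executors}

// Hypothetical sketch of "throw all the logs at a thread pool":
// one task per log dir, one shared fixed-size pool, no per-disk logic.
// Names (loadLogsWithPool, load) are illustrative.
def loadLogsWithPool(logDirs: Seq[String], numThreads: Int)(load: String => Unit): Unit = {
  val pool = Executors.newFixedThreadPool(numThreads)
  val tasks = new java.util.ArrayList[Callable[Unit]]()
  for (dir <- logDirs)
    tasks.add(new Callable[Unit] { def call(): Unit = load(dir) })
  try pool.invokeAll(tasks) // blocks until every task completes
  finally pool.shutdown()
}
```

Whether this wins depends on the experiment above: if concurrent recovery on one spindle causes heavy seek thrashing, a per-directory scheme (or interleaving) would still be needed.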



[jira] [Commented] (KAFKA-1451) Broker stuck due to leader election race

2014-07-20 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1451?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14068193#comment-14068193
 ] 

Jun Rao commented on KAFKA-1451:


Neha, I am not sure if #1 is needed. We can get into elect from two paths: (1) 
from startup or (2) from handleDeleted. If it's from startup, we already 
register the watcher before calling elect. If it's from handleDeleted, the 
watcher must have already been registered. So, once in elect, we know the 
watcher is already registered. So if, after we check the existence of the 
controller node, the controller node goes away immediately afterward, the 
watcher is guaranteed to be triggered.

 Broker stuck due to leader election race 
 -

 Key: KAFKA-1451
 URL: https://issues.apache.org/jira/browse/KAFKA-1451
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 0.8.1.1
Reporter: Maciek Makowski
Priority: Minor
  Labels: newbie

 h3. Symptoms
 The broker does not become available due to being stuck in an infinite loop 
 while electing leader. This can be recognised by the following line being 
 repeatedly written to server.log:
 {code}
 [2014-05-14 04:35:09,187] INFO I wrote this conflicted ephemeral node 
 [{"version":1,"brokerid":1,"timestamp":"1400060079108"}] at /controller a 
 while back in a different session, hence I will backoff for this node to be 
 deleted by Zookeeper and retry (kafka.utils.ZkUtils$)
 {code}
 h3. Steps to Reproduce
 In a single kafka 0.8.1.1 node, single zookeeper 3.4.6 (but will likely 
 behave the same with the ZK version included in Kafka distribution) node 
 setup:
 # start both zookeeper and kafka (in any order)
 # stop zookeeper
 # stop kafka
 # start kafka
 # start zookeeper
 h3. Likely Cause
 {{ZookeeperLeaderElector}} subscribes to data changes on startup, and then 
 triggers an election. If the deletion of the ephemeral {{/controller}} node 
 associated with the broker's previous zookeeper session happens after the 
 subscription to changes in the new session, the election will be invoked twice, 
 once from {{startup}} and once from {{handleDataDeleted}}:
 * {{startup}}: acquire {{controllerLock}}
 * {{startup}}: subscribe to data changes
 * zookeeper: delete {{/controller}} since the session that created it timed 
 out
 * {{handleDataDeleted}}: {{/controller}} was deleted
 * {{handleDataDeleted}}: wait on {{controllerLock}}
 * {{startup}}: elect -- writes {{/controller}}
 * {{startup}}: release {{controllerLock}}
 * {{handleDataDeleted}}: acquire {{controllerLock}}
 * {{handleDataDeleted}}: elect -- attempts to write {{/controller}} and then 
 gets into infinite loop as a result of conflict
 {{createEphemeralPathExpectConflictHandleZKBug}} assumes that the existing 
 znode was written from different session, which is not true in this case; it 
 was written from the same session. That adds to the confusion.
 h3. Suggested Fix
 In {{ZookeeperLeaderElector.startup}} first run {{elect}} and then subscribe 
 to data changes.





[jira] [Commented] (KAFKA-1546) Automate replica lag tuning

2014-07-20 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1546?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14068209#comment-14068209
 ] 

Jun Rao commented on KAFKA-1546:


Jay, I think what you proposed is nice and simple. I think it works. One 
subtlety is dealing with max.wait.ms in the follower fetch request. Imagine 
that a follower has caught up and its fetch request is sitting in the 
purgatory. The last caught up time won't be updated for max.wait.ms if no new 
messages come in. When a message does come in, if max.wait.ms is larger than 
replica.lag.time.ms, the follower is now considered out of sync. Perhaps we 
should use the timestamp of when the first message is appended to the leader 
after the last caught up time. If the amount of time since then is more than 
replica.lag.time.ms, the replica is considered out of sync.
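In code, the check described above might look like this (an illustrative sketch; all names are hypothetical, not the KAFKA-1546 implementation):

```scala
// Sketch of the timestamp-based lag check: a follower is out of sync only
// if the first leader append *after* it last caught up is older than
// replica.lag.time.ms. All names are hypothetical.
case class FollowerLagState(
  lastCaughtUpMs: Long,                      // last time the follower read to the leader's log end
  firstAppendAfterCaughtUpMs: Option[Long])  // first leader append since then, if any

def isOutOfSync(state: FollowerLagState, nowMs: Long, replicaLagTimeMs: Long): Boolean =
  state.firstAppendAfterCaughtUpMs match {
    // No new messages since the follower caught up: never out of sync, even if
    // its fetch request has sat in the purgatory longer than max.wait.ms.
    case None => false
    // Out of sync only once the oldest un-replicated append exceeds the lag bound.
    case Some(appendMs) => nowMs - appendMs > replicaLagTimeMs
  }
```

This sidesteps the purgatory subtlety: an idle partition never marks its followers out of sync, regardless of how max.wait.ms compares to replica.lag.time.ms.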

 Automate replica lag tuning
 ---

 Key: KAFKA-1546
 URL: https://issues.apache.org/jira/browse/KAFKA-1546
 Project: Kafka
  Issue Type: Improvement
  Components: replication
Affects Versions: 0.8.0, 0.8.1, 0.8.1.1
Reporter: Neha Narkhede
  Labels: newbie++

 Currently, there is no good way to tune the replica lag configs to 
 automatically account for high and low volume topics on the same cluster. 
 For the low-volume topic it will take a very long time to detect a lagging
 replica, and for the high-volume topic it will have false-positives.
 One approach to making this easier would be to have the configuration
 be something like replica.lag.max.ms and translate this into a number
 of messages dynamically based on the throughput of the partition.


