[jira] [Commented] (KAFKA-1414) Speedup broker startup after hard reset
[ https://issues.apache.org/jira/browse/KAFKA-1414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14067872#comment-14067872 ]

Alexey Ozeritskiy commented on KAFKA-1414:
------------------------------------------

Anton, your patch is not working for me. I set log.io.parallelism=24 and see only one active disk; after some period of time (1 min) I see two active disks. With my primitive patch (with parArray) Kafka uses all 24 disks at once at startup.

Speedup broker startup after hard reset
---------------------------------------
Key: KAFKA-1414
URL: https://issues.apache.org/jira/browse/KAFKA-1414
Project: Kafka
Issue Type: Improvement
Components: log
Affects Versions: 0.8.2, 0.9.0, 0.8.1.1
Reporter: Dmitry Bugaychenko
Assignee: Jay Kreps
Attachments: 0001-KAFKA-1414-Speedup-broker-startup-after-hard-reset-a.patch, KAFKA-1414-rev1.patch, KAFKA-1414-rev2.fixed.patch, KAFKA-1414-rev2.patch, KAFKA-1414-rev3.patch, freebie.patch, parallel-dir-loading-0.8.patch, parallel-dir-loading-trunk-fixed-threadpool.patch, parallel-dir-loading-trunk-threadpool.patch, parallel-dir-loading-trunk.patch

After a hard reset due to power failure the broker takes way too much time recovering unflushed segments in a single thread. This could be easily improved by launching multiple threads (one per data directory, assuming that typically each data directory is on a dedicated drive). Locally we tried this simple patch to LogManager.loadLogs and it seems to work; however, I'm too new to Scala, so do not take it literally:

{code}
/**
 * Recover and load all logs in the given data directories
 */
private def loadLogs(dirs: Seq[File]) {
  val threads: Array[Thread] = new Array[Thread](dirs.size)
  var i: Int = 0
  val me = this
  for (dir <- dirs) {
    val thread = new Thread(new Runnable {
      def run() {
        val recoveryPoints = me.recoveryPointCheckpoints(dir).read
        /* load the logs */
        val subDirs = dir.listFiles()
        if (subDirs != null) {
          val cleanShutDownFile = new File(dir, Log.CleanShutdownFile)
          if (cleanShutDownFile.exists())
            info("Found clean shutdown file. Skipping recovery for all logs in data directory '%s'".format(dir.getAbsolutePath))
          for (dir <- subDirs) {
            if (dir.isDirectory) {
              info("Loading log '" + dir.getName + "'")
              val topicPartition = Log.parseTopicPartitionName(dir.getName)
              val config = topicConfigs.getOrElse(topicPartition.topic, defaultConfig)
              val log = new Log(dir, config, recoveryPoints.getOrElse(topicPartition, 0L), scheduler, time)
              val previous = addLogWithLock(topicPartition, log)
              if (previous != null)
                throw new IllegalArgumentException("Duplicate log directories found: %s, %s!".format(log.dir.getAbsolutePath, previous.dir.getAbsolutePath))
            }
          }
          cleanShutDownFile.delete()
        }
      }
    })
    thread.start()
    threads(i) = thread
    i = i + 1
  }
  for (thread <- threads) {
    thread.join()
  }
}

def addLogWithLock(topicPartition: TopicAndPartition, log: Log): Log = {
  logCreationOrDeletionLock synchronized {
    this.logs.put(topicPartition, log)
  }
}
{code}

-- This message was sent by Atlassian JIRA (v6.2#6252)
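Alexey's remark about a "primitive patch (with parArray)" presumably refers to Scala's parallel collections. A rough sketch of that approach (not the attached patch; `loadLogsInDir` is a hypothetical stand-in for the per-directory recovery work):

```scala
import java.io.File

// Hypothetical stand-in for the per-directory recovery work in loadLogs:
// here it just counts the topic-partition subdirectories it would recover.
def loadLogsInDir(dir: File): Int = {
  val subDirs = Option(dir.listFiles()).getOrElse(Array.empty[File])
  subDirs.count(_.isDirectory)
}

// .par turns the Seq into a parallel collection, so every data directory
// (typically a dedicated drive) is processed concurrently instead of one by one.
def loadAllLogs(dirs: Seq[File]): Int =
  dirs.par.map(loadLogsInDir).sum
```

On Scala 2.10/2.11 (current at the time of this thread) `.par` is available out of the box; in 2.13+ it moved to the separate scala-parallel-collections module.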
[jira] [Commented] (KAFKA-1414) Speedup broker startup after hard reset
[ https://issues.apache.org/jira/browse/KAFKA-1414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14067884#comment-14067884 ]

Anton Karamanov commented on KAFKA-1414:
----------------------------------------

[~aozeritsky] are you testing shutdown or recovery process? Could you please attach a part of log which corresponds to it?
[jira] [Comment Edited] (KAFKA-1414) Speedup broker startup after hard reset
[ https://issues.apache.org/jira/browse/KAFKA-1414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14067884#comment-14067884 ]

Anton Karamanov edited comment on KAFKA-1414 at 7/20/14 11:07 AM:
------------------------------------------------------------------

[~aozeritsky] could you please attach a piece of log which corresponds to the startup process?

was (Author: ataraxer): [~aozeritsky] are you testing shutdown or recovery process? Could you please attach a part of log which corresponds to it?
[jira] [Commented] (KAFKA-1414) Speedup broker startup after hard reset
[ https://issues.apache.org/jira/browse/KAFKA-1414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14067917#comment-14067917 ]

Alexey Ozeritskiy commented on KAFKA-1414:
------------------------------------------

You must create an instance of Runnable for each directory (this.logDirs). You create an instance for each kafka topic-partition. Yor aproach will not work.
[jira] [Comment Edited] (KAFKA-1414) Speedup broker startup after hard reset
[ https://issues.apache.org/jira/browse/KAFKA-1414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14067917#comment-14067917 ]

Alexey Ozeritskiy edited comment on KAFKA-1414 at 7/20/14 12:48 PM:
--------------------------------------------------------------------

You must create an instance of Runnable for each directory (this.logDirs). You create an instance for each kafka topic-partition. Your aproach will not work.

was (Author: aozeritsky): You must create an instance of Runnable for each directory (this.logDirs). You create an instance for each kafka topic-partition. Yor aproach will not work.
[jira] [Comment Edited] (KAFKA-1414) Speedup broker startup after hard reset
[ https://issues.apache.org/jira/browse/KAFKA-1414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14067917#comment-14067917 ]

Alexey Ozeritskiy edited comment on KAFKA-1414 at 7/20/14 12:54 PM:
--------------------------------------------------------------------

You must create an instance of Runnable for each directory (this.logDirs). You create an instance for each topic-partition. Your aproach will not work.

was (Author: aozeritsky): You must create an instance of Runnable for each directory (this.logDirs). You create an instance for each kafka topic-partition. Your aproach will not work.
Re: Review Request 23702: Patch for KAFKA-1070
On July 19, 2014, 11:28 p.m., Timothy Chen wrote:
> core/src/main/scala/kafka/server/KafkaConfig.scala, line 64
> https://reviews.apache.org/r/23702/diff/2/?file=636339#file636339line64
> Why reduce the range to 1000?

As we go with auto-generating broker ids, we want to keep backward compatibility: a user can continue to provide a brokerId as part of server.properties, and we will give preference to the user-provided brokerId, if present, instead of generating a new seqId from zookeeper. 1000 here is more of a reserved range for user-provided configs, to avoid conflicts with the zookeeper-generated seqIds. For more details please check the comments by Jay Kreps here: https://issues.apache.org/jira/browse/KAFKA-1070. Thanks.

- Sriharsha

---
This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/23702/#review48184
---

On July 19, 2014, 11:06 p.m., Sriharsha Chintalapani wrote:

(Updated July 19, 2014, 11:06 p.m.)

Review request for kafka.

Bugs: KAFKA-1070
    https://issues.apache.org/jira/browse/KAFKA-1070

Repository: kafka

Description
-----------
KAFKA-1070. Auto assign node id.

Diffs
-----
core/src/main/scala/kafka/common/InconsistentBrokerIdException.scala PRE-CREATION
core/src/main/scala/kafka/server/KafkaConfig.scala 50b09edb73af1b45f88f919ac8c46ae056878c8e
core/src/main/scala/kafka/server/KafkaServer.scala def1dc2a5818d45d9ee0881137ff989cec4eb9b1
core/src/main/scala/kafka/utils/ZkUtils.scala dcdc1ce2b02c996294e19cf480736106aaf29511
core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala PRE-CREATION
core/src/test/scala/unit/kafka/utils/TestUtils.scala 3faa884f8eb83c7c00baab416d0acfb488dc39c1

Diff: https://reviews.apache.org/r/23702/diff/

Testing
-------

Thanks,
Sriharsha Chintalapani
Re: Review Request 23702: Patch for KAFKA-1070
On July 19, 2014, 11:28 p.m., Timothy Chen wrote:
> core/src/main/scala/kafka/server/KafkaConfig.scala, line 64
> https://reviews.apache.org/r/23702/diff/2/?file=636339#file636339line64
> Why reduce the range to 1000?

Sriharsha Chintalapani wrote:
> As we go with auto-generating broker id we want to have a backward compatibility where a user can continue to provide brokerId as part of server.properties. We will give preference to the user provided brokerId if it is present instead of generating a new seqId from zookeeper. 1000 here is more of a reserved number for user provided config and to avoid conflict with zookeeper generated seqId. For more details please check comments by Jay Kreps here https://issues.apache.org/jira/browse/KAFKA-1070. Thanks.

I see, a comment and a constant will be helpful for what it is.

- Timothy
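Sriharsha's scheme — user-supplied broker ids reserved below a cutoff, ZooKeeper sequence ids generated above it so the two ranges never collide — can be sketched as follows. This is an illustration, not the actual KAFKA-1070 patch; the names `MaxReservedBrokerId` and `resolveBrokerId` are assumptions:

```scala
// Illustrative sketch: ids below the cutoff are reserved for user-supplied
// configs; auto-generated ids start above it, so the two ranges never clash.
val MaxReservedBrokerId = 1000 // hypothetical name for the cutoff constant

def resolveBrokerId(configuredId: Int, nextZkSequenceId: () => Int): Int =
  if (configuredId >= 0) {
    // An id from server.properties takes precedence but must stay in range.
    require(configuredId < MaxReservedBrokerId,
      "broker.id %d must be below %d".format(configuredId, MaxReservedBrokerId))
    configuredId
  } else {
    // No id configured: shift the ZooKeeper sequence id past the cutoff.
    MaxReservedBrokerId + nextZkSequenceId()
  }
```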
[jira] [Commented] (KAFKA-1414) Speedup broker startup after hard reset
[ https://issues.apache.org/jira/browse/KAFKA-1414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14067986#comment-14067986 ]

Oleg Golovin commented on KAFKA-1414:
-------------------------------------

[~ataraxer], you seem to run the whole thread pool on the first directory, and only then consecutively move to the next directories. You tried to parallelize IO operations within each directory in case of them being on RAID volumes, but you lost inter-directory parallelization. I think we need to disperse the threads equally among the directories.
[jira] [Resolved] (KAFKA-1551) Configuration example errors
[ https://issues.apache.org/jira/browse/KAFKA-1551?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jay Kreps resolved KAFKA-1551.
------------------------------
Resolution: Fixed

Fixed.

Configuration example errors
----------------------------
Key: KAFKA-1551
URL: https://issues.apache.org/jira/browse/KAFKA-1551
Project: Kafka
Issue Type: Bug
Components: website
Reporter: Alexey Ozeritskiy
Assignee: Alexey Ozeritskiy

The Production Server Config section (http://kafka.apache.org/documentation.html#prodconfig) contains an error:
{code}
# ZK configuration
zk.connection.timeout.ms=6000
zk.sync.time.ms=2000
{code}
Should be:
{code}
# ZK configuration
zookeeper.connection.timeout.ms=6000
zookeeper.sync.time.ms=2000
{code}
[jira] [Updated] (KAFKA-1552) Quickstart refers to wrong port in pastable commands
[ https://issues.apache.org/jira/browse/KAFKA-1552?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jay Kreps updated KAFKA-1552:
-----------------------------
Assignee: Olle Jonsson

Quickstart refers to wrong port in pastable commands
----------------------------------------------------
Key: KAFKA-1552
URL: https://issues.apache.org/jira/browse/KAFKA-1552
Project: Kafka
Issue Type: Bug
Components: website
Reporter: Olle Jonsson
Assignee: Olle Jonsson
Labels: documentation, quickstart
Attachments: quickstart.patch

The file http://svn.apache.org/repos/asf/kafka/site/081/quickstart.html mentions the port localhost:218192, which should be localhost:2181.
[jira] [Updated] (KAFKA-1552) Quickstart refers to wrong port in pastable commands
[ https://issues.apache.org/jira/browse/KAFKA-1552?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jay Kreps updated KAFKA-1552:
-----------------------------
Resolution: Fixed
Status: Resolved (was: Patch Available)

Fixed, thanks!
[jira] [Updated] (KAFKA-1551) Configuration example errors
[ https://issues.apache.org/jira/browse/KAFKA-1551?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jay Kreps updated KAFKA-1551:
-----------------------------
Assignee: Alexey Ozeritskiy
[jira] [Commented] (KAFKA-1414) Speedup broker startup after hard reset
[ https://issues.apache.org/jira/browse/KAFKA-1414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14068007#comment-14068007 ]

Anton Karamanov commented on KAFKA-1414:
----------------------------------------

I see the problem now. I still think that we need to leave parallelization granularity at the log level, rather than the directory level. We just need to interleave the queues of per-directory jobs to make it possible for threads to run on all directories at the same time. It shouldn't be hard to implement.
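One way to realize the interleaving Anton describes — log-level jobs, ordered so that every data directory is serviced from the first pool slot onward — might look like the sketch below. This is an illustration, not any of the attached patches; `recoverLog` and the pool-size parameter are assumptions:

```scala
import java.io.File
import java.util.concurrent.{Executors, TimeUnit}

// Hypothetical per-log recovery job, standing in for constructing a Log.
def recoverLog(logDir: File): Unit = ()

def loadLogsInterleaved(dataDirs: Seq[File], ioParallelism: Int): Unit = {
  // One job per topic-partition directory, collected per data directory.
  val jobsPerDir: Seq[Seq[File]] = dataDirs.map { d =>
    Option(d.listFiles()).getOrElse(Array.empty[File]).filter(_.isDirectory).toSeq
  }
  // Interleave round-robin (dir1(0), dir2(0), ..., dir1(1), dir2(1), ...):
  // sortBy on the within-directory index is stable, so a fixed-size pool
  // touches every disk from the very first submitted jobs.
  val interleaved = jobsPerDir.flatMap(_.zipWithIndex).sortBy(_._2).map(_._1)

  val pool = Executors.newFixedThreadPool(ioParallelism)
  interleaved.foreach { logDir =>
    pool.submit(new Runnable { def run(): Unit = recoverLog(logDir) })
  }
  pool.shutdown()
  pool.awaitTermination(Long.MaxValue, TimeUnit.MILLISECONDS)
}
```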
[jira] [Commented] (KAFKA-1539) Due to OS caching Kafka might loose offset files which causes full reset of data
[ https://issues.apache.org/jira/browse/KAFKA-1539?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14068010#comment-14068010 ] Dmitry Bugaychenko commented on KAFKA-1539: --- Digged the proble a bit more. It looks like calling flush on new BufferedWriter(new FileWriter(temp)) only forces buffered writer to dump everything into a FileOutputStream under the FileWriter and call flush on it. However, according to http://grepcode.com/file/repository.grepcode.com/java/root/jdk/openjdk/7u40-b43/java/io/FileOutputStream.java#FileOutputStream it does nothing. In order to really force data to be written to disk you need to call fos.getFD().sync(). According to that the patch could be like that: {code} def write(offsets: Map[TopicAndPartition, Long]) { lock synchronized { // write to temp file and then swap with the existing file val temp = new File(file.getAbsolutePath + .tmp) val fileOutputStream = new FileOutputStream(temp) val writer = new BufferedWriter(new FileWriter(fileOutputStream)) try { // write the current version writer.write(0.toString) writer.newLine() // write the number of entries writer.write(offsets.size.toString) writer.newLine() // write the entries offsets.foreach { case (topicPart, offset) = writer.write(%s %d %d.format(topicPart.topic, topicPart.partition, offset)) writer.newLine() } // flush and overwrite old file writer.flush() // Force fsync to disk fileOutputStream.getFD.sync() } finally { writer.close() } // swap new offset checkpoint file with previous one if(!temp.renameTo(file)) { // renameTo() fails on Windows if the destination file exists. file.delete() if(!temp.renameTo(file)) throw new IOException(File rename from %s to %s failed..format(temp.getAbsolutePath, file.getAbsolutePath)) } } } {code} Note that the problem is easily reproducable only on XFS, ext3/ext4 seems to handle this case much better. Hope we will be able to try the patch later this week and check if it helps. 
Due to OS caching Kafka might loose offset files which causes full reset of data Key: KAFKA-1539 URL: https://issues.apache.org/jira/browse/KAFKA-1539 Project: Kafka Issue Type: Bug Components: log Affects Versions: 0.8.1.1 Reporter: Dmitry Bugaychenko Assignee: Jay Kreps Seen this while testing power failure and disk failures. Due to caching on the OS level (e.g. XFS can cache data for 30 seconds), after a failure we got offset files of zero length. This dramatically slows down broker startup (it has to re-check all segments), and if the high watermark offsets are lost it simply erases all data and starts recovering from other brokers (looks funny: first spending 2-3 hours re-checking logs and then deleting them all due to the missing high watermark). Proposal: introduce offset file rotation. Keep two versions of the offset file, write to the oldest, read from the newest valid one. That way we could configure the offset checkpoint time so that at least one file is always flushed and valid. -- This message was sent by Atlassian JIRA (v6.2#6252)
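The rotation scheme proposed above could be sketched roughly as follows. This is a hypothetical Java illustration, not Kafka code: the file names, the sequence number, and the "END" completion marker are all invented for the sketch. Two checkpoint files are kept; a write always overwrites the file holding the older checkpoint, so a crash mid-write can never destroy the newest valid one.

```java
import java.io.*;
import java.nio.file.Files;

// Hypothetical sketch of the two-file rotation proposal (not Kafka code).
class RotatingCheckpoint {
    private final File[] files;

    RotatingCheckpoint(File dir) {
        files = new File[]{new File(dir, "offsets.0"), new File(dir, "offsets.1")};
    }

    // Parses one file into {seq, offset}; null if missing, corrupt, or torn.
    private long[] readOne(File f) {
        try (BufferedReader r = new BufferedReader(new FileReader(f))) {
            long seq = Long.parseLong(r.readLine());
            long off = Long.parseLong(r.readLine());
            return "END".equals(r.readLine()) ? new long[]{seq, off} : null;
        } catch (Exception e) {
            return null;
        }
    }

    // Newest valid offset, or -1 when neither file is readable.
    long read() {
        long bestSeq = -1, bestOff = -1;
        for (File f : files) {
            long[] v = readOne(f);
            if (v != null && v[0] > bestSeq) { bestSeq = v[0]; bestOff = v[1]; }
        }
        return bestOff;
    }

    void write(long offset) {
        long bestSeq = -1;
        int bestIdx = -1;
        for (int i = 0; i < files.length; i++) {
            long[] v = readOne(files[i]);
            if (v != null && v[0] > bestSeq) { bestSeq = v[0]; bestIdx = i; }
        }
        File target = files[bestIdx == 0 ? 1 : 0];  // overwrite the OLDER file
        try (FileOutputStream fos = new FileOutputStream(target);
             BufferedWriter w = new BufferedWriter(new OutputStreamWriter(fos))) {
            w.write(Long.toString(bestSeq + 1)); w.newLine();
            w.write(Long.toString(offset));      w.newLine();
            w.write("END");                      w.newLine();  // completion marker
            w.flush();
            fos.getFD().sync();  // as discussed above: flush alone stays in the page cache
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    static File tempDir() {
        try { return Files.createTempDirectory("ckpt").toFile(); }
        catch (IOException e) { throw new UncheckedIOException(e); }
    }
}
```

A reader picks the valid file with the highest sequence number, so a torn write of one file (the zero-length files seen on XFS) leaves the previous checkpoint intact.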
[jira] [Comment Edited] (KAFKA-1539) Due to OS caching Kafka might loose offset files which causes full reset of data
[ https://issues.apache.org/jira/browse/KAFKA-1539?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14068010#comment-14068010 ] Dmitry Bugaychenko edited comment on KAFKA-1539 at 7/20/14 6:30 PM: Dug into the problem a bit more. It looks like calling flush on new BufferedWriter(new FileWriter(temp)) only forces the buffered writer to dump everything into the FileOutputStream underneath the FileWriter and call flush on it. However, according to http://grepcode.com/file/repository.grepcode.com/java/root/jdk/openjdk/7u40-b43/java/io/FileOutputStream.java#FileOutputStream that flush does nothing. In order to really force the data to be written to disk you need to call fos.getFD().sync(). Accordingly, the patch could look like this:
{code}
def write(offsets: Map[TopicAndPartition, Long]) {
  lock synchronized {
    // write to temp file and then swap with the existing file
    val temp = new File(file.getAbsolutePath + ".tmp")
    val fileOutputStream = new FileOutputStream(temp)
    // wrap the raw stream so its file descriptor stays reachable for sync()
    val writer = new BufferedWriter(new OutputStreamWriter(fileOutputStream))
    try {
      // write the current version
      writer.write(0.toString)
      writer.newLine()
      // write the number of entries
      writer.write(offsets.size.toString)
      writer.newLine()
      // write the entries
      offsets.foreach { case (topicPart, offset) =>
        writer.write("%s %d %d".format(topicPart.topic, topicPart.partition, offset))
        writer.newLine()
      }
      // flush and overwrite old file
      writer.flush()
      // force fsync to disk
      fileOutputStream.getFD.sync()
    } finally {
      writer.close()
    }
    // swap new offset checkpoint file with previous one
    if (!temp.renameTo(file)) {
      // renameTo() fails on Windows if the destination file exists.
      file.delete()
      if (!temp.renameTo(file))
        throw new IOException("File rename from %s to %s failed.".format(temp.getAbsolutePath, file.getAbsolutePath))
    }
  }
}
{code}
Note that the problem is easily reproducible only on XFS; ext3/ext4 seem to handle this case much better. We hope to be able to try the patch later this week and check whether it helps.
[jira] [Commented] (KAFKA-1414) Speedup broker startup after hard reset
[ https://issues.apache.org/jira/browse/KAFKA-1414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14068013#comment-14068013 ] Dmitry Bugaychenko commented on KAFKA-1414: --- In our use case inter-directory parallelism is a must-have. Each directory is backed by its own HDD and we want them all to be utilized in parallel on startup AND shutdown. Log-level parallelism is a no-go: having multiple threads all targeting the same HDD will only degrade performance. Speedup broker startup after hard reset --- Key: KAFKA-1414 URL: https://issues.apache.org/jira/browse/KAFKA-1414 Project: Kafka Issue Type: Improvement Components: log Affects Versions: 0.8.2, 0.9.0, 0.8.1.1 Reporter: Dmitry Bugaychenko Assignee: Jay Kreps Attachments: 0001-KAFKA-1414-Speedup-broker-startup-after-hard-reset-a.patch, KAFKA-1414-rev1.patch, KAFKA-1414-rev2.fixed.patch, KAFKA-1414-rev2.patch, KAFKA-1414-rev3.patch, freebie.patch, parallel-dir-loading-0.8.patch, parallel-dir-loading-trunk-fixed-threadpool.patch, parallel-dir-loading-trunk-threadpool.patch, parallel-dir-loading-trunk.patch After hard reset due to power failure the broker takes way too much time recovering unflushed segments in a single thread. This could be easily improved by launching multiple threads (one per data directory, assuming that typically each data directory is on a dedicated drive).
Locally we tried this simple patch to LogManager.loadLogs and it seems to work; however, I'm too new to Scala, so do not take it literally:
{code}
/**
 * Recover and load all logs in the given data directories
 */
private def loadLogs(dirs: Seq[File]) {
  val threads: Array[Thread] = new Array[Thread](dirs.size)
  var i: Int = 0
  val me = this
  for (dir <- dirs) {
    val thread = new Thread(new Runnable {
      def run() {
        val recoveryPoints = me.recoveryPointCheckpoints(dir).read
        /* load the logs */
        val subDirs = dir.listFiles()
        if (subDirs != null) {
          val cleanShutDownFile = new File(dir, Log.CleanShutdownFile)
          if (cleanShutDownFile.exists())
            info("Found clean shutdown file. Skipping recovery for all logs in data directory '%s'".format(dir.getAbsolutePath))
          for (dir <- subDirs) {
            if (dir.isDirectory) {
              info("Loading log '" + dir.getName + "'")
              val topicPartition = Log.parseTopicPartitionName(dir.getName)
              val config = topicConfigs.getOrElse(topicPartition.topic, defaultConfig)
              val log = new Log(dir, config, recoveryPoints.getOrElse(topicPartition, 0L), scheduler, time)
              val previous = addLogWithLock(topicPartition, log)
              if (previous != null)
                throw new IllegalArgumentException("Duplicate log directories found: %s, %s!".format(log.dir.getAbsolutePath, previous.dir.getAbsolutePath))
            }
          }
          cleanShutDownFile.delete()
        }
      }
    })
    thread.start()
    threads(i) = thread
    i = i + 1
  }
  for (thread <- threads) {
    thread.join()
  }
}

def addLogWithLock(topicPartition: TopicAndPartition, log: Log): Log = {
  logCreationOrDeletionLock synchronized {
    this.logs.put(topicPartition, log)
  }
}
{code}
[jira] [Commented] (KAFKA-1414) Speedup broker startup after hard reset
[ https://issues.apache.org/jira/browse/KAFKA-1414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14068014#comment-14068014 ] Oleg Golovin commented on KAFKA-1414: - I see only one problem here. If we disperse threads among directories, there will be one directory at the end (presumably the biggest one) where all the threads will go. When that happens, so many threads will only aggravate performance on spinning drives, as it will result in more disk head seeks (though the OS will try to optimize those reads, I think seeks will not be avoided). Maybe we would rather have a separate thread pool for each directory, with the {{number of threads = log.io.parallelism / number_of_directories}}?
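The thread-pool-per-directory idea from this exchange could be sketched as follows. This is a hypothetical Java sketch: the pool-sizing rule is the one proposed above ({{log.io.parallelism / number_of_directories}}), while the class name and the stand-in "recovery" work are invented for illustration.

```java
import java.util.*;
import java.util.concurrent.*;

// Hypothetical sketch: one bounded thread pool per data directory, so
// concurrency within a single disk is capped while all disks run in parallel.
class PerDirRecovery {
    static Map<String, List<String>> recoverAll(Map<String, List<String>> logsByDir,
                                                int ioParallelism) {
        int threadsPerDir = Math.max(1, ioParallelism / logsByDir.size());
        Map<String, ExecutorService> pools = new HashMap<>();
        Map<String, List<String>> recovered = new ConcurrentHashMap<>();
        for (String dir : logsByDir.keySet()) {
            // each directory (i.e. each disk) gets its own bounded pool
            pools.put(dir, Executors.newFixedThreadPool(threadsPerDir));
            recovered.put(dir, Collections.synchronizedList(new ArrayList<>()));
        }
        // one job per log; jobs for the same dir compete only for that dir's pool
        for (Map.Entry<String, List<String>> e : logsByDir.entrySet()) {
            for (String log : e.getValue()) {
                // stand-in for actual segment recovery work
                pools.get(e.getKey()).submit(() -> recovered.get(e.getKey()).add(log));
            }
        }
        try {
            for (ExecutorService pool : pools.values()) {
                pool.shutdown();
                pool.awaitTermination(1, TimeUnit.MINUTES);
            }
        } catch (InterruptedException ie) {
            throw new RuntimeException(ie);
        }
        return recovered;
    }
}
```

With threads bounded per directory, the "all threads pile onto the last, biggest directory" problem described above cannot arise: the last directory never sees more than its own pool's threads.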
[jira] [Commented] (KAFKA-1414) Speedup broker startup after hard reset
[ https://issues.apache.org/jira/browse/KAFKA-1414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14068015#comment-14068015 ] Dmitry Bugaychenko commented on KAFKA-1414: --- That is why in my initial proposal each thread gets its own dir and dies when done. For us it works perfectly, since IO is the bottleneck and it is not worth increasing the number of threads per directory to more than one.
[jira] [Commented] (KAFKA-1414) Speedup broker startup after hard reset
[ https://issues.apache.org/jira/browse/KAFKA-1414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14068018#comment-14068018 ] Oleg Golovin commented on KAFKA-1414: - [~junrao] wrote about parallelization over directories in 32.1 (see above). I think RAID is a legitimate case for some Kafka users, so it has to be accounted for too.
[jira] [Commented] (KAFKA-1414) Speedup broker startup after hard reset
[ https://issues.apache.org/jira/browse/KAFKA-1414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14068020#comment-14068020 ] Dmitry Bugaychenko commented on KAFKA-1414: --- A separate thread pool per dir, with the ability to set the number of threads to 1 or higher as you proposed, could solve both cases and looks reasonably simple.
[jira] [Commented] (KAFKA-1414) Speedup broker startup after hard reset
[ https://issues.apache.org/jira/browse/KAFKA-1414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14068021#comment-14068021 ] Jay Kreps commented on KAFKA-1414: -- We should check that having multiple threads accessing the same directory will actually have a negative impact. First, I don't think anyone has actually checked that recovery is I/O bound. Recovery iterates over the log doing a bunch of small message reads, decompressing any compressed messages, and checking the CRC of all the data. This may actually be CPU bound. Secondly, the OS will do normal readahead, which should help protect somewhat against random access for two interspersed linear accesses. This should be easy to test. Run the perf test on a single-node broker with multiple partitions on a single drive, then kill -9 it. Then run
{code}
echo 1 > /proc/sys/vm/drop_caches
{code}
and restart. Do this twice, once with 1 thread and once with 2. The prediction is that the 2-threaded case will be slower than the 1-thread case, but it may actually not be.
Improved site docs on how to contribute
Following up on the recent contributor thread I tried to improve the docs on how to contribute to Kafka: http://kafka.apache.org/contributing.html Let me know if anyone has any additional suggestions on what should be there. -Jay
[jira] [Updated] (KAFKA-1414) Speedup broker startup after hard reset
[ https://issues.apache.org/jira/browse/KAFKA-1414?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Anton Karamanov updated KAFKA-1414: --- Attachment: KAFKA-1414-rev3-interleaving.patch Log-level parallelization is better in any case, because it's more flexible. As I mentioned, dir-level parallelization can be achieved on top of log-level parallelization by interleaving the jobs of every dir. That will make it possible to load logs from all disks at the same time. On the other hand, I second [~jkreps] on testing whether log loading is CPU- or IO-bound and measuring the speed of startup and shutdown with my patch. After all -- it's not the number of disks which are spinning wildly that matters, but the overall time it takes to start/shut down the system. (Oh, and if we decide to use the interleaved jobs strategy -- here's a [patch|^KAFKA-1414-rev3-interleaving.patch] (: )
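The interleaving strategy described above can be sketched as a simple round-robin flattening of the per-directory job lists, so that consecutive jobs in the shared queue target different disks. This is a hypothetical Java illustration; the method and class names are invented, and the actual implementation is the attached KAFKA-1414-rev3-interleaving.patch.

```java
import java.util.*;

// Hypothetical sketch: round-robin flattening of per-directory job lists.
class InterleavedJobs {
    static <T> List<T> interleave(List<List<T>> jobsPerDir) {
        List<T> out = new ArrayList<>();
        int longest = 0;
        for (List<T> jobs : jobsPerDir) longest = Math.max(longest, jobs.size());
        for (int i = 0; i < longest; i++)
            for (List<T> jobs : jobsPerDir)
                if (i < jobs.size()) out.add(jobs.get(i));  // i-th job of each dir in turn
        return out;
    }
}
```

Feeding the interleaved list into a single shared pool keeps all disks busy at once (the dir-level behavior) while still allowing more workers than directories (the log-level behavior).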
[jira] [Comment Edited] (KAFKA-1414) Speedup broker startup after hard reset
[ https://issues.apache.org/jira/browse/KAFKA-1414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14068047#comment-14068047 ] Anton Karamanov edited comment on KAFKA-1414 at 7/20/14 9:14 PM: - Log-level parallelization is better in any case, because it's more flexible. As I mentioned, dir-level parallelization can be achieved on top of log-level parallelization by interleaving the jobs of every dir. That will make it possible to load logs from all disks at the same time. I second [~jkreps] on testing whether log loading is CPU- or IO-bound and measuring the speed of startup and shutdown with my patch first. After all -- it's not the number of disks which are spinning wildly that matters, but the overall time it takes to start/shut down the system. (Oh, and if we decide to use the interleaved jobs strategy -- here's a [patch|^KAFKA-1414-rev3-interleaving.patch] (: )
Re: Review Request 23697: Fix KAFKA-1533: Address Jun and Neha's comments
---
This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/23697/#review48194
---

clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
https://reviews.apache.org/r/23697/#comment84551
For clarity, could we calculate the second part into a local val and give it a meaningful name?

clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
https://reviews.apache.org/r/23697/#comment84550
Thinking a bit more about this. There are two possibilities after adding the metadata request to sends: (1) The metadata fetch succeeds. In this case Metadata.lastRefreshMs will be updated and this line is not needed. (2) The metadata fetch fails due to a connection issue. In this case, we probably want to fetch the metadata from another node immediately, instead of backing off. So, it seems that in NetworkClient we only need to take care of the case when no node is available for metadata. In that case, we should back off. So, perhaps we should rename metadataLastUpdateAttemptMs to something like lastNoNodeMs.

core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
https://reviews.apache.org/r/23697/#comment84552
Indentation.

- Jun Rao

On July 18, 2014, 10:45 p.m., Guozhang Wang wrote:
---
This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/23697/
---
(Updated July 18, 2014, 10:45 p.m.)

Review request for kafka.

Bugs: KAFKA-1533
https://issues.apache.org/jira/browse/KAFKA-1533

Repository: kafka

Description
---
1. Add the metadataRefreshAttemptMs in NetworkClient for backing off;
2. Refactor Producer API tests using KafkaTestHarness;
3. Change default backoff time to 100ms for test utils

Diffs
-
clients/src/main/java/org/apache/kafka/clients/NetworkClient.java d8f9ce663ee24d2b0852c974136741280c39f8f8
clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java 4aa5b01d611631db72df47d50bbe30edb8c478db
core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala 15fd5bcbaf175a0f7d7cf0b142e63f705ca9b6ae
core/src/test/scala/integration/kafka/api/ProducerSendTest.scala 34a7db4b4ea2b720476c2b1f22a623a997faffbc
core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala 194dd70919a5f301d3131c56594e40a0ebb27311
core/src/test/scala/unit/kafka/utils/TestUtils.scala 3faa884f8eb83c7c00baab416d0acfb488dc39c1

Diff: https://reviews.apache.org/r/23697/diff/

Testing
---

Thanks,
Guozhang Wang
[jira] [Commented] (KAFKA-1549) dead brokers coming in the TopicMetadataResponse
[ https://issues.apache.org/jira/browse/KAFKA-1549?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14068165#comment-14068165 ]

Jun Rao commented on KAFKA-1549:

Thanks for the patch. A couple of comments.

1. inReadLock seems like a generally useful util. We probably can add the following to the Utils file where inLock lives, and change all existing usage of readWriteLock to use those utils.
{code}
def inReadLock[T](lock: ReadWriteLock)(fun: => T): T = { ... }
def inWriteLock[T](lock: ReadWriteLock)(fun: => T): T = { ... }
{code}

2. Not sure what the purpose of the change in KafkaApis is. Is that to avoid the overhead of an extra readLock? It's better to limit the usage of the lock to inside the MetadataCache.

dead brokers coming in the TopicMetadataResponse
---
Key: KAFKA-1549
URL: https://issues.apache.org/jira/browse/KAFKA-1549
Project: Kafka
Issue Type: Bug
Affects Versions: 0.8.2
Environment: trunk
Reporter: nicu marasoiu
Assignee: Jun Rao
Attachments: KAFKA-1549__dead_brokers_coming_in_the_TopicMetadataResponse_.patch

Jun Rao confirming my observation that brokers are only added to the metadataCache, never removed: "The way that we update liveBrokers in MetadataCache.updateCache() doesn't seem right. We only add newly received live brokers to the list. However, there could be existing brokers in that list that are now dead. Those dead brokers shouldn't be returned to the clients. We should probably just take the new live broker list and cache it."
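Filled out with bodies, the proposed helpers would look something like the sketch below (the actual utils live in kafka.utils; the broker-cache usage example is illustrative only, loosely modeled on what MetadataCache.updateCache should do per the issue description):

```scala
import java.util.concurrent.locks.{ReadWriteLock, ReentrantReadWriteLock}

// Run fun while holding the read lock, releasing it even on exception.
def inReadLock[T](lock: ReadWriteLock)(fun: => T): T = {
  lock.readLock().lock()
  try fun finally lock.readLock().unlock()
}

// Run fun while holding the write lock, releasing it even on exception.
def inWriteLock[T](lock: ReadWriteLock)(fun: => T): T = {
  lock.writeLock().lock()
  try fun finally lock.writeLock().unlock()
}

// Illustrative usage: a broker cache guarded the way MetadataCache might be.
val cacheLock = new ReentrantReadWriteLock()
var liveBrokers: Set[Int] = Set.empty

// Replace the cached list wholesale (don't merge), so dead brokers drop out.
def updateBrokers(brokers: Set[Int]): Unit =
  inWriteLock(cacheLock) { liveBrokers = brokers }

def getBrokers: Set[Int] =
  inReadLock(cacheLock) { liveBrokers }
```

The by-name parameter (`fun: => T`) is what lets callers write `inReadLock(lock) { ... }` with an arbitrary block, mirroring the existing inLock util.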
Re: Review Request 23705: Patch for KAFKA-1192
---
This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/23705/#review48195
---

Thanks for the patch. A couple of comments below.

core/src/main/scala/kafka/tools/DumpLogSegments.scala
https://reviews.apache.org/r/23705/#comment84553
Shouldn't Decoder be kafka.serializer.Decoder? Also, capitalize "custom".

core/src/main/scala/kafka/tools/DumpLogSegments.scala
https://reviews.apache.org/r/23705/#comment84554
Could we add a decoder for the key too?

- Jun Rao

On July 19, 2014, 8:28 a.m., Manikumar Reddy O wrote:
---
This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/23705/
---
(Updated July 19, 2014, 8:28 a.m.)

Review request for kafka.

Bugs: KAFKA-1192
https://issues.apache.org/jira/browse/KAFKA-1192

Repository: kafka

Description
---
Support given for custom deserialization of messages

Diffs
-
core/src/main/scala/kafka/tools/DumpLogSegments.scala 6daf87b25a48a51aafb7dbe8d0c0371e0ea7501f

Diff: https://reviews.apache.org/r/23705/diff/

Testing
---

Thanks,
Manikumar Reddy O
[jira] [Commented] (KAFKA-1414) Speedup broker startup after hard reset
[ https://issues.apache.org/jira/browse/KAFKA-1414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14068173#comment-14068173 ]

Jay Kreps commented on KAFKA-1414:

I did a test of single-threaded recovery on my laptop where most data is in memory and the disk is an SSD. This should remove any I/O bottleneck. I see log recovery working at about 275MB/sec with 100% CPU load on one core. This indicates that on a non-SSD drive (most Kafka machines, I would imagine), I/O would be the bottleneck.
[jira] [Commented] (KAFKA-1414) Speedup broker startup after hard reset
[ https://issues.apache.org/jira/browse/KAFKA-1414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14068177#comment-14068177 ]

Jay Kreps commented on KAFKA-1414:

[~ataraxer] I agree that just throwing all the logs at a thread pool is simpler to configure and probably also to implement. To see if that will work, we need data on whether multiple threads running recovery against a single drive have a significant negative perf impact. Want to try that one out and see? If the impact is minor, I think we are probably better off with the simpler strategy, but if it tanks performance we may need to be more explicit about the data directories.
[jira] [Commented] (KAFKA-1451) Broker stuck due to leader election race
[ https://issues.apache.org/jira/browse/KAFKA-1451?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14068193#comment-14068193 ]

Jun Rao commented on KAFKA-1451:

Neha, I am not sure if #1 is needed. We can get into elect from two paths: (1) from startup, or (2) from handleDataDeleted. If it's from startup, we already register the watcher before calling elect. If it's from handleDataDeleted, the watcher must have already been registered. So, once in elect, we know the watcher is already registered. So if, after we check the existence of the controller node, the controller node goes away immediately afterward, the watcher is guaranteed to be triggered.

Broker stuck due to leader election race
---
Key: KAFKA-1451
URL: https://issues.apache.org/jira/browse/KAFKA-1451
Project: Kafka
Issue Type: Bug
Components: core
Affects Versions: 0.8.1.1
Reporter: Maciek Makowski
Priority: Minor
Labels: newbie

h3. Symptoms
The broker does not become available due to being stuck in an infinite loop while electing the leader. This can be recognised by the following line being repeatedly written to server.log:
{code}
[2014-05-14 04:35:09,187] INFO I wrote this conflicted ephemeral node [{version:1,brokerid:1,timestamp:1400060079108}] at /controller a while back in a different session, hence I will backoff for this node to be deleted by Zookeeper and retry (kafka.utils.ZkUtils$)
{code}

h3. Steps to Reproduce
In a setup with a single Kafka 0.8.1.1 node and a single ZooKeeper 3.4.6 node (it will likely behave the same with the ZK version included in the Kafka distribution):
# start both zookeeper and kafka (in any order)
# stop zookeeper
# stop kafka
# start kafka
# start zookeeper

h3. Likely Cause
{{ZookeeperLeaderElector}} subscribes to data changes on startup, and then triggers an election. If the deletion of the ephemeral {{/controller}} node associated with the broker's previous ZooKeeper session happens after the subscription to changes in the new session, the election will be invoked twice, once from {{startup}} and once from {{handleDataDeleted}}:
* {{startup}}: acquire {{controllerLock}}
* {{startup}}: subscribe to data changes
* zookeeper: delete {{/controller}} since the session that created it timed out
* {{handleDataDeleted}}: {{/controller}} was deleted
* {{handleDataDeleted}}: wait on {{controllerLock}}
* {{startup}}: elect -- writes {{/controller}}
* {{startup}}: release {{controllerLock}}
* {{handleDataDeleted}}: acquire {{controllerLock}}
* {{handleDataDeleted}}: elect -- attempts to write {{/controller}} and then gets into an infinite loop as a result of the conflict

{{createEphemeralPathExpectConflictHandleZKBug}} assumes that the existing znode was written from a different session, which is not true in this case; it was written from the same session. That adds to the confusion.

h3. Suggested Fix
In {{ZookeeperLeaderElector.startup}}, first run {{elect}} and then subscribe to data changes.
[jira] [Commented] (KAFKA-1546) Automate replica lag tuning
[ https://issues.apache.org/jira/browse/KAFKA-1546?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14068209#comment-14068209 ]

Jun Rao commented on KAFKA-1546:

Jay, I think what you proposed is nice and simple. I think it works. One subtlety is dealing with max.wait.ms in the follower fetch request. Imagine that a follower has caught up and its fetch request is sitting in the purgatory. The last caught up time won't be updated for max.wait.ms if no new messages come in. When a message does come in, if max.wait.ms is larger than replica.lag.time.ms, the follower is now considered out of sync. Perhaps we should instead use the timestamp of when the first message is appended to the leader after the last caught up time: if the amount of time since then is more than replica.lag.time.ms, the replica is considered out of sync.

Automate replica lag tuning
---
Key: KAFKA-1546
URL: https://issues.apache.org/jira/browse/KAFKA-1546
Project: Kafka
Issue Type: Improvement
Components: replication
Affects Versions: 0.8.0, 0.8.1, 0.8.1.1
Reporter: Neha Narkhede
Labels: newbie++

Currently, there is no good way to tune the replica lag configs to automatically account for high and low volume topics on the same cluster. For the low-volume topic it will take a very long time to detect a lagging replica, and for the high-volume topic it will have false positives. One approach to making this easier would be to have the configuration be something like replica.lag.max.ms and translate this into a number of messages dynamically based on the throughput of the partition.
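Jun's refinement can be sketched as a per-follower check like the one below. The names (FollowerLagState, isOutOfSync) are illustrative, not the actual ReplicaManager fields; the point is only that the clock starts at the first append after catch-up, not at the catch-up itself.

```scala
// Illustrative per-follower state: when the follower last caught up to the
// leader's log end offset, and when the first message was appended to the
// leader after that point (None if nothing has been appended since).
case class FollowerLagState(
  lastCaughtUpTimeMs: Long,
  firstAppendAfterCaughtUpMs: Option[Long]
)

def isOutOfSync(state: FollowerLagState, nowMs: Long, replicaLagTimeMs: Long): Boolean =
  state.firstAppendAfterCaughtUpMs match {
    // Out of sync only if the first post-catch-up append has gone
    // unreplicated for longer than the lag bound.
    case Some(firstAppendMs) => nowMs - firstAppendMs > replicaLagTimeMs
    // No new messages since the follower caught up: still in sync, even if
    // its fetch request has been parked in the purgatory for max.wait.ms.
    case None => false
  }
```

Measuring from the first append rather than from lastCaughtUpTimeMs is what avoids the false positive when max.wait.ms exceeds replica.lag.time.ms and the purgatory holds an idle fetch.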