[jira] [Created] (KAFKA-1923) Negative offsets in replication-offset-checkpoint file
Oleg Golovin created KAFKA-1923: --- Summary: Negative offsets in replication-offset-checkpoint file Key: KAFKA-1923 URL: https://issues.apache.org/jira/browse/KAFKA-1923 Project: Kafka Issue Type: Bug Components: core Affects Versions: 0.8.2 Reporter: Oleg Golovin Today was the second time we witnessed negative offsets in the replication-offset-checkpoint file. After a restart the node stops replicating some of its partitions. Unfortunately we can't reproduce it yet, but the two cases we encountered indicate a bug which should be addressed. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
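A minimal sketch of how such entries could be spotted, assuming the usual checkpoint layout (a version line, an entry-count line, then one `topic partition offset` per line) and a hypothetical file path; this is illustrative only, not part of Kafka:
{code}
import scala.io.Source

// Sketch: flag negative offsets in a replication-offset-checkpoint file.
// The path and the assumed layout (version line, count line, then "topic partition offset") are illustrative.
object FindNegativeOffsets {
  def main(args: Array[String]): Unit = {
    val path = args.headOption.getOrElse("/var/kafka-logs/replication-offset-checkpoint")
    Source.fromFile(path).getLines().drop(2).foreach { line =>
      line.trim.split("\\s+") match {
        case Array(topic, partition, offset) if offset.matches("-\\d+") =>
          println("Negative offset: " + topic + "-" + partition + " = " + offset)
        case _ => // well-formed entry, nothing to report
      }
    }
  }
}
{code}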
[jira] [Commented] (KAFKA-1804) Kafka network thread lacks top exception handler
[ https://issues.apache.org/jira/browse/KAFKA-1804?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14246599#comment-14246599 ] Oleg Golovin commented on KAFKA-1804: - `kafka-socket-acceptor` has the same problem.

Kafka network thread lacks top exception handler Key: KAFKA-1804 URL: https://issues.apache.org/jira/browse/KAFKA-1804 Project: Kafka Issue Type: Bug Reporter: Oleg Golovin We have faced the problem that some Kafka network threads may fail, so that jstack attached to the Kafka process showed fewer threads than we had defined in our Kafka configuration. This leads to API requests processed by such a thread getting stuck without a response. There were no error messages in the log regarding the thread failure. We have examined the Kafka code and found that there is no top-level try-catch block in the network thread code which could at least log possible errors. Could you add a top-level try-catch block for the network thread, which would recover the network thread in case of an exception? -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (KAFKA-1804) Kafka network thread lacks top exception handler
Oleg Golovin created KAFKA-1804: --- Summary: Kafka network thread lacks top exception handler Key: KAFKA-1804 URL: https://issues.apache.org/jira/browse/KAFKA-1804 Project: Kafka Issue Type: Bug Reporter: Oleg Golovin We have faced the problem that some Kafka network threads may fail, so that jstack attached to the Kafka process showed fewer threads than we had defined in our Kafka configuration. This leads to API requests processed by such a thread getting stuck without a response. There were no error messages in the log regarding the thread failure. We have examined the Kafka code and found that there is no top-level try-catch block in the network thread code which could at least log possible errors. Could you add a top-level try-catch block for the network thread, which would recover the network thread in case of an exception? -- This message was sent by Atlassian JIRA (v6.3.4#6332)
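For illustration only, a hedged sketch of the kind of guard being asked for (hypothetical class and method names, not the actual SocketServer code):
{code}
// Hypothetical sketch of a network thread whose run loop is wrapped in a top-level try-catch,
// so an unexpected exception is logged and the thread keeps serving instead of dying silently.
class GuardedNetworkThread(id: Int) extends Thread("kafka-network-thread-" + id) {
  @volatile private var alive = true

  override def run() {
    while (alive) {
      try {
        processRequests() // placeholder for the thread's normal select/read/write loop
      } catch {
        case e: Throwable =>
          System.err.println("Uncaught exception in network thread " + id + ", recovering: " + e)
      }
    }
  }

  def shutdown() { alive = false }

  private def processRequests() { /* selector loop would go here */ }
}
{code}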
[jira] [Created] (KAFKA-1726) Wrong message format description
Oleg Golovin created KAFKA-1726: --- Summary: Wrong message format description Key: KAFKA-1726 URL: https://issues.apache.org/jira/browse/KAFKA-1726 Project: Kafka Issue Type: Bug Components: website Reporter: Oleg Golovin [This page|https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Enriched+Message+Metadata#KafkaEnrichedMessageMetadata-CurrentMessageFormat] describes the current Kafka message format:
{code}
MessageAndOffset = MessageSize Offset Message
  MessageSize = int32
  Offset = int64
  Message = Crc MagicByte Attributes KeyLength Key ValueLength Value
    Crc = int32
    MagicByte = int8
    Attributes = int8
    KeyLength = int32
    Key = bytes
    ValueLength = int32
    Value = bytes
{code}
In reality _offset_ goes before _messageSize_. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
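If the wiki text is corrected simply by swapping the two fields, as the report implies, the layout would presumably read:
{code}
MessageAndOffset = Offset MessageSize Message
  Offset = int64
  MessageSize = int32
  Message = Crc MagicByte Attributes KeyLength Key ValueLength Value
    Crc = int32
    MagicByte = int8
    Attributes = int8
    KeyLength = int32
    Key = bytes
    ValueLength = int32
    Value = bytes
{code}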
[jira] [Updated] (KAFKA-1712) Excessive storage usage on newly added node
[ https://issues.apache.org/jira/browse/KAFKA-1712?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Oleg Golovin updated KAFKA-1712: Description:

When a new node is added to the cluster, data starts replicating into it. The mtime of the newly created segments will be set when the last message is written to them. Though replication is a prolonged process, let's assume (for simplicity of explanation) that their mtime is very close to the time when the new node was added. After the replication is done, new data will start to flow into this new node.

After `log.retention.hours` the amount of data will be 2 * daily_amount_of_data_in_kafka_node (the first half is the data replicated from the other nodes when the node was added (let us call that moment `t1`), and the second half is the data replicated from the other nodes during `t1 + log.retention.hours`). So by that time the node will have twice as much data as the other nodes. This poses a big problem for us, as our storage is sized for the normal amount of data (not twice that amount).

In our particular case it poses another problem. We have an emergency segment cleaner which runs in case storage is nearly full (90%). We try to balance the amount of data so that it does not run and we can rely solely on Kafka's internal log deletion, but sometimes the emergency cleaner does run. It works this way:
- it gets all Kafka segments for the volume
- it filters out the last segment of each partition (just to avoid unnecessary recreation of the last, small segments)
- it sorts them by segment mtime
- it changes the mtime of the first N segments (those with the lowest mtime) to 1, so they become really, really old; the number N is chosen to free a specified percentage of the volume (3% in our case). Kafka deletes these segments later (as they are very old).

The emergency cleaner works very well, except for the case when data has been replicated to a newly added node. In that case a segment's mtime is the time the segment was replicated and does not reflect the real creation time of the original data stored in it. So the emergency cleaner will delete the segments with the lowest mtime, which may hold data that is much more recent than the data in other segments. This is not a big problem until we delete data which hasn't been fully consumed. In that case we lose data, and that makes it a big problem.

Is it possible to retain segment mtime during the initial replication to a new node? This would help not to load the new node with twice as much data as the other nodes have. Or maybe there are other ways to sort segments by data creation time (or something close to it)? (For example, if https://issues.apache.org/jira/browse/KAFKA-1403 is implemented, we could take the time of the first message from the .index.) In our case this would let the emergency cleaner really delete the oldest data.
[jira] [Updated] (KAFKA-1712) Excessive storage usage on newly added node
[ https://issues.apache.org/jira/browse/KAFKA-1712?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Oleg Golovin updated KAFKA-1712: Description:

When a new node is added to the cluster, data starts replicating into it. The mtime of the newly created segments will be set when the last message is written to them. Though replication is a prolonged process, let's assume (for simplicity of explanation) that their mtime is very close to the time when the new node was added. After the replication is done, new data will start to flow into this new node.

After `log.retention.hours` the amount of data will be 2 * daily_amount_of_data_in_kafka_node (the first half is the data replicated from the other nodes when the node was added (let us call that moment `t1`), and the second half is the data replicated from the other nodes from `t1` to `t1 + log.retention.hours`). So by that time the node will have twice as much data as the other nodes. This poses a big problem for us, as our storage is sized for the normal amount of data (not twice that amount).

In our particular case it poses another problem. We have an emergency segment cleaner which runs in case storage is nearly full (90%). We try to balance the amount of data so that it does not run and we can rely solely on Kafka's internal log deletion, but sometimes the emergency cleaner does run. It works this way:
- it gets all Kafka segments for the volume
- it filters out the last segment of each partition (just to avoid unnecessary recreation of the last, small segments)
- it sorts them by segment mtime
- it changes the mtime of the first N segments (those with the lowest mtime) to 1, so they become really, really old; the number N is chosen to free a specified percentage of the volume (3% in our case). Kafka deletes these segments later (as they are very old).

The emergency cleaner works very well, except for the case when data has been replicated to a newly added node. In that case a segment's mtime is the time the segment was replicated and does not reflect the real creation time of the original data stored in it. So the emergency cleaner will delete the segments with the lowest mtime, which may hold data that is much more recent than the data in other segments. This is not a big problem until we delete data which hasn't been fully consumed. In that case we lose data, and that makes it a big problem.

Is it possible to retain segment mtime during the initial replication to a new node? This would help not to load the new node with twice as much data as the other nodes have. Or maybe there are other ways to sort segments by data creation time (or something close to it)? (For example, if https://issues.apache.org/jira/browse/KAFKA-1403 is implemented, we could take the time of the first message from the .index.) In our case this would let the emergency cleaner really delete the oldest data.
[jira] [Created] (KAFKA-1712) Excessive storage usage on newly added node
Oleg Golovin created KAFKA-1712: --- Summary: Excessive storage usage on newly added node Key: KAFKA-1712 URL: https://issues.apache.org/jira/browse/KAFKA-1712 Project: Kafka Issue Type: Bug Reporter: Oleg Golovin

When a new node is added to the cluster, data starts replicating into it. The mtime of the newly created segments will be set when the last message is written to them. Though replication is a prolonged process, let's assume (for simplicity of explanation) that their mtime is very close to the time when the new node was added. After the replication is done, new data will start to flow into this new node.

After `log.retention.hours` the amount of data will be 2 * daily_amount_of_data_in_kafka_node (the first half is the data replicated from the other nodes when the node was added (let us call that moment `t1`), and the second half is the data replicated from the other nodes during `t1 + log.retention.hours`). So by that time the node will have twice as much data as the other nodes. This poses a big problem for us, as our storage is sized for the normal amount of data (not twice that amount).

In our particular case it poses another problem. We have an emergency segment cleaner which runs in case storage is nearly full (90%). We try to balance the amount of data so that it does not run and we can rely solely on Kafka's internal log deletion, but sometimes the emergency cleaner does run. It works this way (see the sketch after this message):
- it gets all Kafka segments for the volume
- it filters out the last segment of each partition (just to avoid unnecessary recreation of the last, small segments)
- it sorts them by segment mtime
- it changes the mtime of the first N segments (those with the lowest mtime) to 1, so they become really, really old; the number N is chosen to free a specified percentage of the volume (3% in our case). Kafka deletes these segments later (as they are very old).

The emergency cleaner works very well, except for the case when data has been replicated to a newly added node. In that case a segment's mtime is the time the segment was replicated and does not reflect the real creation time of the original data stored in it. So the emergency cleaner will delete the segments with the lowest mtime, which may hold data that is much more recent than the data in other segments. This is not a big problem until we delete data which hasn't been fully consumed. In that case we lose data, and that makes it a big problem.

Is it possible to retain segment mtime during the initial replication to a new node? This would help not to load the new node with twice as much data as the other nodes have. Or maybe there are other ways to sort segments by data creation time (or something close to it)? (For example, if https://issues.apache.org/jira/browse/KAFKA-1403 is implemented, we could take the time of the first message from the .index.) In our case this would let the emergency cleaner really delete the oldest data. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
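A rough, hypothetical sketch of the cleaner procedure described above (illustrative names; choosing N so that a given percentage of the volume is freed is omitted; this tool is external to Kafka):
{code}
import java.io.File

// Illustrative only: walk a log volume, skip the newest segment of every partition,
// then mark the N segments with the lowest mtime as ancient so Kafka's retention deletes them.
object EmergencyCleaner {
  def markOldest(volume: File, segmentsToAge: Int) {
    val partitionDirs = Option(volume.listFiles()).getOrElse(Array[File]()).filter(_.isDirectory)
    val candidates = partitionDirs.flatMap { dir =>
      Option(dir.listFiles()).getOrElse(Array[File]())
        .filter(_.getName.endsWith(".log"))
        .sortBy(_.lastModified)
        .dropRight(1) // keep the last (active) segment of each partition untouched
    }
    candidates.sortBy(_.lastModified) // lowest mtime first across the whole volume
      .take(segmentsToAge)
      .foreach(_.setLastModified(1L)) // "really really old", so retention removes them soon
  }
}
{code}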
[jira] [Commented] (KAFKA-1414) Speedup broker startup after hard reset
[ https://issues.apache.org/jira/browse/KAFKA-1414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14067986#comment-14067986 ] Oleg Golovin commented on KAFKA-1414: - [~ataraxer], you seem to run the whole thread pool on the first directory and only then consecutively move on to the next directories. You tried to parallelize IO operations within each directory in case they are on RAID volumes, but you lost inter-directory parallelization. I think we need to disperse the threads equally among the directories.

Speedup broker startup after hard reset --- Key: KAFKA-1414 URL: https://issues.apache.org/jira/browse/KAFKA-1414 Project: Kafka Issue Type: Improvement Components: log Affects Versions: 0.8.2, 0.9.0, 0.8.1.1 Reporter: Dmitry Bugaychenko Assignee: Jay Kreps Attachments: 0001-KAFKA-1414-Speedup-broker-startup-after-hard-reset-a.patch, KAFKA-1414-rev1.patch, KAFKA-1414-rev2.fixed.patch, KAFKA-1414-rev2.patch, KAFKA-1414-rev3.patch, freebie.patch, parallel-dir-loading-0.8.patch, parallel-dir-loading-trunk-fixed-threadpool.patch, parallel-dir-loading-trunk-threadpool.patch, parallel-dir-loading-trunk.patch

After a hard reset due to power failure the broker takes way too much time recovering unflushed segments in a single thread. This could easily be improved by launching multiple threads (one per data directory, assuming that typically each data directory is on a dedicated drive). Locally we tried this simple patch to LogManager.loadLogs and it seems to work; however, I'm too new to Scala, so do not take it literally:
{code}
/**
 * Recover and load all logs in the given data directories
 */
private def loadLogs(dirs: Seq[File]) {
  val threads: Array[Thread] = new Array[Thread](dirs.size)
  var i: Int = 0
  val me = this
  for (dir <- dirs) {
    val thread = new Thread(new Runnable {
      def run() {
        val recoveryPoints = me.recoveryPointCheckpoints(dir).read
        /* load the logs */
        val subDirs = dir.listFiles()
        if (subDirs != null) {
          val cleanShutDownFile = new File(dir, Log.CleanShutdownFile)
          if (cleanShutDownFile.exists())
            info("Found clean shutdown file. Skipping recovery for all logs in data directory '%s'".format(dir.getAbsolutePath))
          for (dir <- subDirs) {
            if (dir.isDirectory) {
              info("Loading log '" + dir.getName + "'")
              val topicPartition = Log.parseTopicPartitionName(dir.getName)
              val config = topicConfigs.getOrElse(topicPartition.topic, defaultConfig)
              val log = new Log(dir, config, recoveryPoints.getOrElse(topicPartition, 0L), scheduler, time)
              val previous = addLogWithLock(topicPartition, log)
              if (previous != null)
                throw new IllegalArgumentException("Duplicate log directories found: %s, %s!".format(log.dir.getAbsolutePath, previous.dir.getAbsolutePath))
            }
          }
          cleanShutDownFile.delete()
        }
      }
    })
    thread.start()
    threads(i) = thread
    i = i + 1
  }
  for (thread <- threads) {
    thread.join()
  }
}

def addLogWithLock(topicPartition: TopicAndPartition, log: Log): Log = {
  logCreationOrDeletionLock synchronized {
    this.logs.put(topicPartition, log)
  }
}
{code}
-- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Commented] (KAFKA-1414) Speedup broker startup after hard reset
[ https://issues.apache.org/jira/browse/KAFKA-1414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14068014#comment-14068014 ] Oleg Golovin commented on KAFKA-1414: - I see only one problem here. If we disperse the threads among directories, there will be one directory left at the end (presumably the biggest one) where all the threads will go. When that happens, such a number of threads will only aggravate performance for spinning drives, as it will result in more disk head seeks (though the OS will try to optimize those reads, I think the seeks will not be avoided). Maybe we would rather have a separate thread pool for each directory, with the {{number of threads = log.io.parallelism / number_of_directories}}?

Speedup broker startup after hard reset --- Key: KAFKA-1414 URL: https://issues.apache.org/jira/browse/KAFKA-1414 Project: Kafka Issue Type: Improvement Components: log Affects Versions: 0.8.2, 0.9.0, 0.8.1.1 Reporter: Dmitry Bugaychenko Assignee: Jay Kreps -- This message was sent by Atlassian JIRA (v6.2#6252)
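A rough sketch of the per-directory thread pool idea from the comment above, assuming hypothetical helper names and that `log.io.parallelism` is passed in as a number (this is not the real LogManager code):
{code}
import java.io.File
import java.util.concurrent.{Executors, TimeUnit}

// Illustrative only: one fixed pool per data directory, each sized to
// log.io.parallelism / number_of_directories, so no directory monopolizes recovery threads.
object ParallelLogLoad {
  def loadAll(dataDirs: Seq[File], ioParallelism: Int) {
    val threadsPerDir = math.max(1, ioParallelism / dataDirs.size)
    val pools = dataDirs.map(_ => Executors.newFixedThreadPool(threadsPerDir))
    for ((dir, pool) <- dataDirs.zip(pools)) {
      val logDirs = Option(dir.listFiles()).getOrElse(Array[File]()).filter(_.isDirectory)
      for (logDir <- logDirs) {
        pool.submit(new Runnable {
          def run() { recoverLog(logDir) } // placeholder for per-log segment recovery
        })
      }
    }
    pools.foreach { p => p.shutdown(); p.awaitTermination(Long.MaxValue, TimeUnit.SECONDS) }
  }

  private def recoverLog(dir: File) { /* segment recovery would go here */ }
}
{code}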
[jira] [Commented] (KAFKA-1414) Speedup broker startup after hard reset
[ https://issues.apache.org/jira/browse/KAFKA-1414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14068018#comment-14068018 ] Oleg Golovin commented on KAFKA-1414: - [~junrao] wrote about parallelization over directories in 32.1 (see above). I think RAID is a legitimate case for some Kafka users, so it has to be accounted for too.

Speedup broker startup after hard reset --- Key: KAFKA-1414 URL: https://issues.apache.org/jira/browse/KAFKA-1414 Project: Kafka Issue Type: Improvement Components: log Affects Versions: 0.8.2, 0.9.0, 0.8.1.1 Reporter: Dmitry Bugaychenko Assignee: Jay Kreps -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Commented] (KAFKA-1530) howto update continuously
[ https://issues.apache.org/jira/browse/KAFKA-1530?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14064815#comment-14064815 ] Oleg Golovin commented on KAFKA-1530: - Thank you for mentioning the unclean.leader.election.enable option. It seems to be a new option we didn't know of. We will need some time to test it, and we will report how it went as soon as we have performed this testing.

howto update continuously - Key: KAFKA-1530 URL: https://issues.apache.org/jira/browse/KAFKA-1530 Project: Kafka Issue Type: Wish Reporter: Stanislav Gilmulin Assignee: Guozhang Wang Priority: Minor Labels: operating_manual, performance Hi, Could I ask you a question about the Kafka update procedure? Is there a way to update the software which doesn't require service interruption or lead to data loss? We can't stop message brokering during the update as we have a strict SLA. Best regards Stanislav Gilmulin -- This message was sent by Atlassian JIRA (v6.2#6252)