[jira] [Resolved] (KAFKA-1551) Configuration example errors
[ https://issues.apache.org/jira/browse/KAFKA-1551?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jay Kreps resolved KAFKA-1551. -- Resolution: Fixed Fixed. Configuration example errors Key: KAFKA-1551 URL: https://issues.apache.org/jira/browse/KAFKA-1551 Project: Kafka Issue Type: Bug Components: website Reporter: Alexey Ozeritskiy Assignee: Alexey Ozeritskiy The Production Server Config example (http://kafka.apache.org/documentation.html#prodconfig) contains an error: {code} # ZK configuration zk.connection.timeout.ms=6000 zk.sync.time.ms=2000 {code} It should be {code} # ZK configuration zookeeper.connection.timeout.ms=6000 zookeeper.sync.time.ms=2000 {code} -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Updated] (KAFKA-1552) Quickstart refers to wrong port in pastable commands
[ https://issues.apache.org/jira/browse/KAFKA-1552?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jay Kreps updated KAFKA-1552: - Assignee: Olle Jonsson Quickstart refers to wrong port in pastable commands Key: KAFKA-1552 URL: https://issues.apache.org/jira/browse/KAFKA-1552 Project: Kafka Issue Type: Bug Components: website Reporter: Olle Jonsson Assignee: Olle Jonsson Labels: documentation, quickstart Attachments: quickstart.patch The file http://svn.apache.org/repos/asf/kafka/site/081/quickstart.html mentions the port localhost:218192 which should be localhost:2181. -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Updated] (KAFKA-1552) Quickstart refers to wrong port in pastable commands
[ https://issues.apache.org/jira/browse/KAFKA-1552?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jay Kreps updated KAFKA-1552: - Resolution: Fixed Status: Resolved (was: Patch Available) Fixed, thanks! Quickstart refers to wrong port in pastable commands Key: KAFKA-1552 URL: https://issues.apache.org/jira/browse/KAFKA-1552 Project: Kafka Issue Type: Bug Components: website Reporter: Olle Jonsson Assignee: Olle Jonsson Labels: documentation, quickstart Attachments: quickstart.patch The file http://svn.apache.org/repos/asf/kafka/site/081/quickstart.html mentions the port localhost:218192 which should be localhost:2181. -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Updated] (KAFKA-1551) Configuration example errors
[ https://issues.apache.org/jira/browse/KAFKA-1551?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jay Kreps updated KAFKA-1551: - Assignee: Alexey Ozeritskiy Configuration example errors Key: KAFKA-1551 URL: https://issues.apache.org/jira/browse/KAFKA-1551 Project: Kafka Issue Type: Bug Components: website Reporter: Alexey Ozeritskiy Assignee: Alexey Ozeritskiy The Production Server Config example (http://kafka.apache.org/documentation.html#prodconfig) contains an error: {code} # ZK configuration zk.connection.timeout.ms=6000 zk.sync.time.ms=2000 {code} It should be {code} # ZK configuration zookeeper.connection.timeout.ms=6000 zookeeper.sync.time.ms=2000 {code} -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Commented] (KAFKA-1414) Speedup broker startup after hard reset
[ https://issues.apache.org/jira/browse/KAFKA-1414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14068021#comment-14068021 ] Jay Kreps commented on KAFKA-1414: -- We should check whether having multiple threads accessing the same directory actually has a negative impact. First, I don't think anyone has checked that recovery is actually I/O bound. Recovery iterates over the log doing a bunch of small message reads, decompressing any compressed messages, and checking the CRC of all the data. This may actually be CPU bound. Second, the OS will do normal readahead, which should help protect somewhat against random access for two interspersed linear accesses. This should be easy to test. Run the perf test on a single-node broker with multiple partitions on a single drive, then kill -9 it. Then run {code} echo 1 > /proc/sys/vm/drop_caches {code} and restart. Do this twice, once with 1 thread and once with 2. The prediction is that the 2-threaded case will be slower than the 1-thread case, but it may actually not be. Speedup broker startup after hard reset --- Key: KAFKA-1414 URL: https://issues.apache.org/jira/browse/KAFKA-1414 Project: Kafka Issue Type: Improvement Components: log Affects Versions: 0.8.2, 0.9.0, 0.8.1.1 Reporter: Dmitry Bugaychenko Assignee: Jay Kreps Attachments: 0001-KAFKA-1414-Speedup-broker-startup-after-hard-reset-a.patch, KAFKA-1414-rev1.patch, KAFKA-1414-rev2.fixed.patch, KAFKA-1414-rev2.patch, KAFKA-1414-rev3.patch, freebie.patch, parallel-dir-loading-0.8.patch, parallel-dir-loading-trunk-fixed-threadpool.patch, parallel-dir-loading-trunk-threadpool.patch, parallel-dir-loading-trunk.patch After a hard reset due to power failure the broker takes way too much time recovering unflushed segments in a single thread. This could be easily improved by launching multiple threads (one per data directory, assuming that typically each data directory is on a dedicated drive). Locally we tried this simple patch to LogManager.loadLogs and it seems to work, however I'm too new to Scala, so do not take it literally:
{code}
/**
 * Recover and load all logs in the given data directories
 */
private def loadLogs(dirs: Seq[File]) {
  val threads: Array[Thread] = new Array[Thread](dirs.size)
  var i: Int = 0
  val me = this
  for(dir <- dirs) {
    val thread = new Thread(new Runnable {
      def run() {
        val recoveryPoints = me.recoveryPointCheckpoints(dir).read
        /* load the logs */
        val subDirs = dir.listFiles()
        if(subDirs != null) {
          val cleanShutDownFile = new File(dir, Log.CleanShutdownFile)
          if(cleanShutDownFile.exists())
            info("Found clean shutdown file. Skipping recovery for all logs in data directory '%s'".format(dir.getAbsolutePath))
          for(dir <- subDirs) {
            if(dir.isDirectory) {
              info("Loading log '" + dir.getName + "'")
              val topicPartition = Log.parseTopicPartitionName(dir.getName)
              val config = topicConfigs.getOrElse(topicPartition.topic, defaultConfig)
              val log = new Log(dir, config, recoveryPoints.getOrElse(topicPartition, 0L), scheduler, time)
              val previous = addLogWithLock(topicPartition, log)
              if(previous != null)
                throw new IllegalArgumentException("Duplicate log directories found: %s, %s!".format(log.dir.getAbsolutePath, previous.dir.getAbsolutePath))
            }
          }
          cleanShutDownFile.delete()
        }
      }
    })
    thread.start()
    threads(i) = thread
    i = i + 1
  }
  for(thread <- threads) {
    thread.join()
  }
}

def addLogWithLock(topicPartition: TopicAndPartition, log: Log): Log = {
  logCreationOrDeletionLock synchronized {
    this.logs.put(topicPartition, log)
  }
}
{code}
-- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Commented] (KAFKA-1414) Speedup broker startup after hard reset
[ https://issues.apache.org/jira/browse/KAFKA-1414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14068173#comment-14068173 ] Jay Kreps commented on KAFKA-1414: -- I did a test of single-threaded recovery on my laptop where most data is in memory and the disk is an SSD. This should remove any I/O bottleneck. I see log recovery working at about 275MB/sec with 100% CPU load on one core. This indicates that on a non-ssd drive (most Kafka machines, I would imagine), I/O would be the bottleneck. Speedup broker startup after hard reset --- Key: KAFKA-1414 URL: https://issues.apache.org/jira/browse/KAFKA-1414 Project: Kafka Issue Type: Improvement Components: log Affects Versions: 0.8.2, 0.9.0, 0.8.1.1 Reporter: Dmitry Bugaychenko Assignee: Jay Kreps Attachments: 0001-KAFKA-1414-Speedup-broker-startup-after-hard-reset-a.patch, KAFKA-1414-rev1.patch, KAFKA-1414-rev2.fixed.patch, KAFKA-1414-rev2.patch, KAFKA-1414-rev3-interleaving.patch, KAFKA-1414-rev3.patch, freebie.patch, parallel-dir-loading-0.8.patch, parallel-dir-loading-trunk-fixed-threadpool.patch, parallel-dir-loading-trunk-threadpool.patch, parallel-dir-loading-trunk.patch After hard reset due to power failure broker takes way too much time recovering unflushed segments in a single thread. This could be easiliy improved launching multiple threads (one per data dirrectory, assuming that typically each data directory is on a dedicated drive). Localy we trie this simple patch to LogManager.loadLogs and it seems to work, however I'm too new to scala, so do not take it literally: {code} /** * Recover and load all logs in the given data directories */ private def loadLogs(dirs: Seq[File]) { val threads : Array[Thread] = new Array[Thread](dirs.size) var i: Int = 0 val me = this for(dir - dirs) { val thread = new Thread( new Runnable { def run() { val recoveryPoints = me.recoveryPointCheckpoints(dir).read /* load the logs */ val subDirs = dir.listFiles() if(subDirs != null) { val cleanShutDownFile = new File(dir, Log.CleanShutdownFile) if(cleanShutDownFile.exists()) info(Found clean shutdown file. Skipping recovery for all logs in data directory '%s'.format(dir.getAbsolutePath)) for(dir - subDirs) { if(dir.isDirectory) { info(Loading log ' + dir.getName + ') val topicPartition = Log.parseTopicPartitionName(dir.getName) val config = topicConfigs.getOrElse(topicPartition.topic, defaultConfig) val log = new Log(dir, config, recoveryPoints.getOrElse(topicPartition, 0L), scheduler, time) val previous = addLogWithLock(topicPartition, log) if(previous != null) throw new IllegalArgumentException(Duplicate log directories found: %s, %s!.format(log.dir.getAbsolutePath, previous.dir.getAbsolutePath)) } } cleanShutDownFile.delete() } } }) thread.start() threads(i) = thread i = i + 1 } for(thread - threads) { thread.join() } } def addLogWithLock(topicPartition: TopicAndPartition, log: Log): Log = { logCreationOrDeletionLock synchronized { this.logs.put(topicPartition, log) } } {code} -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Commented] (KAFKA-1414) Speedup broker startup after hard reset
[ https://issues.apache.org/jira/browse/KAFKA-1414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14068177#comment-14068177 ] Jay Kreps commented on KAFKA-1414: -- [~ataraxer] I agree that just throwing all the logs at a thread pool is simpler to configure and probably also to implement. To see if that will work, we need data on whether multiple threads running recovery on a single drive have a significant negative perf impact. Want to try that one out and see? If the impact is minor I think we are probably better off with the simpler strategy, but if it tanks performance we may need to be more explicit about the data directories. Speedup broker startup after hard reset --- Key: KAFKA-1414 URL: https://issues.apache.org/jira/browse/KAFKA-1414 Project: Kafka Issue Type: Improvement Components: log Affects Versions: 0.8.2, 0.9.0, 0.8.1.1 Reporter: Dmitry Bugaychenko Assignee: Jay Kreps Attachments: 0001-KAFKA-1414-Speedup-broker-startup-after-hard-reset-a.patch, KAFKA-1414-rev1.patch, KAFKA-1414-rev2.fixed.patch, KAFKA-1414-rev2.patch, KAFKA-1414-rev3-interleaving.patch, KAFKA-1414-rev3.patch, freebie.patch, parallel-dir-loading-0.8.patch, parallel-dir-loading-trunk-fixed-threadpool.patch, parallel-dir-loading-trunk-threadpool.patch, parallel-dir-loading-trunk.patch After a hard reset due to power failure the broker takes way too much time recovering unflushed segments in a single thread. This could be easily improved by launching multiple threads (one per data directory, assuming that typically each data directory is on a dedicated drive). Locally we tried this simple patch to LogManager.loadLogs and it seems to work, however I'm too new to Scala, so do not take it literally:
{code}
/**
 * Recover and load all logs in the given data directories
 */
private def loadLogs(dirs: Seq[File]) {
  val threads: Array[Thread] = new Array[Thread](dirs.size)
  var i: Int = 0
  val me = this
  for(dir <- dirs) {
    val thread = new Thread(new Runnable {
      def run() {
        val recoveryPoints = me.recoveryPointCheckpoints(dir).read
        /* load the logs */
        val subDirs = dir.listFiles()
        if(subDirs != null) {
          val cleanShutDownFile = new File(dir, Log.CleanShutdownFile)
          if(cleanShutDownFile.exists())
            info("Found clean shutdown file. Skipping recovery for all logs in data directory '%s'".format(dir.getAbsolutePath))
          for(dir <- subDirs) {
            if(dir.isDirectory) {
              info("Loading log '" + dir.getName + "'")
              val topicPartition = Log.parseTopicPartitionName(dir.getName)
              val config = topicConfigs.getOrElse(topicPartition.topic, defaultConfig)
              val log = new Log(dir, config, recoveryPoints.getOrElse(topicPartition, 0L), scheduler, time)
              val previous = addLogWithLock(topicPartition, log)
              if(previous != null)
                throw new IllegalArgumentException("Duplicate log directories found: %s, %s!".format(log.dir.getAbsolutePath, previous.dir.getAbsolutePath))
            }
          }
          cleanShutDownFile.delete()
        }
      }
    })
    thread.start()
    threads(i) = thread
    i = i + 1
  }
  for(thread <- threads) {
    thread.join()
  }
}

def addLogWithLock(topicPartition: TopicAndPartition, log: Log): Log = {
  logCreationOrDeletionLock synchronized {
    this.logs.put(topicPartition, log)
  }
}
{code}
-- This message was sent by Atlassian JIRA (v6.2#6252)
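For illustration only, here is a minimal sketch of the thread-pool strategy discussed above, in which every log directory from every data directory is fed to one shared fixed-size pool rather than one thread per data directory. The method name, the loadSingleLog callback, and the pool handling are assumptions for the sketch, not the actual LogManager code or any of the attached patches.
{code}
import java.io.File
import java.util.concurrent.{Executors, TimeUnit}

// Hypothetical sketch: recover every log directory using one shared fixed-size pool.
// loadSingleLog stands in for the real per-log recovery work done by LogManager.
def loadLogsWithPool(dataDirs: Seq[File], numThreads: Int)(loadSingleLog: File => Unit) {
  val pool = Executors.newFixedThreadPool(numThreads)
  try {
    val jobs = for {
      dataDir <- dataDirs
      subDirs = Option(dataDir.listFiles()).getOrElse(Array.empty[File])
      logDir <- subDirs if logDir.isDirectory
    } yield pool.submit(new Runnable { def run() { loadSingleLog(logDir) } })
    jobs.foreach(_.get()) // surface any recovery failure to the caller
  } finally {
    pool.shutdown()
    pool.awaitTermination(Long.MaxValue, TimeUnit.MILLISECONDS)
  }
}
{code}
With a shared pool the parallelism is a single number rather than a per-data-directory setting, which is the configuration simplification the comment is weighing against the possible seek penalty of interleaving recovery on one drive.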
[jira] [Commented] (KAFKA-1535) return all live brokers in TopicMetadataResponse
[ https://issues.apache.org/jira/browse/KAFKA-1535?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14066439#comment-14066439 ] Jay Kreps commented on KAFKA-1535: -- To date we have not really used pull requests. Apache has something about JIRA being a way to manage copyright assignment. I think recently they may have started supporting pull requests, so we should probably document and better understand that workflow. But at the moment I think we are still just doing patches and JIRA like it is the last century. :-) return all live brokers in TopicMetadataResponse Key: KAFKA-1535 URL: https://issues.apache.org/jira/browse/KAFKA-1535 Project: Kafka Issue Type: Improvement Components: core Affects Versions: 0.8.2 Reporter: Jun Rao Assignee: Jay Kreps Labels: newbie Attachments: KAFKA-1535__return_all_live_brokers_in_TopicMetadataResponse_.patch Currently, we only return the brokers that have assigned replicas for a topic in TopicMetadataResponse. The new producer will use those brokers for refreshing metadata. Now suppose that we stop all those brokers, copy all local data to some new hosts and then restart those hosts (with the original broker id). There is no way for the new producer to automatically get the information about the new brokers since all old brokers are gone. -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Commented] (KAFKA-1150) Fetch on a replicated topic does not return as soon as possible
[ https://issues.apache.org/jira/browse/KAFKA-1150?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14066440#comment-14066440 ] Jay Kreps commented on KAFKA-1150: -- Yeah, [~thecoop1984], can you verify that the patch on that ticket actually fixes the problem you saw? Fetch on a replicated topic does not return as soon as possible --- Key: KAFKA-1150 URL: https://issues.apache.org/jira/browse/KAFKA-1150 Project: Kafka Issue Type: Bug Components: core, replication Affects Versions: 0.8.0 Reporter: Andrey Balmin Assignee: Neha Narkhede Attachments: Test.java I see a huge performance difference between replicated and not replicated topics. On my laptop, running two brokers, I see producer-2-consumer latency of under 1ms for topics with one replica. However, with two replicas the same latency equals to the max fetch delay. Here is a simple test I just did: one producer thread in a loop sending one message and sleeping for 2500ms, and one consumer thread looping on the long poll with max fetch delay of 1000 ms. Here is what happens with no replication: Produced 1 key: key1 at time: 15:33:52.822 Consumed up to 1 at time: 15:33:52.822 Consumed up to 1 at time: 15:33:53.823 Consumed up to 1 at time: 15:33:54.825 Produced 2 key: key2 at time: 15:33:55.324 Consumed up to 2 at time: 15:33:55.324 Consumed up to 2 at time: 15:33:56.326 Consumed up to 2 at time: 15:33:57.328 Produced 3 key: key3 at time: 15:33:57.827 Consumed up to 3 at time: 15:33:57.827 The are no delays between the message being produced and consumed -- this is the behavior I expected. Here is the same test, but for a topic with two replicas: Consumed up to 0 at time: 15:50:29.575 Produced 1 key: key1 at time: 15:50:29.575 Consumed up to 1 at time: 15:50:30.577 Consumed up to 1 at time: 15:50:31.579 Consumed up to 1 at time: 15:50:32.078 Produced 2 key: key2 at time: 15:50:32.078 Consumed up to 2 at time: 15:50:33.081 Consumed up to 2 at time: 15:50:34.081 Consumed up to 2 at time: 15:50:34.581 Produced 3 key: key3 at time: 15:50:34.581 Consumed up to 3 at time: 15:50:35.584 Notice how the fetch always returns as soon as the produce request is issued, but without the new message, which consistently arrives ~1002 ms later. Below is the request log snippet for this part: Produced 2 key: key2 at time: 15:50:32.078 Consumed up to 2 at time: 15:50:33.081 You can see the first FetchRequest returns at the same time as the replica FetchRequest, but this fetch response is *empty* -- the message is not committed yet, so it cannot be returned. The message is committed at 15:50:32,079. However, the next FetchRequest (that does return the message) comes in at 15:50:32,078, but completes only at 15:50:33,081. Why is it waiting for the full 1000 ms, instead of returning right away? 
[2013-11-25 15:50:32,077] TRACE Processor 1 received request : Name: ProducerRequest; Version: 0; CorrelationId: 5; ClientId: pro; RequiredAcks: 1; AckTimeoutMs: 20 ms; TopicAndPartition: [test_topic,0] - 2078 (kafka.network.RequestChannel$) [2013-11-25 15:50:32,078] TRACE Completed request:Name: FetchRequest; Version: 0; CorrelationId: 7; ClientId: con; ReplicaId: -1; MaxWait: 1000 ms; MinBytes: 1 bytes; RequestInfo: [test_topic,0] - PartitionFetchInfo(129,1024000) from client /0:0:0:0:0:0:0:1%0:63264;totalTime:499,queueTime:0,localTime:0,remoteTime:499,sendTime:0 (kafka.request.logger) [2013-11-25 15:50:32,078] TRACE Completed request:Name: FetchRequest; Version: 0; CorrelationId: 3463; ClientId: ReplicaFetcherThread-0-0; ReplicaId: 1; MaxWait: 500 ms; MinBytes: 1 bytes; RequestInfo: [test_topic,0] - PartitionFetchInfo(129,1048576) from client /127.0.0.1:63056;totalTime:499,queueTime:1,localTime:0,remoteTime:498,sendTime:0 (kafka.request.logger) [2013-11-25 15:50:32,078] TRACE Processor 1 received request : Name: FetchRequest; Version: 0; CorrelationId: 8; ClientId: con; ReplicaId: -1; MaxWait: 1000 ms; MinBytes: 1 bytes; RequestInfo: [test_topic,0] - PartitionFetchInfo(129,1024000) (kafka.network.RequestChannel$) [2013-11-25 15:50:32,078] TRACE Completed request:Name: ProducerRequest; Version: 0; CorrelationId: 5; ClientId: pro; RequiredAcks: 1; AckTimeoutMs: 20 ms; TopicAndPartition: [test_topic,0] - 2078 from client /0:0:0:0:0:0:0:1%0:63266;totalTime:1,queueTime:0,localTime:1,remoteTime:0,sendTime:0 (kafka.request.logger) [2013-11-25 15:50:32,079] TRACE Processor 0 received request : Name: FetchRequest; Version: 0; CorrelationId: 3464; ClientId: ReplicaFetcherThread-0-0; ReplicaId: 1; MaxWait: 500 ms; MinBytes: 1 bytes; RequestInfo: [test_topic,0] - PartitionFetchInfo(130,1048576)
[jira] [Commented] (KAFKA-1282) Disconnect idle socket connection in Selector
[ https://issues.apache.org/jira/browse/KAFKA-1282?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14066461#comment-14066461 ] Jay Kreps commented on KAFKA-1282: -- The goal is just to reduce server connection count. In our environment there might be a single Kafka producer in each process we run publishing to a small Kafka cluster (say ~20 servers). However there are tens of thousands of client processes. Connections can end up going unused when leadership migrates and we should eventually close these out rather than retaining them indefinitely. As you say it is not critical as the server seems to do a good job of dealing with high connection counts, but it seems like a good thing to do. I agree that doing this on the server might be better. This does mean it is possible that the server will attempt to close the socket while the client is attempting to send something. But if the timeout is 10 mins, it is unlikely that this will happen often (i.e. if nothing was sent in the last 10 mins, it will not likely happen in the 0.5 ms it takes to do the close). The advantage of doing it on the server is that it will work for all clients. This change would be in core/.../kafka/network/SocketServer.scala. The only gotcha is that we likely need to avoid iterating over all connections to avoid latency impact (there could be 100k connections). One way to do this would be to use java.util.LinkedHashMap to implement an LRU hash map of the SelectionKeys, and access this every time the selection key comes up in a select operation. (There are a ton of details in LinkedHashMap--needs to be access order, etc). Then every 5-10 select loop iterations we would iterate the map expiring connections until we come to a connection that doesn't need expiring, then stop. Disconnect idle socket connection in Selector - Key: KAFKA-1282 URL: https://issues.apache.org/jira/browse/KAFKA-1282 Project: Kafka Issue Type: Bug Components: producer Affects Versions: 0.8.2 Reporter: Jun Rao Labels: newbie++ Fix For: 0.9.0 To reduce # socket connections, it would be useful for the new producer to close socket connections that are idle. We can introduce a new producer config for the idle time. -- This message was sent by Atlassian JIRA (v6.2#6252)
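For illustration, a rough sketch of the access-ordered LinkedHashMap idea described in the comment above. The class and method names are hypothetical, and a real SocketServer change would also need the selector bookkeeping and cleanup that this sketch skips.
{code}
import java.nio.channels.SelectionKey
import java.util.{LinkedHashMap => JLinkedHashMap}

// Hypothetical sketch: track last activity per connection in an access-ordered
// LinkedHashMap and expire the oldest idle connections from the selector loop.
class IdleConnectionReaper(maxIdleMs: Long) {
  // accessOrder = true so a touch() moves the key to the most-recently-used end
  private val lruConnections = new JLinkedHashMap[SelectionKey, Long](16, 0.75f, true)

  /** Record activity on a connection; called whenever its key comes up in a select. */
  def touch(key: SelectionKey, nowMs: Long) { lruConnections.put(key, nowMs) }

  /** Close connections idle longer than maxIdleMs, stopping at the first fresh one. */
  def expireIdle(nowMs: Long) {
    val it = lruConnections.entrySet().iterator()
    var done = false
    while (it.hasNext && !done) {
      val entry = it.next()
      if (nowMs - entry.getValue > maxIdleMs) {
        entry.getKey.channel().close() // the real server would also clean up selector state
        it.remove()
      } else {
        done = true // everything after this entry was used more recently, so stop
      }
    }
  }
}
{code}
Because the map is access ordered, iteration starts at the least recently used connection, so the expiry pass can stop at the first entry that is not yet idle; this keeps the cost per select-loop iteration small even with very large connection counts.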
[jira] [Commented] (KAFKA-1476) Get a list of consumer groups
[ https://issues.apache.org/jira/browse/KAFKA-1476?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14066471#comment-14066471 ] Jay Kreps commented on KAFKA-1476: -- This looks good to me. One thing we should think about: does this make sense as a standalone command? We have something of a history of making a new command for every possible thing you could do. Often it is a little more usable if we group these together a bit into logically related items. For example, I could imagine having a command bin/kafka-consumers.sh --list-groups or something like that that also performed other consumer-related commands. This sort of thing makes the tools a little more usable for users. [~guozhang], [~toddpalino], [~nehanarkhede] what do you guys think? As we do the consumer co-ordinator, what admin operations are we going to want tooling for? How does this relate to the consumer offset checker? Should that be combined? Let's think this through and then just name this thing appropriately even if it only has 10% of the functionality we envision at the moment. Get a list of consumer groups - Key: KAFKA-1476 URL: https://issues.apache.org/jira/browse/KAFKA-1476 Project: Kafka Issue Type: Wish Components: tools Affects Versions: 0.8.1.1 Reporter: Ryan Williams Labels: newbie Fix For: 0.9.0 Attachments: KAFKA-1476.patch It would be useful to have a way to get a list of consumer groups currently active via some tool/script that ships with kafka. This would be helpful so that the system tools can be explored more easily. For example, when running the ConsumerOffsetChecker, it requires a group option: bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --topic test --group ? But, when just getting started with kafka, using the console producer and consumer, it is not clear what value to use for the group option. If the consumer groups could be listed, then it would be clear what value to use. Background: http://mail-archives.apache.org/mod_mbox/kafka-users/201405.mbox/%3cCAOq_b1w=slze5jrnakxvak0gu9ctdkpazak1g4dygvqzbsg...@mail.gmail.com%3e -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Commented] (KAFKA-1476) Get a list of consumer groups
[ https://issues.apache.org/jira/browse/KAFKA-1476?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14066583#comment-14066583 ] Jay Kreps commented on KAFKA-1476: -- Cool, so [~balaji.seshadri] would you be willing to revamp this a bit? Two approaches: 1. Just change the class name to ConsumerCommand and shell script to kafka-consumers.sh even though it only implements the list groups functionality at the moment. We can file a follow-up bug to implement some of the additional features which anyone can pick up. 2. Implement some of the proposed features now on this ticket. Most of them should be pretty straight-forward... Either way is fine. Get a list of consumer groups - Key: KAFKA-1476 URL: https://issues.apache.org/jira/browse/KAFKA-1476 Project: Kafka Issue Type: Wish Components: tools Affects Versions: 0.8.1.1 Reporter: Ryan Williams Labels: newbie Fix For: 0.9.0 Attachments: KAFKA-1476.patch It would be useful to have a way to get a list of consumer groups currently active via some tool/script that ships with kafka. This would be helpful so that the system tools can be explored more easily. For example, when running the ConsumerOffsetChecker, it requires a group option bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --topic test --group ? But, when just getting started with kafka, using the console producer and consumer, it is not clear what value to use for the group option. If a list of consumer groups could be listed, then it would be clear what value to use. Background: http://mail-archives.apache.org/mod_mbox/kafka-users/201405.mbox/%3cCAOq_b1w=slze5jrnakxvak0gu9ctdkpazak1g4dygvqzbsg...@mail.gmail.com%3e -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Commented] (KAFKA-1535) return all live brokers in TopicMetadataResponse
[ https://issues.apache.org/jira/browse/KAFKA-1535?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14067074#comment-14067074 ] Jay Kreps commented on KAFKA-1535: -- [~nmarasoi] Totally up to you. return all live brokers in TopicMetadataResponse Key: KAFKA-1535 URL: https://issues.apache.org/jira/browse/KAFKA-1535 Project: Kafka Issue Type: Improvement Components: core Affects Versions: 0.8.2 Reporter: Jun Rao Assignee: nicu marasoiu Labels: newbie Attachments: KAFKA-1535__return_all_live_brokers_in_TopicMetadataResponse_.patch Currently, we only return the brokers that have assigned replicas for a topic in TopicMetadataResponse. The new producer will use those brokers for refreshing metadata. Now suppose that we stop all those brokers, copy all local data to some new hosts and then restart those hosts (with the original broker id). There is no way for the new producer to automatically get the information about the new brokers since all old brokers are gone. -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Commented] (KAFKA-1543) Changing replication factor
[ https://issues.apache.org/jira/browse/KAFKA-1543?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14065616#comment-14065616 ] Jay Kreps commented on KAFKA-1543: -- I wonder if it would make more sense to have the alter topic command do this. Something like: {code} bin/kafka-topics.sh --zookeeper host:port --alter --topic name --replication-factor 3 {code} Changing replication factor --- Key: KAFKA-1543 URL: https://issues.apache.org/jira/browse/KAFKA-1543 Project: Kafka Issue Type: Improvement Reporter: Alexey Ozeritskiy Attachments: can-change-replication.patch It is difficult to change the replication factor by manually editing the json config. I propose adding a key to the kafka-reassign-partitions.sh command to automatically create the json config. Example of usage: {code} kafka-reassign-partitions.sh --zookeeper zk --replicas new-replication-factor --topics-to-move-json-file topics-file --broker-list 1,2,3,4 --generate output {code} -- This message was sent by Atlassian JIRA (v6.2#6252)
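As a rough sketch of what such a tool could do under the hood when asked for a new replication factor, the helper below assigns replicas round-robin over a broker list. The function name and the simple placement policy are assumptions for illustration; the real reassignment tooling also has to consider the existing assignment and the data movement it implies.
{code}
// Hypothetical sketch: given live brokers and the partition count of a topic, produce
// an assignment with the requested replication factor (simple round-robin placement,
// no awareness of the current assignment or leader balance).
def assignReplicas(brokers: Seq[Int], numPartitions: Int, replicationFactor: Int): Map[Int, Seq[Int]] = {
  require(replicationFactor <= brokers.size, "replication factor cannot exceed the number of brokers")
  (0 until numPartitions).map { partition =>
    val replicas = (0 until replicationFactor).map(r => brokers((partition + r) % brokers.size))
    partition -> replicas
  }.toMap
}

// e.g. assignReplicas(Seq(1, 2, 3, 4), numPartitions = 2, replicationFactor = 3)
// => Map(0 -> Vector(1, 2, 3), 1 -> Vector(2, 3, 4))
{code}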
[jira] [Resolved] (KAFKA-169) Layering violations in Kafka code
[ https://issues.apache.org/jira/browse/KAFKA-169?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jay Kreps resolved KAFKA-169. - Resolution: Won't Fix Layering violations in Kafka code - Key: KAFKA-169 URL: https://issues.apache.org/jira/browse/KAFKA-169 Project: Kafka Issue Type: Bug Components: core Reporter: Jay Kreps Priority: Minor Attachments: draw_deps.py, kafka_deps.svg I am noticing a lot of layering violations creeping into the code. For example, the log implementation now depends on zookeeper code, the network server depends on the kafka api, etc. This stuff is messy and makes it hard to test or reason about the pieces in isolation. I have run a quick analysis on the imports to look at problems and there are a few. Let's try to keep this graph in good shape and think about the layering in the code. -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Commented] (KAFKA-1535) return all live brokers in TopicMetadataResponse
[ https://issues.apache.org/jira/browse/KAFKA-1535?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14065646#comment-14065646 ] Jay Kreps commented on KAFKA-1535: -- Our current model of nodes is that they are permanent. That is if there is a node 1, if it dies, it will come back or be replaced. It need not literally be the same machine, just that if a node dies you will eventually add a new node with id 1 which will take over the work 1 used to do. The metadata response is read by the producer and consumer clients. For example in the new java code it is in clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java. return all live brokers in TopicMetadataResponse Key: KAFKA-1535 URL: https://issues.apache.org/jira/browse/KAFKA-1535 Project: Kafka Issue Type: Improvement Components: core Affects Versions: 0.8.2 Reporter: Jun Rao Labels: newbie Attachments: KAFKA-1535__return_all_live_brokers_in_TopicMetadataResponse_.patch Currently, we only return the brokers that have assigned replicas for a topic in TopicMetadataResponse. The new producer will use those brokers for refreshing metadata. Now suppose that we stop all those brokers, copy all local data to some new hosts and then restart those hosts (with the original broker id). There is no way for the new producer to automatically get the information about the new brokers since all old brokers are gone. -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Commented] (KAFKA-1070) Auto-assign node id
[ https://issues.apache.org/jira/browse/KAFKA-1070?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14065702#comment-14065702 ] Jay Kreps commented on KAFKA-1070: -- Hey [~harsha_ch], that is a great point. I'd like to avoid changing the type of the id to a long or UUID so as to not have to bump up the protocol format for the metadata request which hands these out to clients (we would need a way to handle compatibility with older clients that don't expect the longer types). I think we can get around the problem you point out by just defaulting the node id sequence to 1000. This could theoretically conflict, but most people number from 0 or 1, and we can discuss this in the release notes. Our plan will be to release with support for both configured node ids and assigned node ids for compatibility. After a couple of releases we will remove the config. So the behavior would be this: If there is a node id in the config, we will validate it against the node id in the data directory. If it matches, good, we'll use that. If it doesn't match, that is bad, and we'll crash with an error. If there is a node id in the data directory but none in the config, we'll use whatever is in the data directory. If there is no node id in the data directory yet but there is one in the config, we'll write that to the data directory and use it. If there is neither a node id in the data directory nor in the config, we'll allocate a node id and write it to the data directory. Auto-assign node id --- Key: KAFKA-1070 URL: https://issues.apache.org/jira/browse/KAFKA-1070 Project: Kafka Issue Type: Bug Reporter: Jay Kreps Assignee: Sriharsha Chintalapani Labels: usability It would be nice to have Kafka brokers auto-assign node ids rather than having that be a configuration. Having a configuration is irritating because (1) you have to generate a custom config for each broker and (2) even though it is in configuration, changing the node id can cause all kinds of bad things to happen. -- This message was sent by Atlassian JIRA (v6.2#6252)
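A compact sketch of the resolution rules listed in the comment above. The function and parameter names are hypothetical, and the side effects (writing the chosen id back to the data directory, allocating from a sequence starting at 1000) are left to the caller and to the allocateId stand-in.
{code}
// Hypothetical sketch of the broker id resolution described above. configuredId is the
// optional broker.id from the config, storedId the optional id found in the data
// directory, and allocateId() a stand-in for allocating a fresh id (e.g. a sequence
// starting at 1000). The caller is responsible for persisting the result.
def resolveBrokerId(configuredId: Option[Int], storedId: Option[Int], allocateId: () => Int): Int =
  (configuredId, storedId) match {
    case (Some(c), Some(s)) if c == s => c            // both present and matching: use it
    case (Some(c), Some(s)) =>
      throw new IllegalStateException("Configured broker id " + c + " does not match stored id " + s)
    case (None, Some(s)) => s                         // only the data directory has an id
    case (Some(c), None) => c                         // only the config has an id: use it (and persist it)
    case (None, None)    => allocateId()              // neither: allocate a new id (and persist it)
  }
{code}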
[jira] [Commented] (KAFKA-1414) Speedup broker startup after hard reset
[ https://issues.apache.org/jira/browse/KAFKA-1414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14065721#comment-14065721 ] Jay Kreps commented on KAFKA-1414: -- Hey [~aozeritsky], any before/after perf numbers for your setup? [~ataraxer] How many threads were being used when that out of memory error occurred? My understanding is that that happens when java requests memory from the OS and the OS is physically out of memory and not willing to give virtual memory. Can you confirm that this is a reproducible thing? If so we may need to kind of warn people about that...however it is somewhat counterintuitive that on a machine with sufficient memory calling flush, say, from 4 threads would crash the process. Speedup broker startup after hard reset --- Key: KAFKA-1414 URL: https://issues.apache.org/jira/browse/KAFKA-1414 Project: Kafka Issue Type: Improvement Components: log Affects Versions: 0.8.2, 0.9.0, 0.8.1.1 Reporter: Dmitry Bugaychenko Assignee: Jay Kreps Attachments: 0001-KAFKA-1414-Speedup-broker-startup-after-hard-reset-a.patch, KAFKA-1414-rev1.patch, parallel-dir-loading-0.8.patch, parallel-dir-loading-trunk-fixed-threadpool.patch, parallel-dir-loading-trunk-threadpool.patch, parallel-dir-loading-trunk.patch After hard reset due to power failure broker takes way too much time recovering unflushed segments in a single thread. This could be easiliy improved launching multiple threads (one per data dirrectory, assuming that typically each data directory is on a dedicated drive). Localy we trie this simple patch to LogManager.loadLogs and it seems to work, however I'm too new to scala, so do not take it literally: {code} /** * Recover and load all logs in the given data directories */ private def loadLogs(dirs: Seq[File]) { val threads : Array[Thread] = new Array[Thread](dirs.size) var i: Int = 0 val me = this for(dir - dirs) { val thread = new Thread( new Runnable { def run() { val recoveryPoints = me.recoveryPointCheckpoints(dir).read /* load the logs */ val subDirs = dir.listFiles() if(subDirs != null) { val cleanShutDownFile = new File(dir, Log.CleanShutdownFile) if(cleanShutDownFile.exists()) info(Found clean shutdown file. Skipping recovery for all logs in data directory '%s'.format(dir.getAbsolutePath)) for(dir - subDirs) { if(dir.isDirectory) { info(Loading log ' + dir.getName + ') val topicPartition = Log.parseTopicPartitionName(dir.getName) val config = topicConfigs.getOrElse(topicPartition.topic, defaultConfig) val log = new Log(dir, config, recoveryPoints.getOrElse(topicPartition, 0L), scheduler, time) val previous = addLogWithLock(topicPartition, log) if(previous != null) throw new IllegalArgumentException(Duplicate log directories found: %s, %s!.format(log.dir.getAbsolutePath, previous.dir.getAbsolutePath)) } } cleanShutDownFile.delete() } } }) thread.start() threads(i) = thread i = i + 1 } for(thread - threads) { thread.join() } } def addLogWithLock(topicPartition: TopicAndPartition, log: Log): Log = { logCreationOrDeletionLock synchronized { this.logs.put(topicPartition, log) } } {code} -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Updated] (KAFKA-1535) return all live brokers in TopicMetadataResponse
[ https://issues.apache.org/jira/browse/KAFKA-1535?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jay Kreps updated KAFKA-1535: - Resolution: Fixed Assignee: Jay Kreps Status: Resolved (was: Patch Available) Patch looks good to me! return all live brokers in TopicMetadataResponse Key: KAFKA-1535 URL: https://issues.apache.org/jira/browse/KAFKA-1535 Project: Kafka Issue Type: Improvement Components: core Affects Versions: 0.8.2 Reporter: Jun Rao Assignee: Jay Kreps Labels: newbie Attachments: KAFKA-1535__return_all_live_brokers_in_TopicMetadataResponse_.patch Currently, we only return the brokers that have assigned replicas for a topic in TopicMetadataResponse. The new producer will use those brokers for refreshing metadata. Now suppose that we stop all those brokers, copy all local data to some new hosts and then restart those hosts (with the original broker id). There is no way for the new producer to automatically get the information about the new brokers since all old brokers are gone. -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Resolved] (KAFKA-183) Expose offset vector to the consumer
[ https://issues.apache.org/jira/browse/KAFKA-183?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jay Kreps resolved KAFKA-183. - Resolution: Fixed This is being done in the new consumer. Expose offset vector to the consumer Key: KAFKA-183 URL: https://issues.apache.org/jira/browse/KAFKA-183 Project: Kafka Issue Type: Sub-task Reporter: Jay Kreps Assignee: Jay Kreps We should enable consumers to save their position themselves. This would be useful for consumers that need to store consumed data, so they can store the data and the position together; this gives a poor man's transactionality, since any data loss on the consumer will also rewind the position to the previous position, so the two are always in sync. Two ways to do this: 1. Add an OffsetStorage interface and have the zk storage implement this. The user can override this by providing an OffsetStorage implementation of their own to change how values are stored. 2. Make commit() return the position offset vector and add a setPosition(List<Long>) method to initialize the position. Let's figure out any potential problems with this, and work out the best approach. -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Commented] (KAFKA-1546) Automate replica lag tuning
[ https://issues.apache.org/jira/browse/KAFKA-1546?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14065806#comment-14065806 ] Jay Kreps commented on KAFKA-1546: -- I think this is actually a really important thing to get right to make replication reliable. There are some subtleties. It would be good to work out the basics of how this could work on this JIRA. For example the throughput on a partition might be 1 msg/sec. But that is because only 1 msg/sec is being written by the producer. However if someone writes a batch of 1000 messages, that doesn't mean we are necessarily 1000 seconds behind. We already track the time since the last fetch request. So if the fetcher stops entirely for too long it will be caught. I think the other condition we want to be able to catch is one where the fetcher is still fetching but it is behind and likely won't catch up. One way to make caught-up concrete is to say that the last fetch went to the end of the log. We potentially reduce this to one config and just have replica.lag.time.ms which would both be the maximum time since a fetch or the maximum amount of time without catching up to the leader. The implementation would be that every time a fetch didn't go to the logEndOffset we would set the lag clock and it would only reset when a fetch request finally went all the way to the logEndOffset. Automate replica lag tuning --- Key: KAFKA-1546 URL: https://issues.apache.org/jira/browse/KAFKA-1546 Project: Kafka Issue Type: Improvement Components: replication Affects Versions: 0.8.0, 0.8.1, 0.8.1.1 Reporter: Neha Narkhede Labels: newbie++ Currently, there is no good way to tune the replica lag configs to automatically account for high and low volume topics on the same cluster. For the low-volume topic it will take a very long time to detect a lagging replica, and for the high-volume topic it will have false-positives. One approach to making this easier would be to have the configuration be something like replica.lag.max.ms and translate this into a number of messages dynamically based on the throughput of the partition. -- This message was sent by Atlassian JIRA (v6.2#6252)
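A minimal sketch of the single replica.lag.time.ms idea described above, assuming a small per-replica tracker the leader updates on every follower fetch; the names and structure are illustrative, not the actual ReplicaManager code.
{code}
// Hypothetical sketch: the lag clock starts when a fetch fails to reach the leader's
// log end offset and resets when a fetch finally catches up; a replica is considered
// out of sync only after it has been lagging (or silent) for longer than maxLagMs.
class ReplicaLagTracker(maxLagMs: Long) {
  private var lagStartMs: Option[Long] = None

  /** Called on every fetch from the follower. */
  def onFetch(fetchedUpTo: Long, leaderLogEndOffset: Long, nowMs: Long) {
    if (fetchedUpTo >= leaderLogEndOffset)
      lagStartMs = None              // caught up to the log end: reset the clock
    else if (lagStartMs.isEmpty)
      lagStartMs = Some(nowMs)       // fell behind: start the clock
  }

  /** True if the follower has been behind, or has not fetched, for longer than maxLagMs. */
  def isOutOfSync(lastFetchTimeMs: Long, nowMs: Long): Boolean =
    (nowMs - lastFetchTimeMs > maxLagMs) ||
      lagStartMs.exists(start => nowMs - start > maxLagMs)
}
{code}
This folds both conditions from the comment, a stalled fetcher and a fetcher that keeps fetching but never reaches the log end, into one time-based setting, so there is no per-topic message-count threshold to tune.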
[jira] [Resolved] (KAFKA-1512) Limit the maximum number of connections per ip address
[ https://issues.apache.org/jira/browse/KAFKA-1512?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jay Kreps resolved KAFKA-1512. -- Resolution: Fixed Committed. Limit the maximum number of connections per ip address -- Key: KAFKA-1512 URL: https://issues.apache.org/jira/browse/KAFKA-1512 Project: Kafka Issue Type: New Feature Reporter: Jay Kreps Assignee: Jay Kreps Attachments: KAFKA-1512.patch, KAFKA-1512.patch, KAFKA-1512_2014-07-03_15:17:55.patch, KAFKA-1512_2014-07-14_13:28:15.patch To protect against client connection leaks add a new configuration max.connections.per.ip that causes the SocketServer to enforce a limit on the maximum number of connections from each InetAddress instance. For backwards compatibility this will default to 2 billion. -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Commented] (KAFKA-1414) Speedup broker startup after hard reset
[ https://issues.apache.org/jira/browse/KAFKA-1414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14063742#comment-14063742 ] Jay Kreps commented on KAFKA-1414: -- Great patch! I second Jun's comments. Is there a way we can make this a little better out-of-the-box? Our experience is that people don't set every config optimally out of the box. So a couple ideas: It may be that having more threads than disks doesn't hurt as the I/O scheduler can figure it out, in which case we could default to something higher than 1. Is there really a need for two configurations, one for recovery and one for shutdown? It may be that the ideal setting is always just the number of disks. Or it may be that the recovery is actually CPU bound whereas flush isn't. If the right answer is just the number of disks then we may be able to simplify things by just asking for num.disks or io.parallelism or something like that and we set the number of threads appropriately for these cases. This would let us default num.disks to the number of data directories, but would let users with RAID arrays or other logical volume mgmt things override this. It would also allow us to use this config for other things related to I/O parallelism as they arise without creating to many configs. This is obviously a minor thing but a little research here would give a nicer user experience. Also, if you have a chance to run a basic before/after test on the speedup on a machine with multiple disks that would be great (presumably it is a linear speedup). This would let us report the improvement in the release in concrete terms as well as validating that it actually works as we expect. Speedup broker startup after hard reset --- Key: KAFKA-1414 URL: https://issues.apache.org/jira/browse/KAFKA-1414 Project: Kafka Issue Type: Improvement Components: log Affects Versions: 0.8.2, 0.9.0, 0.8.1.1 Reporter: Dmitry Bugaychenko Assignee: Jay Kreps Attachments: 0001-KAFKA-1414-Speedup-broker-startup-after-hard-reset-a.patch, parallel-dir-loading-0.8.patch, parallel-dir-loading-trunk-fixed-threadpool.patch, parallel-dir-loading-trunk-threadpool.patch, parallel-dir-loading-trunk.patch After hard reset due to power failure broker takes way too much time recovering unflushed segments in a single thread. This could be easiliy improved launching multiple threads (one per data dirrectory, assuming that typically each data directory is on a dedicated drive). Localy we trie this simple patch to LogManager.loadLogs and it seems to work, however I'm too new to scala, so do not take it literally: {code} /** * Recover and load all logs in the given data directories */ private def loadLogs(dirs: Seq[File]) { val threads : Array[Thread] = new Array[Thread](dirs.size) var i: Int = 0 val me = this for(dir - dirs) { val thread = new Thread( new Runnable { def run() { val recoveryPoints = me.recoveryPointCheckpoints(dir).read /* load the logs */ val subDirs = dir.listFiles() if(subDirs != null) { val cleanShutDownFile = new File(dir, Log.CleanShutdownFile) if(cleanShutDownFile.exists()) info(Found clean shutdown file. 
Skipping recovery for all logs in data directory '%s'.format(dir.getAbsolutePath)) for(dir - subDirs) { if(dir.isDirectory) { info(Loading log ' + dir.getName + ') val topicPartition = Log.parseTopicPartitionName(dir.getName) val config = topicConfigs.getOrElse(topicPartition.topic, defaultConfig) val log = new Log(dir, config, recoveryPoints.getOrElse(topicPartition, 0L), scheduler, time) val previous = addLogWithLock(topicPartition, log) if(previous != null) throw new IllegalArgumentException(Duplicate log directories found: %s, %s!.format(log.dir.getAbsolutePath, previous.dir.getAbsolutePath)) } } cleanShutDownFile.delete() } } }) thread.start() threads(i) = thread i = i + 1 } for(thread - threads) { thread.join() } } def addLogWithLock(topicPartition: TopicAndPartition, log: Log): Log = { logCreationOrDeletionLock synchronized { this.logs.put(topicPartition, log) } } {code} -- This message was sent by Atlassian JIRA (v6.2#6252)
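A tiny sketch of the num.disks idea from the comment above. The property name and the defaulting rule are only a guess at what such a config could look like, not an existing Kafka setting.
{code}
import java.util.Properties

// Hypothetical sketch: one "num.disks" setting, defaulting to the number of data
// directories, from which both recovery and flush parallelism could be derived.
def ioParallelism(props: Properties, dataDirs: Seq[String]): Int = {
  val numDisks = Option(props.getProperty("num.disks")).map(_.trim.toInt).getOrElse(dataDirs.size)
  math.max(1, numDisks)
}
{code}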
[jira] [Commented] (KAFKA-1535) return all live brokers in TopicMetadataResponse
[ https://issues.apache.org/jira/browse/KAFKA-1535?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14064186#comment-14064186 ] Jay Kreps commented on KAFKA-1535: -- Hey Nicu, this looks good to me. Did you do any testing on this? return all live brokers in TopicMetadataResponse Key: KAFKA-1535 URL: https://issues.apache.org/jira/browse/KAFKA-1535 Project: Kafka Issue Type: Improvement Components: core Affects Versions: 0.8.2 Reporter: Jun Rao Labels: newbie Attachments: KAFKA-1535__return_all_live_brokers_in_TopicMetadataResponse1.patch Currently, we only return the brokers that have assigned replicas for a topic in TopicMetadataResponse. The new producer will use those brokers for refreshing metadata. Now suppose that we stop all those brokers, copy all local data to some new hosts and then restart those hosts (with the original broker id). There is no way for the new producer to automatically get the information about the new brokers since all old brokers are gone. -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Commented] (KAFKA-1535) return all live brokers in TopicMetadataResponse
[ https://issues.apache.org/jira/browse/KAFKA-1535?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14062214#comment-14062214 ] Jay Kreps commented on KAFKA-1535: -- Yeah we do all development on trunk and just cut branches as a stable point for critical point fixes needed after the release. return all live brokers in TopicMetadataResponse Key: KAFKA-1535 URL: https://issues.apache.org/jira/browse/KAFKA-1535 Project: Kafka Issue Type: Improvement Components: core Affects Versions: 0.8.2 Reporter: Jun Rao Labels: newbie Currently, we only return the brokers that have assigned replicas for a topic in TopicMetadataResponse. The new producer will use those brokers for refreshing metadata. Now suppose that we stop all those brokers, copy all local data to some new hosts and then restart those hosts (with the original broker id). There is no way for the new producer to automatically get the information about the new brokers since all old brokers are gone. -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Updated] (KAFKA-1512) Limit the maximum number of connections per ip address
[ https://issues.apache.org/jira/browse/KAFKA-1512?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jay Kreps updated KAFKA-1512: - Attachment: KAFKA-1512_2014-07-14_13:28:15.patch Limit the maximum number of connections per ip address -- Key: KAFKA-1512 URL: https://issues.apache.org/jira/browse/KAFKA-1512 Project: Kafka Issue Type: New Feature Reporter: Jay Kreps Assignee: Jay Kreps Attachments: KAFKA-1512.patch, KAFKA-1512.patch, KAFKA-1512_2014-07-03_15:17:55.patch, KAFKA-1512_2014-07-14_13:28:15.patch To protect against client connection leaks add a new configuration max.connections.per.ip that causes the SocketServer to enforce a limit on the maximum number of connections from each InetAddress instance. For backwards compatibility this will default to 2 billion. -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Commented] (KAFKA-1512) Limit the maximum number of connections per ip address
[ https://issues.apache.org/jira/browse/KAFKA-1512?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14061178#comment-14061178 ] Jay Kreps commented on KAFKA-1512: -- Updated reviewboard https://reviews.apache.org/r/23208/ against branch trunk Limit the maximum number of connections per ip address -- Key: KAFKA-1512 URL: https://issues.apache.org/jira/browse/KAFKA-1512 Project: Kafka Issue Type: New Feature Reporter: Jay Kreps Assignee: Jay Kreps Attachments: KAFKA-1512.patch, KAFKA-1512.patch, KAFKA-1512_2014-07-03_15:17:55.patch, KAFKA-1512_2014-07-14_13:28:15.patch To protect against client connection leaks add a new configuration max.connections.per.ip that causes the SocketServer to enforce a limit on the maximum number of connections from each InetAddress instance. For backwards compatibility this will default to 2 billion. -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Updated] (KAFKA-1258) Delete temporary data directory after unit test finishes
[ https://issues.apache.org/jira/browse/KAFKA-1258?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jay Kreps updated KAFKA-1258: - Resolution: Fixed Status: Resolved (was: Patch Available) Committed. Thanks! Delete temporary data directory after unit test finishes Key: KAFKA-1258 URL: https://issues.apache.org/jira/browse/KAFKA-1258 Project: Kafka Issue Type: Bug Reporter: Guozhang Wang Assignee: Manikumar Reddy Labels: newbie Fix For: 0.9.0 Attachments: KAFKA-1258.patch Today in unit testsuite most of the time when a test case is setup a temporary directory will be created with a random int as suffix, and will not be deleted after the test. After a few unit tests this will create tons of directories in java.io.tmpdir (/tmp for Linux). Would be better to remove them for clean unit tests. -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Commented] (KAFKA-1330) Implement subscribe(TopicPartition...partitions) in the new consumer
[ https://issues.apache.org/jira/browse/KAFKA-1330?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14059904#comment-14059904 ] Jay Kreps commented on KAFKA-1330: -- Created reviewboard https://reviews.apache.org/r/23442/ against branch trunk Implement subscribe(TopicPartition...partitions) in the new consumer Key: KAFKA-1330 URL: https://issues.apache.org/jira/browse/KAFKA-1330 Project: Kafka Issue Type: Sub-task Components: consumer Affects Versions: 0.9.0 Reporter: Neha Narkhede Attachments: KAFKA-1330.patch This involves adding basic fetch functionality (equivalent to SimpleConsumer) to the new consumer. Effectively implementing subscribe(TopicPartition...partitions) and unsubscribe(TopicPartition...partitions). -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Updated] (KAFKA-1330) Implement subscribe(TopicPartition...partitions) in the new consumer
[ https://issues.apache.org/jira/browse/KAFKA-1330?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jay Kreps updated KAFKA-1330: - Attachment: KAFKA-1330.patch Implement subscribe(TopicPartition...partitions) in the new consumer Key: KAFKA-1330 URL: https://issues.apache.org/jira/browse/KAFKA-1330 Project: Kafka Issue Type: Sub-task Components: consumer Affects Versions: 0.9.0 Reporter: Neha Narkhede Attachments: KAFKA-1330.patch This involves adding basic fetch functionality (equivalent to SimpleConsumer) to the new consumer. Effectively implementing subscribe(TopicPartition...partitions) and unsubscribe(TopicPartition...partitions). -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Commented] (KAFKA-1330) Implement subscribe(TopicPartition...partitions) in the new consumer
[ https://issues.apache.org/jira/browse/KAFKA-1330?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14059905#comment-14059905 ] Jay Kreps commented on KAFKA-1330: -- Hey [~nmarasoi], sorry had actually been working on this for the last week...I should have assigned it to myself. Implement subscribe(TopicPartition...partitions) in the new consumer Key: KAFKA-1330 URL: https://issues.apache.org/jira/browse/KAFKA-1330 Project: Kafka Issue Type: Sub-task Components: consumer Affects Versions: 0.9.0 Reporter: Neha Narkhede Assignee: Jay Kreps Attachments: KAFKA-1330.patch This involves adding basic fetch functionality (equivalent to SimpleConsumer) to the new consumer. Effectively implementing subscribe(TopicPartition...partitions) and unsubscribe(TopicPartition...partitions). -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Commented] (KAFKA-1330) Implement subscribe(TopicPartition...partitions) in the new consumer
[ https://issues.apache.org/jira/browse/KAFKA-1330?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14059906#comment-14059906 ] Jay Kreps commented on KAFKA-1330: -- Guys, I posted a very rough draft that implements simple consumer functionality. I think implementing offset maintenance on top of this will be pretty easy and can be done once we have those new request defn's done. I'd like to start getting these changes in even though they aren't done, to avoid keeping large patches against trunk. Specifically, what I am hoping is that we can do a detailed review on the changes to common code, especially with respect to NetworkClient, and be a bit lax on KafkaConsumer until we get something more complete. Implement subscribe(TopicPartition...partitions) in the new consumer Key: KAFKA-1330 URL: https://issues.apache.org/jira/browse/KAFKA-1330 Project: Kafka Issue Type: Sub-task Components: consumer Affects Versions: 0.9.0 Reporter: Neha Narkhede Assignee: Jay Kreps Attachments: KAFKA-1330.patch This involves adding basic fetch functionality (equivalent to SimpleConsumer) to the new consumer. Effectively implementing subscribe(TopicPartition...partitions) and unsubscribe(TopicPartition...partitions). -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Commented] (KAFKA-1026) Dynamically Adjust Batch Size Upon Receiving MessageSizeTooLargeException
[ https://issues.apache.org/jira/browse/KAFKA-1026?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14059044#comment-14059044 ] Jay Kreps commented on KAFKA-1026: -- I believe this is fixed in the new producer, no? We will allocate up to the maximum memory size for messages larger than the batch size. Dynamically Adjust Batch Size Upon Receiving MessageSizeTooLargeException - Key: KAFKA-1026 URL: https://issues.apache.org/jira/browse/KAFKA-1026 Project: Kafka Issue Type: Bug Reporter: Guozhang Wang Assignee: Guozhang Wang Labels: newbie++ Fix For: 0.9.0 Among the exceptions that can possibly be received in Producer.send(), MessageSizeTooLargeException is currently not recoverable, since the producer does not change the batch size but still retries sending. It would be better to have a dynamic batch size adjustment mechanism based on MessageSizeTooLargeException. This is related to KAFKA-998 -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Resolved] (KAFKA-1026) Dynamically Adjust Batch Size Upon Receiving MessageSizeTooLargeException
[ https://issues.apache.org/jira/browse/KAFKA-1026?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jay Kreps resolved KAFKA-1026. -- Resolution: Fixed Fix Version/s: (was: 0.9.0) 0.8.2 Cool, closing with fix as 0.8.2. Dynamically Adjust Batch Size Upon Receiving MessageSizeTooLargeException - Key: KAFKA-1026 URL: https://issues.apache.org/jira/browse/KAFKA-1026 Project: Kafka Issue Type: Bug Reporter: Guozhang Wang Assignee: Guozhang Wang Labels: newbie++ Fix For: 0.8.2 Among the exceptions that can possibly be received in Producer.send(), MessageSizeTooLargeException is currently not recoverable, since the producer does not change the batch size but still retries sending. It would be better to have a dynamic batch size adjustment mechanism based on MessageSizeTooLargeException. This is related to KAFKA-998 -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Commented] (KAFKA-1040) ConsumerConfig and ProducerConfig do work in the Constructor
[ https://issues.apache.org/jira/browse/KAFKA-1040?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14059095#comment-14059095 ] Jay Kreps commented on KAFKA-1040: -- Hey guys, I'm not sure I agree with this principle. The goal of a constructor is to construct a valid instance of the object. So validating the constructor arguments should totally be done in the constructor. Could you give a concrete example of how adding a separate validate() method would make things better? ConsumerConfig and ProducerConfig do work in the Constructor -- Key: KAFKA-1040 URL: https://issues.apache.org/jira/browse/KAFKA-1040 Project: Kafka Issue Type: Improvement Components: config, consumer, producer Affects Versions: 0.8.0 Environment: Java 1.7 Linux Mint 14 (64bit) Reporter: Sharmarke Aden Assignee: Neha Narkhede Priority: Minor Labels: config, newbie Fix For: 0.9.0 It appears that validation of configuration properties is performed in the ConsumerConfig and ProducerConfig constructors. This is generally bad practice as it couples object construction and validation. It also makes it difficult to mock these objects in unit tests. Ideally validation of the configuration properties should be separated from object construction and initiated by those that rely/use these config objects. http://misko.hevery.com/code-reviewers-guide/flaw-constructor-does-real-work/ -- This message was sent by Atlassian JIRA (v6.2#6252)
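For readers following the discussion, the two styles being debated look roughly like this. The config classes and property name below are illustrative only, not the actual ConsumerConfig or ProducerConfig:
{code}
import java.util.Properties;

// Style A: the constructor validates, so any constructed instance is known to be valid.
class ValidatingConfig {
    final String brokerList;
    ValidatingConfig(Properties props) {
        this.brokerList = props.getProperty("broker.list");
        if (brokerList == null || brokerList.isEmpty())
            throw new IllegalArgumentException("broker.list must be set");
    }
}

// Style B: construction and validation are separated, so callers (or tests) can build
// an instance without triggering validation and call validate() explicitly later.
class DeferredValidationConfig {
    final String brokerList;
    DeferredValidationConfig(Properties props) {
        this.brokerList = props.getProperty("broker.list");
    }
    void validate() {
        if (brokerList == null || brokerList.isEmpty())
            throw new IllegalArgumentException("broker.list must be set");
    }
}
{code}
Style B makes it easier to construct partially populated instances in tests, at the cost that a constructed object is no longer guaranteed to be valid, which is the trade-off being questioned above.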
[jira] [Commented] (KAFKA-1040) ConsumerConfig and ProducerConfig do work in the Constructor
[ https://issues.apache.org/jira/browse/KAFKA-1040?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14059175#comment-14059175 ] Jay Kreps commented on KAFKA-1040: -- Not sure I get the analogy, can we be concrete about the problem this solves. Is this meant to help the end user of Kafka or the Kafka developers or both? What problem do either of these two people currently have that this change would fix? You mentioned mocking, but these classes are internal and we provide a mock of the client for the end user... ConsumerConfig and ProducerConfig do work in the Constructor -- Key: KAFKA-1040 URL: https://issues.apache.org/jira/browse/KAFKA-1040 Project: Kafka Issue Type: Improvement Components: config, consumer, producer Affects Versions: 0.8.0 Environment: Java 1.7 Linux Mint 14 (64bit) Reporter: Sharmarke Aden Assignee: Neha Narkhede Priority: Minor Labels: config, newbie Fix For: 0.9.0 It appears that validation of configuration properties is performed in the ConsumerConfig and ProducerConfig constructors. This is generally bad practice as it couples object construction and validation. It also makes it difficult to mock these objects in unit tests. Ideally validation of the configuration properties should be separated from object construction and initiated by those that rely/use these config objects. http://misko.hevery.com/code-reviewers-guide/flaw-constructor-does-real-work/ -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Commented] (KAFKA-1535) return all live brokers in TopicMetadataResponse
[ https://issues.apache.org/jira/browse/KAFKA-1535?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14059200#comment-14059200 ] Jay Kreps commented on KAFKA-1535: -- Cool, go for it. Just attach a patch to this issue and we will review and apply it. return all live brokers in TopicMetadataResponse Key: KAFKA-1535 URL: https://issues.apache.org/jira/browse/KAFKA-1535 Project: Kafka Issue Type: Improvement Components: core Affects Versions: 0.8.2 Reporter: Jun Rao Labels: newbie Currently, we only return the brokers that have assigned replicas for a topic in TopicMetadataResponse. The new producer will use those brokers for refreshing metadata. Now suppose that we stop all those brokers, copy all local data to some new hosts and then restart those hosts (with the original broker id). There is no way for the new producer to automatically get the information about the new brokers since all old brokers are gone. -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Commented] (KAFKA-1515) Wake-up Sender upon blocked on fetching leader metadata
[ https://issues.apache.org/jira/browse/KAFKA-1515?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14059269#comment-14059269 ] Jay Kreps commented on KAFKA-1515: -- I am getting transient hangs after ProducerFailureTest.testSendAfterClose, but I will assume that is due to KAFKA-1533. Wake-up Sender upon blocked on fetching leader metadata --- Key: KAFKA-1515 URL: https://issues.apache.org/jira/browse/KAFKA-1515 Project: Kafka Issue Type: Bug Reporter: Guozhang Wang Assignee: Guozhang Wang Fix For: 0.9.0 Attachments: KAFKA-1515.patch, KAFKA-1515_2014-07-03_10:19:28.patch, KAFKA-1515_2014-07-03_16:43:05.patch, KAFKA-1515_2014-07-07_10:55:58.patch, KAFKA-1515_2014-07-08_11:35:59.patch Currently the new KafkaProducer will not wake up the sender thread upon forcing metadata fetch, and hence if the sender is polling with a long timeout (e.g. the metadata.age period) this wait will usually timeout and fail. -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Commented] (KAFKA-1515) Wake-up Sender upon blocked on fetching leader metadata
[ https://issues.apache.org/jira/browse/KAFKA-1515?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14059286#comment-14059286 ] Jay Kreps commented on KAFKA-1515: -- Committed with a very minor change: I renamed the isReadyToSend method to isSendable simply because we had like four methods that were ready/isReady/etc and each meant something slightly different. This hopefully differentiates better: sendable is when you are connected and have room to send, ready is when you are sendable and there is nothing else going on that makes us want to block the request. Wake-up Sender upon blocked on fetching leader metadata --- Key: KAFKA-1515 URL: https://issues.apache.org/jira/browse/KAFKA-1515 Project: Kafka Issue Type: Bug Reporter: Guozhang Wang Assignee: Guozhang Wang Fix For: 0.9.0 Attachments: KAFKA-1515.patch, KAFKA-1515_2014-07-03_10:19:28.patch, KAFKA-1515_2014-07-03_16:43:05.patch, KAFKA-1515_2014-07-07_10:55:58.patch, KAFKA-1515_2014-07-08_11:35:59.patch Currently the new KafkaProducer will not wake up the sender thread upon forcing metadata fetch, and hence if the sender is polling with a long timeout (e.g. the metadata.age period) this wait will usually timeout and fail. -- This message was sent by Atlassian JIRA (v6.2#6252)
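A hedged sketch of the renamed methods' intent (the field names are hypothetical; this is not the NetworkClient code): sendable looks only at connection state and in-flight capacity, while ready also accounts for anything else that should hold a request back.
{code}
// Illustrative only: the sendable vs. ready distinction described above.
class NodeState {
    boolean connected;
    int inFlightRequests;
    int maxInFlightRequests;
    boolean metadataFetchInProgress; // one example of "something else going on"

    // Sendable: connected and with room for another in-flight request.
    boolean isSendable() {
        return connected && inFlightRequests < maxInFlightRequests;
    }

    // Ready: sendable, and nothing else makes us want to block the request.
    boolean isReady() {
        return isSendable() && !metadataFetchInProgress;
    }
}
{code}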
[jira] [Commented] (KAFKA-1535) return all live brokers in TopicMetadataResponse
[ https://issues.apache.org/jira/browse/KAFKA-1535?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14059287#comment-14059287 ] Jay Kreps commented on KAFKA-1535: -- Yes I think that is correct. The current behavior is basically an optimization to avoid sending all the nodes. However this means if you are only sending to a single partition you may only have the host/port info for one machine, if that machine goes down you don't have another broker to go to. Since there shouldn't be more than a few hundred brokers, sending the full host/port for all of them should be okay and fixes this scenario. return all live brokers in TopicMetadataResponse Key: KAFKA-1535 URL: https://issues.apache.org/jira/browse/KAFKA-1535 Project: Kafka Issue Type: Improvement Components: core Affects Versions: 0.8.2 Reporter: Jun Rao Labels: newbie Currently, we only return the brokers that have assigned replicas for a topic in TopicMetadataResponse. The new producer will use those brokers for refreshing metadata. Now suppose that we stop all those brokers, copy all local data to some new hosts and then restart those hosts (with the original broker id). There is no way for the new producer to automatically get the information about the new brokers since all old brokers are gone. -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Commented] (KAFKA-1537) add request.required.acks=-2 to require acks from all replicas
[ https://issues.apache.org/jira/browse/KAFKA-1537?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14059451#comment-14059451 ] Jay Kreps commented on KAFKA-1537: -- Wouldn't it be somewhat odd that you actually want to require N/N replicas to acknowledge? It seems more likely that if you have replication factor 3 you would want 2 acks, which we already support today. Otherwise you have no fault tolerance. add request.required.acks=-2 to require acks from all replicas -- Key: KAFKA-1537 URL: https://issues.apache.org/jira/browse/KAFKA-1537 Project: Kafka Issue Type: Improvement Components: core Affects Versions: 0.8.1.1 Reporter: Jiang Wu Priority: Minor The current options for the producer parameter request.required.acks include 0, positive numbers and -1. -1 means acks from the replicas in the ISR. In a stress test, it was found that if request.required.acks=-1 and the leader is receiving at high speed, then the followers will fail out of ISR. In this case, request.required.acks=-1 is equivalent to request.required.acks=1 because only the leader is in ISR. It would be desirable to add request.required.acks=-2 to require acks from all replicas. -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Commented] (KAFKA-1325) Fix inconsistent per topic log configs
[ https://issues.apache.org/jira/browse/KAFKA-1325?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14057521#comment-14057521 ] Jay Kreps commented on KAFKA-1325: -- Regardless of which path we go let's think about compatibility. If I currently have a bunch of configs in zookeeper, how do they get changed if we rename them? Fix inconsistent per topic log configs -- Key: KAFKA-1325 URL: https://issues.apache.org/jira/browse/KAFKA-1325 Project: Kafka Issue Type: Improvement Components: log Affects Versions: 0.8.1 Reporter: Neha Narkhede Labels: usability Attachments: KAFKA-1325.patch, KAFKA-1325.patch Related thread from the user mailing list - http://mail-archives.apache.org/mod_mbox/kafka-users/201403.mbox/%3Cd8f6f3857b82c4ccd8725aba6fd68cb8%40cweb01.nmdf.nhnsystem.com%3E Our documentation is a little confusing on the log configs. The log property for retention.ms is in millis but the server default it maps to is in minutes. Same is true for segment.ms as well. We could either improve the docs or change the per-topic configs to be consistent with the server defaults. -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Resolved] (KAFKA-1515) Wake-up Sender upon blocked on fetching leader metadata
[ https://issues.apache.org/jira/browse/KAFKA-1515?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jay Kreps resolved KAFKA-1515. -- Resolution: Fixed Wake-up Sender upon blocked on fetching leader metadata --- Key: KAFKA-1515 URL: https://issues.apache.org/jira/browse/KAFKA-1515 Project: Kafka Issue Type: Bug Reporter: Guozhang Wang Assignee: Guozhang Wang Fix For: 0.9.0 Attachments: KAFKA-1515_2014-07-03_10:19:28.patch, KAFKA-1515_2014-07-03_16:43:05.patch, KAFKA-1515_2014-07-07_10:55:58.patch, KAFKA-1515_2014-07-08_11:35:59.patch Currently the new KafkaProducer will not wake up the sender thread upon forcing metadata fetch, and hence if the sender is polling with a long timeout (e.g. the metadata.age period) this wait will usually timeout and fail. -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Commented] (KAFKA-1532) Move CRC32 to AFTER the payload
[ https://issues.apache.org/jira/browse/KAFKA-1532?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14056441#comment-14056441 ] Jay Kreps commented on KAFKA-1532: -- This is a good idea, but unfortunately would break compatibility with existing clients. It would be good to save up enough of these fixes and do them all at once to minimize breakage. Move CRC32 to AFTER the payload --- Key: KAFKA-1532 URL: https://issues.apache.org/jira/browse/KAFKA-1532 Project: Kafka Issue Type: Improvement Components: core, producer Reporter: Julian Morrison Assignee: Jun Rao Priority: Minor To support streaming a message of known length but unknown content, take the CRC32 out of the message header and make it a message trailer. Then client libraries can calculate it after streaming the message to Kafka, without materializing the whole message in RAM. -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Commented] (KAFKA-1532) Move CRC32 to AFTER the payload
[ https://issues.apache.org/jira/browse/KAFKA-1532?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14056444#comment-14056444 ] Jay Kreps commented on KAFKA-1532: -- Also since Kafka itself fully materializes messages in memory, fixing this won't actually let clients send arbitrarily large messages, as the broker will still choke. Move CRC32 to AFTER the payload --- Key: KAFKA-1532 URL: https://issues.apache.org/jira/browse/KAFKA-1532 Project: Kafka Issue Type: Improvement Components: core, producer Reporter: Julian Morrison Assignee: Jun Rao Priority: Minor To support streaming a message of known length but unknown content, take the CRC32 out of the message header and make it a message trailer. Then client libraries can calculate it after streaming the message to Kafka, without materializing the whole message in RAM. -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Commented] (KAFKA-1532) Move CRC32 to AFTER the payload
[ https://issues.apache.org/jira/browse/KAFKA-1532?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14056802#comment-14056802 ] Jay Kreps commented on KAFKA-1532: -- It's not unavoidable, but it is pretty hard. To make this really work we have to be able to stream partial requests from the network layer, to the api/processing layer, and to the log layer. This is even more complicated because additional appends to the log have to be blocked until the message is completely written. Move CRC32 to AFTER the payload --- Key: KAFKA-1532 URL: https://issues.apache.org/jira/browse/KAFKA-1532 Project: Kafka Issue Type: Improvement Components: core, producer Reporter: Julian Morrison Assignee: Jun Rao Priority: Minor To support streaming a message of known length but unknown content, take the CRC32 out of the message header and make it a message trailer. Then client libraries can calculate it after streaming the message to Kafka, without materializing the whole message in RAM. -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Commented] (KAFKA-1532) Move CRC32 to AFTER the payload
[ https://issues.apache.org/jira/browse/KAFKA-1532?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14056806#comment-14056806 ] Jay Kreps commented on KAFKA-1532: -- Your approach where you buffer the message with a backing file and then do the full write to the log would also work and might be easier. But still a pretty big change. Move CRC32 to AFTER the payload --- Key: KAFKA-1532 URL: https://issues.apache.org/jira/browse/KAFKA-1532 Project: Kafka Issue Type: Improvement Components: core, producer Reporter: Julian Morrison Assignee: Jun Rao Priority: Minor To support streaming a message of known length but unknown content, take the CRC32 out of the message header and make it a message trailer. Then client libraries can calculate it after streaming the message to Kafka, without materializing the whole message in RAM. -- This message was sent by Atlassian JIRA (v6.2#6252)
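The backing-file approach can be sketched with plain JDK streams: spool the incoming payload to a temporary file while folding it into a CRC32, then append the checksum as a trailer once all bytes have arrived. This is only an illustration of the idea under discussion, not broker code; the class, method, and file names are made up.
{code}
import java.io.*;
import java.nio.file.*;
import java.util.zip.CRC32;
import java.util.zip.CheckedInputStream;

public final class CrcTrailerBuffering {
    // Spool a payload of known length to a temp file, computing CRC32 as we go,
    // then append the 4-byte checksum as a trailer. Returns the temp file path.
    public static Path bufferWithCrcTrailer(InputStream in, long length) throws IOException {
        Path tmp = Files.createTempFile("message-", ".buf");
        CRC32 crc = new CRC32();
        try (CheckedInputStream checked = new CheckedInputStream(in, crc);
             OutputStream out = new BufferedOutputStream(Files.newOutputStream(tmp))) {
            byte[] buf = new byte[8192];
            long remaining = length;
            while (remaining > 0) {
                int n = checked.read(buf, 0, (int) Math.min(buf.length, remaining));
                if (n < 0) throw new EOFException("stream ended before " + length + " bytes");
                out.write(buf, 0, n);
                remaining -= n;
            }
            long value = crc.getValue();
            // Trailer: big-endian CRC32 of the payload.
            out.write((int) (value >>> 24));
            out.write((int) (value >>> 16));
            out.write((int) (value >>> 8));
            out.write((int) value);
        }
        return tmp;
    }
}
{code}
Only once the file (payload plus trailer) is complete would it be appended to the log in a single operation, which avoids holding other appends back while the payload streams in.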
[jira] [Commented] (KAFKA-1532) Move CRC32 to AFTER the payload
[ https://issues.apache.org/jira/browse/KAFKA-1532?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14056833#comment-14056833 ] Jay Kreps commented on KAFKA-1532: -- Yeah agreed. I think that is doable. Move CRC32 to AFTER the payload --- Key: KAFKA-1532 URL: https://issues.apache.org/jira/browse/KAFKA-1532 Project: Kafka Issue Type: Improvement Components: core, producer Reporter: Julian Morrison Assignee: Jun Rao Priority: Minor To support streaming a message of known length but unknown content, take the CRC32 out of the message header and make it a message trailer. Then client libraries can calculate it after streaming the message to Kafka, without materializing the whole message in RAM. -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Commented] (KAFKA-1512) Limit the maximum number of connections per ip address
[ https://issues.apache.org/jira/browse/KAFKA-1512?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14051899#comment-14051899 ] Jay Kreps commented on KAFKA-1512: -- A proposal from the LI ops team is to also add an override for this so you can have custom limits for ips if you want max.connections.per.ip.overrides=192.168.1.1:5, 192.168.1.2:, 192.168.1.3:45 If no objections I will implement this too. Limit the maximum number of connections per ip address -- Key: KAFKA-1512 URL: https://issues.apache.org/jira/browse/KAFKA-1512 Project: Kafka Issue Type: New Feature Reporter: Jay Kreps Assignee: Jay Kreps Attachments: KAFKA-1512.patch, KAFKA-1512.patch To protect against client connection leaks add a new configuration max.connections.per.ip that causes the SocketServer to enforce a limit on the maximum number of connections from each InetAddress instance. For backwards compatibility this will default to 2 billion. -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Comment Edited] (KAFKA-1512) Limit the maximum number of connections per ip address
[ https://issues.apache.org/jira/browse/KAFKA-1512?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14051899#comment-14051899 ] Jay Kreps edited comment on KAFKA-1512 at 7/3/14 8:46 PM: -- A proposal from the LI ops team is to also add an override for this so you can have custom limits for ips if you want: {code} max.connections.per.ip.overrides=192.168.1.1:5, 192.168.1.2:, 192.168.1.3:45 {code} If no objections I will implement this too. was (Author: jkreps): A proposal from the LI ops team is to also add an override for this so you can have custom limits for ips if you want max.connections.per.ip.overrides=192.168.1.1:5, 192.168.1.2:, 192.168.1.3:45 If no objections I will implement this too. Limit the maximum number of connections per ip address -- Key: KAFKA-1512 URL: https://issues.apache.org/jira/browse/KAFKA-1512 Project: Kafka Issue Type: New Feature Reporter: Jay Kreps Assignee: Jay Kreps Attachments: KAFKA-1512.patch, KAFKA-1512.patch To protect against client connection leaks add a new configuration max.connections.per.ip that causes the SocketServer to enforce a limit on the maximum number of connections from each InetAddress instance. For backwards compatibility this will default to 2 billion. -- This message was sent by Atlassian JIRA (v6.2#6252)
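Parsing the proposed override property is simple; the sketch below follows the format shown in the comment (ip:limit pairs separated by commas), but the parser itself is illustrative, not the committed implementation.
{code}
import java.util.HashMap;
import java.util.Map;

public final class ConnectionOverrides {
    // Parse a value like "192.168.1.1:5, 192.168.1.3:45" into ip -> max connections.
    public static Map<String, Integer> parse(String value) {
        Map<String, Integer> overrides = new HashMap<>();
        if (value == null || value.trim().isEmpty())
            return overrides;
        for (String entry : value.split(",")) {
            String[] parts = entry.trim().split(":");
            if (parts.length != 2)
                throw new IllegalArgumentException("Malformed override entry: " + entry);
            overrides.put(parts[0].trim(), Integer.parseInt(parts[1].trim()));
        }
        return overrides;
    }

    public static void main(String[] args) {
        // Prints the two parsed entries.
        System.out.println(parse("192.168.1.1:5, 192.168.1.3:45"));
    }
}
{code}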
[jira] [Commented] (KAFKA-1519) Console consumer: expose configuration option to enable/disable writing the line separator
[ https://issues.apache.org/jira/browse/KAFKA-1519?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14051909#comment-14051909 ] Jay Kreps commented on KAFKA-1519: -- I would also vote for supporting an empty separator using the existing argument. It would be great to add kafka.utils.CommandLineUtilsTest with a test to cover that method. I suspect that there would not be any backwards compatibility issues since that argument is not currently accepted and it would be unlikely anyone would depend on the tool rejecting that argument. Console consumer: expose configuration option to enable/disable writing the line separator -- Key: KAFKA-1519 URL: https://issues.apache.org/jira/browse/KAFKA-1519 Project: Kafka Issue Type: Improvement Components: consumer Affects Versions: 0.8.1.1 Reporter: Michael Noll Assignee: Neha Narkhede Priority: Minor The current console consumer includes a {{DefaultMessageFormatter}}, which exposes a few user-configurable options which can be set on the command line via --property, e.g. --property line.separator=XYZ. Unfortunately, the current implementation does not allow the user to completely disable writing any such line separator. However, this functionality would be helpful to enable users to capture data as is from a Kafka topic to snapshot file. Capturing data as is -- without an artificial line separator -- is particularly nice for data in a binary format (including Avro). *No workaround* A potential workaround would be to pass an empty string as the property value of line.separator, but this doesn't work in the current implementation. The following variants throw an Invalid parser arguments exception: {code} --property line.separator= # nothing --property line.separator= # double quotes --property line.separator='' # single quotes {code} Escape tricks via a backslash don't work either. If there actually is a workaround please let me know. *How to fix* We can introduce a print.line option to enable/disable writing line.separator similar to how the code already uses print.key to enable/disable writing key.separator. This change is trivial. To preserve backwards compatibility, the print.line option would be set to true by default (unlike the print.key option, which defaults to false). *Alternatives* Apart from modifying the built-in {{DefaultMessageFormatter}}, users could of course implement their own custom {{MessageFormatter}}. But given that it's a) a trivial change to the {{DefaultMessageFormatter}} and b) a nice user feature I'd say changing the built-in {{DefaultMessageFormatter}} would be the better approach. This way, Kafka would support writing data as-is to a file out of the box. -- This message was sent by Atlassian JIRA (v6.2#6252)
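The fix proposed in the description is just a boolean toggle next to the existing key-separator handling. A standalone sketch of the idea (this is not the actual DefaultMessageFormatter; the property names follow the description above, everything else is illustrative):
{code}
import java.io.PrintStream;
import java.util.Properties;

// Illustrative formatter: print.line controls whether any line separator is written.
class LineOptionalFormatter {
    private byte[] lineSeparator = "\n".getBytes();
    private boolean printLine = true; // defaults to true to preserve existing behavior

    void init(Properties props) {
        if (props.containsKey("line.separator"))
            lineSeparator = props.getProperty("line.separator").getBytes();
        if (props.containsKey("print.line"))
            printLine = Boolean.parseBoolean(props.getProperty("print.line"));
    }

    void writeTo(byte[] value, PrintStream output) {
        output.write(value, 0, value.length);
        if (printLine)
            output.write(lineSeparator, 0, lineSeparator.length);
    }
}
{code}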
[jira] [Commented] (KAFKA-1512) Limit the maximum number of connections per ip address
[ https://issues.apache.org/jira/browse/KAFKA-1512?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14051951#comment-14051951 ] Jay Kreps commented on KAFKA-1512: -- Yes, I hadn't thought of that. Disabling connections could potentially be useful. The intended use was actually the other way around, basically default most things to something reasonable like 10 but have a way to whitelist some IPs to have unlimited connections. The background here is that we were previously having clients bootstrap metadata through a VIP (which appears to the kafka nodes as a single ip). We just had an issue where a 200 node cluster that uses Kafka started creating and leaking connections through the vip which brought down a big shared cluster. So we thought we should have some limits. The hope was to change the VIP to DNS round-robin and gradually migrate the clients to that. In the meantime we thought it would be useful to be able to enforce the limit but whitelist the VIP with unlimited connections. Thinking about this, maybe it is a little crazy hard coding ip/host names in config? Limit the maximum number of connections per ip address -- Key: KAFKA-1512 URL: https://issues.apache.org/jira/browse/KAFKA-1512 Project: Kafka Issue Type: New Feature Reporter: Jay Kreps Assignee: Jay Kreps Attachments: KAFKA-1512.patch, KAFKA-1512.patch To protect against client connection leaks add a new configuration max.connections.per.ip that causes the SocketServer to enforce a limit on the maximum number of connections from each InetAddress instance. For backwards compatibility this will default to 2 billion. -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Commented] (KAFKA-1512) Limit the maximum number of connections per ip address
[ https://issues.apache.org/jira/browse/KAFKA-1512?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14051973#comment-14051973 ] Jay Kreps commented on KAFKA-1512: -- Updated reviewboard https://reviews.apache.org/r/23208/ against branch trunk Limit the maximum number of connections per ip address -- Key: KAFKA-1512 URL: https://issues.apache.org/jira/browse/KAFKA-1512 Project: Kafka Issue Type: New Feature Reporter: Jay Kreps Assignee: Jay Kreps Attachments: KAFKA-1512.patch, KAFKA-1512.patch, KAFKA-1512_2014-07-03_15:17:55.patch To protect against client connection leaks add a new configuration max.connections.per.ip that causes the SocketServer to enforce a limit on the maximum number of connections from each InetAddress instance. For backwards compatibility this will default to 2 billion. -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Updated] (KAFKA-1512) Limit the maximum number of connections per ip address
[ https://issues.apache.org/jira/browse/KAFKA-1512?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jay Kreps updated KAFKA-1512: - Attachment: KAFKA-1512_2014-07-03_15:17:55.patch Limit the maximum number of connections per ip address -- Key: KAFKA-1512 URL: https://issues.apache.org/jira/browse/KAFKA-1512 Project: Kafka Issue Type: New Feature Reporter: Jay Kreps Assignee: Jay Kreps Attachments: KAFKA-1512.patch, KAFKA-1512.patch, KAFKA-1512_2014-07-03_15:17:55.patch To protect against client connection leaks add a new configuration max.connections.per.ip that causes the SocketServer to enforce a limit on the maximum number of connections from each InetAddress instance. For backwards compatibility this will default to 2 billion. -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Commented] (KAFKA-1519) Console consumer: expose configuration option to enable/disable writing the line separator
[ https://issues.apache.org/jira/browse/KAFKA-1519?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14052113#comment-14052113 ] Jay Kreps commented on KAFKA-1519: -- Looks good. I think this change might also interpret --property line.seperator as the same as --property line.seperator= Not sure if that is good or confusing... Console consumer: expose configuration option to enable/disable writing the line separator -- Key: KAFKA-1519 URL: https://issues.apache.org/jira/browse/KAFKA-1519 Project: Kafka Issue Type: Improvement Components: consumer Affects Versions: 0.8.1.1 Reporter: Michael Noll Assignee: Neha Narkhede Priority: Minor Attachments: KAFKA-1519.patch The current console consumer includes a {{DefaultMessageFormatter}}, which exposes a few user-configurable options which can be set on the command line via --property, e.g. --property line.separator=XYZ. Unfortunately, the current implementation does not allow the user to completely disable writing any such line separator. However, this functionality would be helpful to enable users to capture data as is from a Kafka topic to snapshot file. Capturing data as is -- without an artificial line separator -- is particularly nice for data in a binary format (including Avro). *No workaround* A potential workaround would be to pass an empty string as the property value of line.separator, but this doesn't work in the current implementation. The following variants throw an Invalid parser arguments exception: {code} --property line.separator= # nothing --property line.separator= # double quotes --property line.separator='' # single quotes {code} Escape tricks via a backslash don't work either. If there actually is a workaround please let me know. *How to fix* We can introduce a print.line option to enable/disable writing line.separator similar to how the code already uses print.key to enable/disable writing key.separator. This change is trivial. To preserve backwards compatibility, the print.line option would be set to true by default (unlike the print.key option, which defaults to false). *Alternatives* Apart from modifying the built-in {{DefaultMessageFormatter}}, users could of course implement their own custom {{MessageFormatter}}. But given that it's a) a trivial change to the {{DefaultMessageFormatter}} and b) a nice user feature I'd say changing the built-in {{DefaultMessageFormatter}} would be the better approach. This way, Kafka would support writing data as-is to a file out of the box. -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Commented] (KAFKA-1512) Limit the maximum number of connections per ip address
[ https://issues.apache.org/jira/browse/KAFKA-1512?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14049117#comment-14049117 ] Jay Kreps commented on KAFKA-1512: -- A couple things to review: 0. The new config is max.connections.per.ip 1. I am using Socket.getInetAddress() as the key to limit on. I think an InetAddress is what we want...a socket address includes the port so is always unique, but there is sort of a weird hierarchy of things there. This also depends on this address being properly hashable (which it seems to be). 2. I made an unrelated change to how we set the recv buffer. We were weirdly setting this over and over again on the server socket every time we accepted a connection. I think this was a mistake, so I changed it to set it once. But if anyone knows a reason for this odd code that would make me more confident. 3. I don't know of a way to check the source address of a pending connection without actually accepting the connection. So as a result this patch accepts the connection, and then, if we are over quota, closes it. Limit the maximum number of connections per ip address -- Key: KAFKA-1512 URL: https://issues.apache.org/jira/browse/KAFKA-1512 Project: Kafka Issue Type: New Feature Reporter: Jay Kreps Assignee: Jay Kreps Attachments: KAFKA-1512.patch To protect against client connection leaks add a new configuration max.connections.per.ip that causes the SocketServer to enforce a limit on the maximum number of connections from each InetAddress instance. For backwards compatibility this will default to 2 billion. -- This message was sent by Atlassian JIRA (v6.2#6252)
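Points 1 and 3 above can be illustrated with a small standalone sketch: count open connections per InetAddress, and because the remote address is only known after accept(), close over-quota sockets immediately after accepting them. Class and method names are hypothetical, not the SocketServer code.
{code}
import java.io.IOException;
import java.net.InetAddress;
import java.net.Socket;
import java.util.HashMap;
import java.util.Map;

class ConnectionQuotas {
    private final int defaultMax;
    private final Map<InetAddress, Integer> overrides;
    private final Map<InetAddress, Integer> counts = new HashMap<>();

    ConnectionQuotas(int defaultMax, Map<InetAddress, Integer> overrides) {
        this.defaultMax = defaultMax;
        this.overrides = overrides;
    }

    // Returns true if the accepted socket is within quota; otherwise it is closed.
    synchronized boolean tryAccept(Socket socket) throws IOException {
        InetAddress address = socket.getInetAddress();
        int limit = overrides.getOrDefault(address, defaultMax);
        int current = counts.getOrDefault(address, 0);
        if (current >= limit) {
            // We had to accept() just to learn the source address, so close over-quota sockets.
            socket.close();
            return false;
        }
        counts.put(address, current + 1);
        return true;
    }

    // Called when a connection is closed normally.
    synchronized void release(InetAddress address) {
        counts.computeIfPresent(address, (a, c) -> c > 1 ? c - 1 : null);
    }
}
{code}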
[jira] [Updated] (KAFKA-1512) Limit the maximum number of connections per ip address
[ https://issues.apache.org/jira/browse/KAFKA-1512?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jay Kreps updated KAFKA-1512: - Attachment: KAFKA-1512.patch Limit the maximum number of connections per ip address -- Key: KAFKA-1512 URL: https://issues.apache.org/jira/browse/KAFKA-1512 Project: Kafka Issue Type: New Feature Reporter: Jay Kreps Assignee: Jay Kreps Attachments: KAFKA-1512.patch, KAFKA-1512.patch To protect against client connection leaks add a new configuration max.connections.per.ip that causes the SocketServer to enforce a limit on the maximum number of connections from each InetAddress instance. For backwards compatibility this will default to 2 billion. -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Commented] (KAFKA-1512) Limit the maximum number of connections per ip address
[ https://issues.apache.org/jira/browse/KAFKA-1512?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14049235#comment-14049235 ] Jay Kreps commented on KAFKA-1512: -- Created reviewboard https://reviews.apache.org/r/23208/ against branch trunk Limit the maximum number of connections per ip address -- Key: KAFKA-1512 URL: https://issues.apache.org/jira/browse/KAFKA-1512 Project: Kafka Issue Type: New Feature Reporter: Jay Kreps Assignee: Jay Kreps Attachments: KAFKA-1512.patch, KAFKA-1512.patch To protect against client connection leaks add a new configuration max.connections.per.ip that causes the SocketServer to enforce a limit on the maximum number of connections from each InetAddress instance. For backwards compatibility this will default to 2 billion. -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Created] (KAFKA-1512) Limit the maximum number of connections per ip address
Jay Kreps created KAFKA-1512: Summary: Limit the maximum number of connections per ip address Key: KAFKA-1512 URL: https://issues.apache.org/jira/browse/KAFKA-1512 Project: Kafka Issue Type: New Feature Reporter: Jay Kreps Assignee: Jay Kreps To protect against client connection leaks add a new configuration max.connections.per.ip that causes the SocketServer to enforce a limit on the maximum number of connections from each InetAddress instance. For backwards compatibility this will default to 2 billion. -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Updated] (KAFKA-1512) Limit the maximum number of connections per ip address
[ https://issues.apache.org/jira/browse/KAFKA-1512?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jay Kreps updated KAFKA-1512: - Attachment: KAFKA-1512.patch Limit the maximum number of connections per ip address -- Key: KAFKA-1512 URL: https://issues.apache.org/jira/browse/KAFKA-1512 Project: Kafka Issue Type: New Feature Reporter: Jay Kreps Assignee: Jay Kreps Attachments: KAFKA-1512.patch To protect against client connection leaks add a new configuration max.connections.per.ip that causes the SocketServer to enforce a limit on the maximum number of connections from each InetAddress instance. For backwards compatibility this will default to 2 billion. -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Commented] (KAFKA-1316) Refactor Sender
[ https://issues.apache.org/jira/browse/KAFKA-1316?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14037878#comment-14037878 ] Jay Kreps commented on KAFKA-1316: -- Updated reviewboard https://reviews.apache.org/r/22762/ against branch trunk Refactor Sender --- Key: KAFKA-1316 URL: https://issues.apache.org/jira/browse/KAFKA-1316 Project: Kafka Issue Type: Sub-task Components: producer Reporter: Jay Kreps Assignee: Jay Kreps Attachments: KAFKA-1316.patch, KAFKA-1316.patch, KAFKA-1316.patch, KAFKA-1316_2014-06-03_11:15:38.patch, KAFKA-1316_2014-06-03_14:33:33.patch, KAFKA-1316_2014-06-07_11:20:38.patch, KAFKA-1316_2014-06-19_14:01:04.patch Currently most of the logic of the producer I/O thread is in Sender.java. However we will need to do a fair number of similar things in the new consumer. Specifically: - Track in-flight requests - Fetch metadata - Manage connection lifecycle It may be possible to refactor some of this into a helper class that can be shared with the consumer. This will require some detailed thought. -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Updated] (KAFKA-1316) Refactor Sender
[ https://issues.apache.org/jira/browse/KAFKA-1316?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jay Kreps updated KAFKA-1316: - Attachment: KAFKA-1316_2014-06-19_14:01:04.patch Refactor Sender --- Key: KAFKA-1316 URL: https://issues.apache.org/jira/browse/KAFKA-1316 Project: Kafka Issue Type: Sub-task Components: producer Reporter: Jay Kreps Assignee: Jay Kreps Attachments: KAFKA-1316.patch, KAFKA-1316.patch, KAFKA-1316.patch, KAFKA-1316_2014-06-03_11:15:38.patch, KAFKA-1316_2014-06-03_14:33:33.patch, KAFKA-1316_2014-06-07_11:20:38.patch, KAFKA-1316_2014-06-19_14:01:04.patch Currently most of the logic of the producer I/O thread is in Sender.java. However we will need to do a fair number of similar things in the new consumer. Specifically: - Track in-flight requests - Fetch metadata - Manage connection lifecycle It may be possible to refactor some of this into a helper class that can be shared with the consumer. This will require some detailed thought. -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Updated] (KAFKA-1291) Make wrapper shell scripts for important tools
[ https://issues.apache.org/jira/browse/KAFKA-1291?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jay Kreps updated KAFKA-1291: - Resolution: Fixed Status: Resolved (was: Patch Available) [~nehanarkhede] I fixed the simple perf test issue you pointed out. I think I just confused myself that you had +1'd the patch by looking at the wrong issue and I just committed it. If you have any additional comments let me know and I will follow up. Make wrapper shell scripts for important tools -- Key: KAFKA-1291 URL: https://issues.apache.org/jira/browse/KAFKA-1291 Project: Kafka Issue Type: Improvement Affects Versions: 0.8.1 Reporter: Jay Kreps Labels: newbie, usability Fix For: 0.8.2 Attachments: KAFKA-1291.patch, KAFKA-1291.patch, KAFKA-1291.patch It is nice to have a proper command for the important tools just to help with discoverability. I noticed that mirror maker doesn't have such a wrapper. Neither does consumer offset checker. It would be good to do an audit and think of any tools that should have a wrapper that don't. -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Commented] (KAFKA-1316) Refactor Sender
[ https://issues.apache.org/jira/browse/KAFKA-1316?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14035911#comment-14035911 ] Jay Kreps commented on KAFKA-1316: -- Ack, this is dumb bug. Thanks! Will patch. Refactor Sender --- Key: KAFKA-1316 URL: https://issues.apache.org/jira/browse/KAFKA-1316 Project: Kafka Issue Type: Sub-task Components: producer Reporter: Jay Kreps Assignee: Jay Kreps Attachments: KAFKA-1316.patch, KAFKA-1316.patch, KAFKA-1316_2014-06-03_11:15:38.patch, KAFKA-1316_2014-06-03_14:33:33.patch, KAFKA-1316_2014-06-07_11:20:38.patch Currently most of the logic of the producer I/O thread is in Sender.java. However we will need to do a fair number of similar things in the new consumer. Specifically: - Track in-flight requests - Fetch metadata - Manage connection lifecycle It may be possible to refactor some of this into a helper class that can be shared with the consumer. This will require some detailed thought. -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Commented] (KAFKA-1291) Make wrapper shell scripts for important tools
[ https://issues.apache.org/jira/browse/KAFKA-1291?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14036034#comment-14036034 ] Jay Kreps commented on KAFKA-1291: -- Created reviewboard https://reviews.apache.org/r/22744/ against branch trunk Make wrapper shell scripts for important tools -- Key: KAFKA-1291 URL: https://issues.apache.org/jira/browse/KAFKA-1291 Project: Kafka Issue Type: Improvement Affects Versions: 0.8.1 Reporter: Jay Kreps Labels: newbie, usability Fix For: 0.8.2 Attachments: KAFKA-1291.patch, KAFKA-1291.patch, KAFKA-1291.patch It is nice to have a proper command for the important tools just to help with discoverability. I noticed that mirror maker doesn't have such a wrapper. Neither does consumer offset checker. It would be good to do an audit and think of any tools that should have a wrapper that don't. -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Updated] (KAFKA-1291) Make wrapper shell scripts for important tools
[ https://issues.apache.org/jira/browse/KAFKA-1291?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jay Kreps updated KAFKA-1291: - Attachment: KAFKA-1291.patch Make wrapper shell scripts for important tools -- Key: KAFKA-1291 URL: https://issues.apache.org/jira/browse/KAFKA-1291 Project: Kafka Issue Type: Improvement Affects Versions: 0.8.1 Reporter: Jay Kreps Labels: newbie, usability Fix For: 0.8.2 Attachments: KAFKA-1291.patch, KAFKA-1291.patch, KAFKA-1291.patch It is nice to have a proper command for the important tools just to help with discoverability. I noticed that mirror maker doesn't have such a wrapper. Neither does consumer offset checker. It would be good to do an audit and think of any tools that should have a wrapper that don't. -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Commented] (KAFKA-1291) Make wrapper shell scripts for important tools
[ https://issues.apache.org/jira/browse/KAFKA-1291?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14036041#comment-14036041 ] Jay Kreps commented on KAFKA-1291: -- Hey [~sgeller] this is great! I updated this patch a bit: 1. I removed the shell script for a couple of tools that seemed kind of esoteric and seemed more likely to add confusion. 2. I made all commands print a one-line explanation of what they do when run without arguments. Let me know if you think this description seems reasonable. 3. I refactored a little bit of the cut-and-paste in our command line args definition. This should be an easy patch to review as most changes are trivial. Make wrapper shell scripts for important tools -- Key: KAFKA-1291 URL: https://issues.apache.org/jira/browse/KAFKA-1291 Project: Kafka Issue Type: Improvement Affects Versions: 0.8.1 Reporter: Jay Kreps Labels: newbie, usability Fix For: 0.8.2 Attachments: KAFKA-1291.patch, KAFKA-1291.patch, KAFKA-1291.patch It is nice to have a proper command for the important tools just to help with discoverability. I noticed that mirror maker doesn't have such a wrapper. Neither does consumer offset checker. It would be good to do an audit and think of any tools that should have a wrapper that don't. -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Commented] (KAFKA-1316) Refactor Sender
[ https://issues.apache.org/jira/browse/KAFKA-1316?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14036582#comment-14036582 ] Jay Kreps commented on KAFKA-1316: -- Created reviewboard https://reviews.apache.org/r/22762/ against branch trunk Refactor Sender --- Key: KAFKA-1316 URL: https://issues.apache.org/jira/browse/KAFKA-1316 Project: Kafka Issue Type: Sub-task Components: producer Reporter: Jay Kreps Assignee: Jay Kreps Attachments: KAFKA-1316.patch, KAFKA-1316.patch, KAFKA-1316.patch, KAFKA-1316_2014-06-03_11:15:38.patch, KAFKA-1316_2014-06-03_14:33:33.patch, KAFKA-1316_2014-06-07_11:20:38.patch Currently most of the logic of the producer I/O thread is in Sender.java. However we will need to do a fair number of similar things in the new consumer. Specifically: - Track in-flight requests - Fetch metadata - Manage connection lifecycle It may be possible to refactor some of this into a helper class that can be shared with the consumer. This will require some detailed thought. -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Updated] (KAFKA-1316) Refactor Sender
[ https://issues.apache.org/jira/browse/KAFKA-1316?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jay Kreps updated KAFKA-1316: - Attachment: KAFKA-1316.patch Refactor Sender --- Key: KAFKA-1316 URL: https://issues.apache.org/jira/browse/KAFKA-1316 Project: Kafka Issue Type: Sub-task Components: producer Reporter: Jay Kreps Assignee: Jay Kreps Attachments: KAFKA-1316.patch, KAFKA-1316.patch, KAFKA-1316.patch, KAFKA-1316_2014-06-03_11:15:38.patch, KAFKA-1316_2014-06-03_14:33:33.patch, KAFKA-1316_2014-06-07_11:20:38.patch Currently most of the logic of the producer I/O thread is in Sender.java. However we will need to do a fair number of similar things in the new consumer. Specifically: - Track in-flight requests - Fetch metadata - Manage connection lifecycle It may be possible to refactor some of this into a helper class that can be shared with the consumer. This will require some detailed thought. -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Resolved] (KAFKA-1316) Refactor Sender
[ https://issues.apache.org/jira/browse/KAFKA-1316?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jay Kreps resolved KAFKA-1316. -- Resolution: Fixed Refactor Sender --- Key: KAFKA-1316 URL: https://issues.apache.org/jira/browse/KAFKA-1316 Project: Kafka Issue Type: Sub-task Components: producer Reporter: Jay Kreps Assignee: Jay Kreps Attachments: KAFKA-1316.patch, KAFKA-1316.patch, KAFKA-1316_2014-06-03_11:15:38.patch, KAFKA-1316_2014-06-03_14:33:33.patch, KAFKA-1316_2014-06-07_11:20:38.patch Currently most of the logic of the producer I/O thread is in Sender.java. However we will need to do a fair number of similar things in the new consumer. Specifically: - Track in-flight requests - Fetch metadata - Manage connection lifecycle It may be possible to refactor some of this into a helper class that can be shared with the consumer. This will require some detailed thought. -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Commented] (KAFKA-1489) Global threshold on data retention size
[ https://issues.apache.org/jira/browse/KAFKA-1489?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14028184#comment-14028184 ] Jay Kreps commented on KAFKA-1489: -- Go for it! One slight oddity to consider is this. Different nodes will have different partitions. So the amount of data retained for different replicas of the same partition may vary quite a lot. A replica on a node with lots of data will retain little, and one on a more empty broker will retain lots. The current per-partition retention strategies are only approximately the same across nodes as well, but this will potentially be much more extreme. In fact, in steady state any partition movement will simultaneously cause data to get purged to free up space. I don't think this is necessarily a problem but we will need to warn people. Global threshold on data retention size --- Key: KAFKA-1489 URL: https://issues.apache.org/jira/browse/KAFKA-1489 Project: Kafka Issue Type: Bug Components: log Affects Versions: 0.8.1.1 Reporter: Andras Sereny Assignee: Jay Kreps Labels: newbie Currently, Kafka has per topic settings to control the size of one single log (log.retention.bytes). With lots of topics of different volume and as they grow in number, it could become tedious to maintain topic level settings applying to a single log. Often, a chunk of disk space is dedicated to Kafka that hosts all logs stored, so it'd make sense to have a configurable threshold to control how much space *all* data in Kafka can take up. See also: http://mail-archives.apache.org/mod_mbox/kafka-users/201406.mbox/browser http://mail-archives.apache.org/mod_mbox/kafka-users/201311.mbox/%3c20131107015125.gc9...@jkoshy-ld.linkedin.biz%3E -- This message was sent by Atlassian JIRA (v6.2#6252)
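To make the warning concrete: a broker-wide size threshold effectively deletes the oldest segments on the node, across all logs, until the total drops under the limit, which is why replicas of the same partition on differently loaded brokers can retain very different amounts of data. A rough sketch of that cleanup rule (hypothetical types; not Kafka's LogManager):
{code}
import java.util.*;

final class GlobalRetention {
    // A log segment with its owning log, size, and age.
    static final class Segment {
        final String log;
        final long sizeBytes;
        final long lastModifiedMs;
        Segment(String log, long sizeBytes, long lastModifiedMs) {
            this.log = log;
            this.sizeBytes = sizeBytes;
            this.lastModifiedMs = lastModifiedMs;
        }
    }

    // Pick segments to delete, oldest first regardless of partition, until the
    // total size across all logs on the broker drops to the global threshold.
    static List<Segment> segmentsToDelete(List<Segment> allSegments, long globalRetentionBytes) {
        long total = allSegments.stream().mapToLong(s -> s.sizeBytes).sum();
        List<Segment> sorted = new ArrayList<>(allSegments);
        sorted.sort(Comparator.comparingLong(s -> s.lastModifiedMs));
        List<Segment> toDelete = new ArrayList<>();
        for (Segment s : sorted) {
            if (total <= globalRetentionBytes)
                break;
            toDelete.add(s);
            total -= s.sizeBytes;
        }
        return toDelete;
    }
}
{code}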
[jira] [Commented] (KAFKA-1316) Refactor Sender
[ https://issues.apache.org/jira/browse/KAFKA-1316?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14027273#comment-14027273 ] Jay Kreps commented on KAFKA-1316: -- Created reviewboard https://reviews.apache.org/r/22449/ against branch trunk Refactor Sender --- Key: KAFKA-1316 URL: https://issues.apache.org/jira/browse/KAFKA-1316 Project: Kafka Issue Type: Sub-task Components: producer Reporter: Jay Kreps Assignee: Jay Kreps Attachments: KAFKA-1316.patch, KAFKA-1316.patch, KAFKA-1316_2014-06-03_11:15:38.patch, KAFKA-1316_2014-06-03_14:33:33.patch, KAFKA-1316_2014-06-07_11:20:38.patch Currently most of the logic of the producer I/O thread is in Sender.java. However we will need to do a fair number of similar things in the new consumer. Specifically: - Track in-flight requests - Fetch metadata - Manage connection lifecycle It may be possible to refactor some of this into a helper class that can be shared with the consumer. This will require some detailed thought. -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Updated] (KAFKA-1316) Refactor Sender
[ https://issues.apache.org/jira/browse/KAFKA-1316?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jay Kreps updated KAFKA-1316: - Attachment: KAFKA-1316.patch Refactor Sender --- Key: KAFKA-1316 URL: https://issues.apache.org/jira/browse/KAFKA-1316 Project: Kafka Issue Type: Sub-task Components: producer Reporter: Jay Kreps Assignee: Jay Kreps Attachments: KAFKA-1316.patch, KAFKA-1316.patch, KAFKA-1316_2014-06-03_11:15:38.patch, KAFKA-1316_2014-06-03_14:33:33.patch, KAFKA-1316_2014-06-07_11:20:38.patch Currently most of the logic of the producer I/O thread is in Sender.java. However we will need to do a fair number of similar things in the new consumer. Specifically: - Track in-flight requests - Fetch metadata - Manage connection lifecycle It may be possible to refactor some of this into a helper class that can be shared with the consumer. This will require some detailed thought. -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Commented] (KAFKA-1316) Refactor Sender
[ https://issues.apache.org/jira/browse/KAFKA-1316?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14027276#comment-14027276 ] Jay Kreps commented on KAFKA-1316: -- Uploaded patch that has those new APIs. Refactor Sender --- Key: KAFKA-1316 URL: https://issues.apache.org/jira/browse/KAFKA-1316 Project: Kafka Issue Type: Sub-task Components: producer Reporter: Jay Kreps Assignee: Jay Kreps Attachments: KAFKA-1316.patch, KAFKA-1316.patch, KAFKA-1316_2014-06-03_11:15:38.patch, KAFKA-1316_2014-06-03_14:33:33.patch, KAFKA-1316_2014-06-07_11:20:38.patch Currently most of the logic of the producer I/O thread is in Sender.java. However we will need to do a fair number of similar things in the new consumer. Specifically: - Track in-flight requests - Fetch metadata - Manage connection lifecycle It may be possible to refactor some of this into a helper class that can be shared with the consumer. This will require some detailed thought. -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Commented] (KAFKA-1316) Refactor Sender
[ https://issues.apache.org/jira/browse/KAFKA-1316?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14025718#comment-14025718 ] Jay Kreps commented on KAFKA-1316: -- Thanks for the feedback Neha: 1. Makes sense, we can definitely add that. 2. This is okay, though, right? The config for the two should be independent even if some are similar, right? 3. Hmm, not so sure about this. That finds a target for the cluster metadata request. The logic might be the same as for another type of metadata request (e.g. to discover the co-ordinator) but it also might not. I would feel concerned about exposing it unless we can come up with a very general description of what it is doing... Refactor Sender --- Key: KAFKA-1316 URL: https://issues.apache.org/jira/browse/KAFKA-1316 Project: Kafka Issue Type: Sub-task Components: producer Reporter: Jay Kreps Assignee: Jay Kreps Attachments: KAFKA-1316.patch, KAFKA-1316_2014-06-03_11:15:38.patch, KAFKA-1316_2014-06-03_14:33:33.patch, KAFKA-1316_2014-06-07_11:20:38.patch Currently most of the logic of the producer I/O thread is in Sender.java. However we will need to do a fair number of similar things in the new consumer. Specifically: - Track in-flight requests - Fetch metadata - Manage connection lifecycle It may be possible to refactor some of this into a helper class that can be shared with the consumer. This will require some detailed thought. -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Commented] (KAFKA-1316) Refactor Sender
[ https://issues.apache.org/jira/browse/KAFKA-1316?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14025757#comment-14025757 ] Jay Kreps commented on KAFKA-1316: -- 2. Sure. I guess I don't feel this is necessarily bad as long as the two configs are things that change independently. 3. Yeah totally, that is what I was responding to. My concern is just that this method is kind of insane (it returns a good node to make a metadata request to or sometimes null and assumes you will keep trying while calling poll in between). So maybe there is a way to refactor this into a general facility... Refactor Sender --- Key: KAFKA-1316 URL: https://issues.apache.org/jira/browse/KAFKA-1316 Project: Kafka Issue Type: Sub-task Components: producer Reporter: Jay Kreps Assignee: Jay Kreps Attachments: KAFKA-1316.patch, KAFKA-1316_2014-06-03_11:15:38.patch, KAFKA-1316_2014-06-03_14:33:33.patch, KAFKA-1316_2014-06-07_11:20:38.patch Currently most of the logic of the producer I/O thread is in Sender.java. However we will need to do a fair number of similar things in the new consumer. Specifically: - Track in-flight requests - Fetch metadata - Manage connection lifecycle It may be possible to refactor some of this into a helper class that can be shared with the consumer. This will require some detailed thought. -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Updated] (KAFKA-1316) Refactor Sender
[ https://issues.apache.org/jira/browse/KAFKA-1316?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jay Kreps updated KAFKA-1316: - Attachment: KAFKA-1316_2014-06-07_11:20:38.patch Refactor Sender --- Key: KAFKA-1316 URL: https://issues.apache.org/jira/browse/KAFKA-1316 Project: Kafka Issue Type: Sub-task Components: producer Reporter: Jay Kreps Assignee: Jay Kreps Attachments: KAFKA-1316.patch, KAFKA-1316_2014-06-03_11:15:38.patch, KAFKA-1316_2014-06-03_14:33:33.patch, KAFKA-1316_2014-06-07_11:20:38.patch Currently most of the logic of the producer I/O thread is in Sender.java. However we will need to do a fair number of similar things in the new consumer. Specifically: - Track in-flight requests - Fetch metadata - Manage connection lifecycle It may be possible to refactor some of this into a helper class that can be shared with the consumer. This will require some detailed thought. -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Commented] (KAFKA-1316) Refactor Sender
[ https://issues.apache.org/jira/browse/KAFKA-1316?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14020899#comment-14020899 ] Jay Kreps commented on KAFKA-1316: -- Updated reviewboard https://reviews.apache.org/r/21937/ against branch trunk Refactor Sender --- Key: KAFKA-1316 URL: https://issues.apache.org/jira/browse/KAFKA-1316 Project: Kafka Issue Type: Sub-task Components: producer Reporter: Jay Kreps Assignee: Jay Kreps Attachments: KAFKA-1316.patch, KAFKA-1316_2014-06-03_11:15:38.patch, KAFKA-1316_2014-06-03_14:33:33.patch, KAFKA-1316_2014-06-07_11:20:38.patch Currently most of the logic of the producer I/O thread is in Sender.java. However we will need to do a fair number of similar things in the new consumer. Specifically: - Track in-flight requests - Fetch metadata - Manage connection lifecycle It may be possible to refactor some of this into a helper class that can be shared with the consumer. This will require some detailed thought. -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Commented] (KAFKA-1316) Refactor Sender
[ https://issues.apache.org/jira/browse/KAFKA-1316?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14020901#comment-14020901 ] Jay Kreps commented on KAFKA-1316: -- I updated this patch to 1. Address a number of comments. 2. Get all the unit tests passing (there were a couple of bugs that caused tests to sporadically hang) I think overall there are two levels of feedback here. The first is to iron out whether the API actually makes sense and is convenient (i.e. is what we are trying to do worth doing) and then figure out any additional stylistic or correctness issues (i.e. have we done it well). The current approach has two layers. The network layer is in org.apache.kafka.clients.network and has the selector and logic for sending and receiving size-delimited byte arrays across a bunch of connections. The new NetworkClient/KafkaClient layer (name could be improved) is basically adding on top of this several concerns: 1. Serialization 2. Cluster metadata management 3. Connection management So let's really put some thought into seeing if we have these layers right and have the right apis. Refactor Sender --- Key: KAFKA-1316 URL: https://issues.apache.org/jira/browse/KAFKA-1316 Project: Kafka Issue Type: Sub-task Components: producer Reporter: Jay Kreps Assignee: Jay Kreps Attachments: KAFKA-1316.patch, KAFKA-1316_2014-06-03_11:15:38.patch, KAFKA-1316_2014-06-03_14:33:33.patch, KAFKA-1316_2014-06-07_11:20:38.patch Currently most of the logic of the producer I/O thread is in Sender.java. However we will need to do a fair number of similar things in the new consumer. Specifically: - Track in-flight requests - Fetch metadata - Manage connection lifecycle It may be possible to refactor some of this into a helper class that can be shared with the consumer. This will require some detailed thought. -- This message was sent by Atlassian JIRA (v6.2#6252)
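To make the two layers concrete: the lower layer moves size-delimited byte payloads over a set of connections and is driven by poll(), while the client layer above it adds serialization, cluster metadata management, and connection management. The interfaces below are an illustrative sketch of that split, not the committed API; all names are made up.
{code}
import java.util.List;

// Lower layer: connections and size-delimited byte payloads; poll() drives all I/O.
interface ByteTransport {
    void connect(String host, int port);
    void queueSend(int connectionId, byte[] payload);
    List<byte[]> poll(long timeoutMs);
}

// Upper layer: request/response objects plus metadata and connection management.
interface Client {
    boolean isSendable(int nodeId);        // connected, with room for another in-flight request
    void send(int nodeId, Object request); // serialize and hand off to the transport
    List<Object> poll(long timeoutMs);     // completed responses; refreshes metadata as needed
}
{code}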
[jira] [Commented] (KAFKA-1456) Add LZ4 and LZ4C as a compression codec
[ https://issues.apache.org/jira/browse/KAFKA-1456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14017133#comment-14017133 ] Jay Kreps commented on KAFKA-1456: -- Guys, does this add a runtime dependency on this jar for all clients? Add LZ4 and LZ4C as a compression codec --- Key: KAFKA-1456 URL: https://issues.apache.org/jira/browse/KAFKA-1456 Project: Kafka Issue Type: Improvement Reporter: Joe Stein Labels: newbie Fix For: 0.8.2 Attachments: KAFKA-1456.patch, KAFKA-1456_2014-05-19_15:01:10.patch, KAFKA-1456_2014-05-19_16:39:01.patch, KAFKA-1456_2014-05-19_18:19:32.patch, KAFKA-1456_2014-05-19_23:24:27.patch -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Commented] (KAFKA-1316) Refactor Sender
[ https://issues.apache.org/jira/browse/KAFKA-1316?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14017182#comment-14017182 ] Jay Kreps commented on KAFKA-1316: -- Updated reviewboard https://reviews.apache.org/r/21937/ against branch trunk Refactor Sender --- Key: KAFKA-1316 URL: https://issues.apache.org/jira/browse/KAFKA-1316 Project: Kafka Issue Type: Sub-task Components: producer Reporter: Jay Kreps Assignee: Jay Kreps Attachments: KAFKA-1316.patch, KAFKA-1316_2014-06-03_11:15:38.patch, KAFKA-1316_2014-06-03_14:33:33.patch Currently most of the logic of the producer I/O thread is in Sender.java. However we will need to do a fair number of similar things in the new consumer. Specifically: - Track in-flight requests - Fetch metadata - Manage connection lifecycle It may be possible to refactor some of this into a helper class that can be shared with the consumer. This will require some detailed thought. -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Updated] (KAFKA-1316) Refactor Sender
[ https://issues.apache.org/jira/browse/KAFKA-1316?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jay Kreps updated KAFKA-1316: - Attachment: KAFKA-1316_2014-06-03_14:33:33.patch Refactor Sender --- Key: KAFKA-1316 URL: https://issues.apache.org/jira/browse/KAFKA-1316 Project: Kafka Issue Type: Sub-task Components: producer Reporter: Jay Kreps Assignee: Jay Kreps Attachments: KAFKA-1316.patch, KAFKA-1316_2014-06-03_11:15:38.patch, KAFKA-1316_2014-06-03_14:33:33.patch Currently most of the logic of the producer I/O thread is in Sender.java. However we will need to do a fair number of similar things in the new consumer. Specifically: - Track in-flight requests - Fetch metadata - Manage connection lifecycle It may be possible to refactor some of this into a helper class that can be shared with the consumer. This will require some detailed thought. -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Commented] (KAFKA-1316) Refactor Sender
[ https://issues.apache.org/jira/browse/KAFKA-1316?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14011735#comment-14011735 ] Jay Kreps commented on KAFKA-1316: -- [~guozhang] I think you are describing the case where the consumer failure-detects the co-ordinator due to lack of a heartbeat response, right? Presumably in that case we would go about rediscovering the co-ordinator? I don't think we need to reach into the request queue and attempt to selectively reorder or remove items, that is likely to not end well. I think in the case where we failure-detect the co-ordinator we will just want to reconnect, right? We can add a manual disconnect(Node) method to force that if that is more convenient... Refactor Sender --- Key: KAFKA-1316 URL: https://issues.apache.org/jira/browse/KAFKA-1316 Project: Kafka Issue Type: Sub-task Components: producer Reporter: Jay Kreps Assignee: Jay Kreps Attachments: KAFKA-1316.patch Currently most of the logic of the producer I/O thread is in Sender.java. However we will need to do a fair number of similar things in the new consumer. Specifically: - Track in-flight requests - Fetch metadata - Manage connection lifecycle It may be possible to refactor some of this into a helper class that can be shared with the consumer. This will require some detailed thought. -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Commented] (KAFKA-1466) Kafka server is hung after throwing Attempt to swap the new high watermark file with the old one failed
[ https://issues.apache.org/jira/browse/KAFKA-1466?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14011750#comment-14011750 ] Jay Kreps commented on KAFKA-1466: -- Is it possible you were out of disk space? Here is the sequence of operations that occurs: // swap new offset checkpoint file with previous one if(!temp.renameTo(file)) { // renameTo() fails on Windows if the destination file exists. file.delete() if(!temp.renameTo(file)) throw new IOException("File rename from %s to %s failed.".format(temp.getAbsolutePath, file.getAbsolutePath)) } We first try to just rename the new checkpoint to the old checkpoint. On Unix this is atomic, but on Windows it will fail. If it fails then we manually delete the current checkpoint and repeat the rename. This also failed, which is what threw the exception. Unfortunately Java doesn't give a lot of info when it fails so it is a bit hard to debug. Is it possible that the process somehow lost permission to write to the checkpoint file or something like that? Or perhaps this was some kind of transient disk issue. Regardless, if this occurs the correct behavior would be for the IOException to be thrown and the server to kill itself. This seems to have happened, but somehow the process didn't die? I think we could try to reproduce this by removing permissions on the offset checkpoint file while the server is running. When this happens the expected behavior is that the server should shut itself down and another replica should be elected as leader. If we can figure out anything that would cause the rename to fail that we aren't handling right then that will be a bug. If we can reproduce the server not cleanly killing itself and fully exiting then that would be another bug. Kafka server is hung after throwing Attempt to swap the new high watermark file with the old one failed - Key: KAFKA-1466 URL: https://issues.apache.org/jira/browse/KAFKA-1466 Project: Kafka Issue Type: Bug Affects Versions: 0.8.0 Reporter: Arup Malakar Attachments: kafka.log.1 We have a kafka cluster of four nodes. The cluster was down after one of the nodes threw the following error: 2014-05-21 23:19:44 FATAL [highwatermark-checkpoint-thread1]: HighwaterMarkCheckpoint:109 - Attempt to swap the new high watermark file with the old one failed. I saw the following message in the log file of the failed node: {code} 2014-05-21 23:19:44 FATAL [highwatermark-checkpoint-thread1]: HighwaterMarkCheckpoint:109 - Attempt to swap the new high watermark file with the old one failed 2014-05-21 23:19:44 INFO [Thread-1]: KafkaServer:67 - [Kafka Server 4], Shutting down 2014-05-21 23:19:44 INFO [Thread-1]: KafkaZooKeeper:67 - Closing zookeeper client... 2014-05-21 23:19:44 INFO [ZkClient-EventThread-21-zoo-c2n1.us-east-1.ooyala.com,zoo-c2n2.us-east-1.ooyala.com,zoo-c2n3.us-east-1.ooyala.com,zoo-c2n4.us-east-1.ooyala.com,zoo-c2n5.us-east-1.ooyala.com]: ZkEventThread:82 - Terminate ZkClient event thread. 2014-05-21 23:19:44 INFO [main-EventThread]: ClientCnxn:521 - EventThread shut down 2014-05-21 23:19:44 INFO [Thread-1]: ZooKeeper:544 - Session: 0x1456b562865b172 closed 2014-05-21 23:19:44 INFO [kafka-processor-9092-0]: Processor:67 - Closing socket connection to /10.245.173.136. 
2014-05-21 23:19:44 INFO [Thread-1]: SocketServer:67 - [Socket Server on Broker 4], Shutting down 2014-05-21 23:19:44 INFO [Thread-1]: SocketServer:67 - [Socket Server on Broker 4], Shutdown completed 2014-05-21 23:19:44 INFO [Thread-1]: KafkaRequestHandlerPool:67 - [Kafka Request Handler on Broker 4], shutting down 2014-05-21 23:19:44 INFO [Thread-1]: KafkaRequestHandlerPool:67 - [Kafka Request Handler on Broker 4], shutted down completely 2014-05-21 23:19:44 INFO [Thread-1]: KafkaScheduler:67 - Shutdown Kafka scheduler 2014-05-21 23:19:45 INFO [Thread-1]: ReplicaManager:67 - [Replica Manager on Broker 4]: Shut down 2014-05-21 23:19:45 INFO [Thread-1]: ReplicaFetcherManager:67 - [ReplicaFetcherManager on broker 4] shutting down 2014-05-21 23:19:45 INFO [Thread-1]: ReplicaFetcherThread:67 - [ReplicaFetcherThread-0-3], Shutting down 2014-05-21 23:19:45 INFO [ReplicaFetcherThread-0-3]: ReplicaFetcherThread:67 - [ReplicaFetcherThread-0-3], Stopped 2014-05-21 23:19:45 INFO [Thread-1]: ReplicaFetcherThread:67 - [ReplicaFetcherThread-0-3], Shutdown completed 2014-05-21 23:19:45 INFO [Thread-1]: ReplicaFetcherThread:67 - [ReplicaFetcherThread-0-2], Shutting down 2014-05-21 23:19:45 INFO [ReplicaFetcherThread-0-2]: ReplicaFetcherThread:67 - [ReplicaFetcherThread-0-2], Stopped 2014-05-21 23:19:45 INFO [Thread-1]:
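For reference, the swap described in the comment above follows the write-temp-then-rename pattern. A minimal Java sketch of that pattern (not the actual Scala implementation) looks like this:
{code}
import java.io.File;
import java.io.IOException;

// Sketch of the swap described above: rename the freshly written temp file over
// the old checkpoint; on Windows the rename fails if the target exists, so
// delete the old file and retry once before giving up.
public final class CheckpointSwapSketch {
    public static void swap(File temp, File file) throws IOException {
        if (!temp.renameTo(file)) {
            // renameTo() fails on Windows if the destination file exists
            file.delete();
            if (!temp.renameTo(file))
                throw new IOException(String.format(
                    "File rename from %s to %s failed.",
                    temp.getAbsolutePath(), file.getAbsolutePath()));
        }
    }
}
{code}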
[jira] [Commented] (KAFKA-1316) Refactor Sender
[ https://issues.apache.org/jira/browse/KAFKA-1316?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14010153#comment-14010153 ] Jay Kreps commented on KAFKA-1316: -- Created reviewboard https://reviews.apache.org/r/21937/ against branch trunk Refactor Sender --- Key: KAFKA-1316 URL: https://issues.apache.org/jira/browse/KAFKA-1316 Project: Kafka Issue Type: Sub-task Components: producer Reporter: Jay Kreps Assignee: Jay Kreps Attachments: KAFKA-1316.patch Currently most of the logic of the producer I/O thread is in Sender.java. However we will need to do a fair number of similar things in the new consumer. Specifically: - Track in-flight requests - Fetch metadata - Manage connection lifecycle It may be possible to refactor some of this into a helper class that can be shared with the consumer. This will require some detailed thought. -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Comment Edited] (KAFKA-1316) Refactor Sender
[ https://issues.apache.org/jira/browse/KAFKA-1316?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14010498#comment-14010498 ] Jay Kreps edited comment on KAFKA-1316 at 5/27/14 11:38 PM: For (2) I think there are two solutions 1. Change the server semantics to allow processing multiple requests at the same time and out of order. 2. Use two connections Quick discussion: 1. Allowing out-of-order requests complicates things in the clients a bit as you can no longer reason that the Nth response is for the Nth request you made. It also isn't clear what we would even guarantee on the server side. The two things that we have to do are to handle produce requests in order and to apply back pressure when too much data is sent. Backpressure means the socket server needs to stop reading requests, but to make that decision it needs to have parsed the request and know it is a produce request... 2. Using two connections might work. It is a bit hacky. The consumer would need to create a Node object for the host-port of the current co-ordinator and then things would work from there on (I think). The node id would need to be some negative number or something. I'm not really sure if there is a clean generalization of this. was (Author: jkreps): For (2) I think there are two solutions 1. Change the server semantics to allow processing multiple requests at the same time and out of order. 2. Use two connections Quick discussion: 1. Allowing out-of-order requests complicates things in the clients a bit as you can no longer reason that the Nth response is for the Nth request you made. It also isn't clear what we would even guarantee on the server side. The two things that we have to do are to handle produce requests in order and to apply back pressure when too much data is sent. Backpressure means the socket server needs to stop reading requests, but to make that decision it needs to have parsed the request and know it is a produce request... 2. Using two connections might work. It is a bit hacky. The consumer would need to create a Node object for the host-port of the current co-ordinator and then things would work from there on (I think). Refactor Sender --- Key: KAFKA-1316 URL: https://issues.apache.org/jira/browse/KAFKA-1316 Project: Kafka Issue Type: Sub-task Components: producer Reporter: Jay Kreps Assignee: Jay Kreps Attachments: KAFKA-1316.patch Currently most of the logic of the producer I/O thread is in Sender.java. However we will need to do a fair number of similar things in the new consumer. Specifically: - Track in-flight requests - Fetch metadata - Manage connection lifecycle It may be possible to refactor some of this into a helper class that can be shared with the consumer. This will require some detailed thought. -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Commented] (KAFKA-1316) Refactor Sender
[ https://issues.apache.org/jira/browse/KAFKA-1316?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14010498#comment-14010498 ] Jay Kreps commented on KAFKA-1316: -- For (2) I think there are two solutions 1. Change the server semantics to allow processing multiple requests at the same time and out of order. 2. Use two connections Quick discussion: 1. Allowing out-of-order requests complicates things in the clients a bit as you can no longer reason that the Nth response is for the Nth request you made. It also isn't clear what we would even guarantee on the server side. The two things that we have to do are to handle produce requests in order and to apply back pressure when too much data is sent. Backpressure means the socket server needs to stop reading requests, but to make that decision it needs to have parsed the request and know it is a produce request... 2. Using two connections might work. It is a bit hacky. The consumer would need to create a Node object for the host-port of the current co-ordinator and then things would work from there on (I think). Refactor Sender --- Key: KAFKA-1316 URL: https://issues.apache.org/jira/browse/KAFKA-1316 Project: Kafka Issue Type: Sub-task Components: producer Reporter: Jay Kreps Assignee: Jay Kreps Attachments: KAFKA-1316.patch Currently most of the logic of the producer I/O thread is in Sender.java. However we will need to do a fair number of similar things in the new consumer. Specifically: - Track in-flight requests - Fetch metadata - Manage connection lifecycle It may be possible to refactor some of this into a helper class that can be shared with the consumer. This will require some detailed thought. -- This message was sent by Atlassian JIRA (v6.2#6252)
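A rough sketch of option 2 above, assuming the consumer fabricates a Node with a synthetic negative id for the current co-ordinator so the existing connection machinery can be reused; the names and the id offset are hypothetical:
{code}
// Illustrative only: a synthetic, negative node id keeps the co-ordinator
// connection separate from the ids of real brokers.
final class Node {
    final int id; final String host; final int port;
    Node(int id, String host, int port) { this.id = id; this.host = host; this.port = port; }
}

final class CoordinatorNodeSketch {
    private static final int COORDINATOR_ID_BASE = -1000; // hypothetical offset, outside the broker id space

    // Wrap the current co-ordinator's host/port in a Node the client layer can connect to.
    static Node coordinatorNode(int coordinatorBrokerId, String host, int port) {
        return new Node(COORDINATOR_ID_BASE - coordinatorBrokerId, host, port);
    }
}
{code}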
[jira] [Resolved] (KAFKA-1467) Fixes small documentation typos
[ https://issues.apache.org/jira/browse/KAFKA-1467?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jay Kreps resolved KAFKA-1467. -- Resolution: Fixed Thanks for the patch! Committed. Fixes small documentation typos --- Key: KAFKA-1467 URL: https://issues.apache.org/jira/browse/KAFKA-1467 Project: Kafka Issue Type: Bug Components: website Affects Versions: 0.8.1 Reporter: Brian Chhun Priority: Trivial Labels: documentation Attachments: KAFKA-1467-consistent-zookeeper-letter-cases.diff, KAFKA-1467-log-compaction-typo.diff Fixes a minor typo in the log compaction section of the documentation. Also includes a second patch to change various letter cases used for ZooKeeper (e.g., zookeeper and Zookeeper) into just ZooKeeper. There's nothing to ensure that newer documentation will adhere to the letter case convention, however. -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Created] (KAFKA-1468) Improve perf tests
Jay Kreps created KAFKA-1468: Summary: Improve perf tests Key: KAFKA-1468 URL: https://issues.apache.org/jira/browse/KAFKA-1468 Project: Kafka Issue Type: Bug Reporter: Jay Kreps This issue is a placeholder for a bunch of improvements that came out of a round of benchmarking. -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Updated] (KAFKA-1468) Improve perf tests
[ https://issues.apache.org/jira/browse/KAFKA-1468?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jay Kreps updated KAFKA-1468: - Attachment: KAFKA-1468.patch Improve perf tests -- Key: KAFKA-1468 URL: https://issues.apache.org/jira/browse/KAFKA-1468 Project: Kafka Issue Type: Bug Reporter: Jay Kreps Attachments: KAFKA-1468.patch This issue is a placeholder for a bunch of improvements that came out of a round of benchmarking. -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Commented] (KAFKA-1468) Improve perf tests
[ https://issues.apache.org/jira/browse/KAFKA-1468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14007705#comment-14007705 ] Jay Kreps commented on KAFKA-1468: -- Created reviewboard https://reviews.apache.org/r/21878/ against branch trunk Improve perf tests -- Key: KAFKA-1468 URL: https://issues.apache.org/jira/browse/KAFKA-1468 Project: Kafka Issue Type: Bug Reporter: Jay Kreps Attachments: KAFKA-1468.patch This issue is a placeholder for a bunch of improvements that came out of a round of benchmarking. -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Commented] (KAFKA-1316) Refactor Sender
[ https://issues.apache.org/jira/browse/KAFKA-1316?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14005249#comment-14005249 ] Jay Kreps commented on KAFKA-1316: -- Neha, yeah I'll post something as I get it in shape. Jun, ready is non-blocking. This is important for the producer, which needs to intersect the set of partitions hosted by nodes that we are ready to send to with the set of nodes for which we have data. But for the consumer we should think this through. Implementing a blocking connect would be something like {noformat} while(!client.ready(node, System.currentTimeMillis())) client.poll(Collections.emptyList(), 100) {noformat} which is not terribly intuitive. We could add an api for this if it would help. But let's think through how we would implement the consumer state machine and then I think we will know what we need. Refactor Sender --- Key: KAFKA-1316 URL: https://issues.apache.org/jira/browse/KAFKA-1316 Project: Kafka Issue Type: Sub-task Components: producer Reporter: Jay Kreps Assignee: Jay Kreps Currently most of the logic of the producer I/O thread is in Sender.java. However we will need to do a fair number of similar things in the new consumer. Specifically: - Track in-flight requests - Fetch metadata - Manage connection lifecycle It may be possible to refactor some of this into a helper class that can be shared with the consumer. This will require some detailed thought. -- This message was sent by Atlassian JIRA (v6.2#6252)
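Wrapped in a helper with a deadline, the blocking variant sketched above might look like the following; the interface stand-ins mirror the ready()/poll() API discussed in this thread and are illustrative only:
{code}
import java.util.Collections;
import java.util.List;

// Minimal stand-ins for the ready()/poll() API discussed in this thread.
interface Node {}
interface ClientRequest {}
interface ClientResponse {}
interface Client {
    boolean ready(Node node, long nowMs);
    List<ClientResponse> poll(List<ClientRequest> requests, long timeoutMs);
}

final class BlockingReadySketch {
    // Block until a connection to the node is ready, or the timeout expires.
    static boolean awaitReady(Client client, Node node, long timeoutMs) {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (!client.ready(node, System.currentTimeMillis())) {
            if (System.currentTimeMillis() >= deadline)
                return false;                                          // gave up waiting
            client.poll(Collections.<ClientRequest>emptyList(), 100);  // drive I/O while waiting
        }
        return true;
    }
}
{code}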
[jira] [Commented] (KAFKA-1316) Refactor Sender
[ https://issues.apache.org/jira/browse/KAFKA-1316?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14004009#comment-14004009 ] Jay Kreps commented on KAFKA-1316: -- I started on this and I think I have a design that at least works for the producer, let's think if it would also work for the consumer. The plan is that Sender remains, but most of the logic is moved into a new class Client which will be shared by the producer, consumer, and any additional clients (admin, for example). The Client class will encapsulate the connection state and the management of metadata. The client will expose two methods: {noformat} class Client { /* Initiate a connection to the given node (if one doesn't already exist). Return true if we already have a ready connection. */ boolean ready(Node node, long now); /* Send new requests and return any completed requests */ List<ClientResponse> poll(List<ClientRequest> requests, long ms); } {noformat} The poll call takes a list of requests for ready connections and attempts to send them. It returns any completed requests. The responses returned will not generally be for the requests being sent but for previous requests. ClientRequest is just a renaming and generalization of InFlightRequest. ClientResponse is a new class that will reference the original ClientRequest as well as maintain the response information received (which we currently handle inline in Sender). The user of this class (e.g. Sender) has to use the ready() method to ensure it only initiates requests to ready connections. What needs to be thought through is whether these APIs suffice for the consumer. Refactor Sender --- Key: KAFKA-1316 URL: https://issues.apache.org/jira/browse/KAFKA-1316 Project: Kafka Issue Type: Sub-task Components: producer Reporter: Jay Kreps Assignee: Jay Kreps Currently most of the logic of the producer I/O thread is in Sender.java. However we will need to do a fair number of similar things in the new consumer. Specifically: - Track in-flight requests - Fetch metadata - Manage connection lifecycle It may be possible to refactor some of this into a helper class that can be shared with the consumer. This will require some detailed thought. -- This message was sent by Atlassian JIRA (v6.2#6252)
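A minimal sketch of how an I/O thread such as Sender might drive the proposed ready()/poll() API; the type names follow the comment above, but the bodies and helper interfaces are illustrative assumptions rather than the actual implementation:
{code}
import java.util.ArrayList;
import java.util.List;

// Stand-ins for the types named in the comment above (illustrative only).
interface Node {}
interface ClientRequest {}
interface ClientResponse {}
interface Client {
    boolean ready(Node node, long nowMs);   // initiate/verify a connection, non-blocking
    List<ClientResponse> poll(List<ClientRequest> requests, long timeoutMs);
}

final class SenderLoopSketch {
    // One iteration of an I/O-thread loop built on the proposed API.
    static void runOnce(Client client, List<Node> nodesWithData,
                        RequestBuilder builder, ResponseHandler handler) {
        long now = System.currentTimeMillis();
        List<ClientRequest> requests = new ArrayList<>();
        for (Node node : nodesWithData) {
            // Only send to nodes with an established, ready connection.
            if (client.ready(node, now))
                requests.add(builder.buildFor(node));
        }
        // poll() sends the new requests and returns completed responses, which are
        // typically for requests issued on earlier iterations.
        for (ClientResponse response : client.poll(requests, 100))
            handler.handle(response);
    }

    interface RequestBuilder { ClientRequest buildFor(Node node); }
    interface ResponseHandler { void handle(ClientResponse response); }
}
{code}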
[jira] [Created] (KAFKA-1441) Purgatory purge causes latency spikes
Jay Kreps created KAFKA-1441: Summary: Purgatory purge causes latency spikes Key: KAFKA-1441 URL: https://issues.apache.org/jira/browse/KAFKA-1441 Project: Kafka Issue Type: Bug Reporter: Jay Kreps The request purgatory has a funky thing where it periodically loops over all watches and purges them. If you have a fair number of partitions you can accumulate lots of watches and purging them can take a long time. During this time all expiry is halted. Here is an example log: [2014-05-08 21:07:41,950] INFO ExpiredRequestReaper-2 Expired request after 10ms: 5829 (kafka.server.RequestPurgatory$ExpiredRequestReaper) [2014-05-08 21:07:41,952] INFO ExpiredRequestReaper-2 Expired request after 10ms: 5882 (kafka.server.RequestPurgatory$ExpiredRequestReaper) [2014-05-08 21:07:41,967] INFO ExpiredRequestReaper-2 Expired request after 11ms: 5884 (kafka.server.RequestPurgatory$ExpiredRequestReaper) [2014-05-08 21:07:41,968] INFO ExpiredRequestReaper-2 Purging purgatory (kafka.server.RequestPurgatory$ExpiredRequestReaper) [2014-05-08 21:07:41,969] INFO ExpiredRequestReaper-2 Purged 0 requests from delay queue. (kafka.server.RequestPurgatory$ExpiredRequestReaper) [2014-05-08 21:07:42,305] INFO ExpiredRequestReaper-2 Purged 340809 (watcher) requests. (kafka.server.RequestPurgatory$ExpiredRequestReaper) [2014-05-08 21:07:42,305] INFO ExpiredRequestReaper-2 Expired request after 106ms: 5847 (kafka.server.RequestPurgatory$ExpiredRequestReaper) [2014-05-08 21:07:42,305] INFO ExpiredRequestReaper-2 Expired request after 106ms: 5904 (kafka.server.RequestPurgatory$ExpiredRequestReaper) [2014-05-08 21:07:42,328] INFO ExpiredRequestReaper-2 Expired request after 10ms: 5908 (kafka.server.RequestPurgatory$ExpiredRequestReaper) [2014-05-08 21:07:42,329] INFO ExpiredRequestReaper-2 Expired request after 10ms: 5852 (kafka.server.RequestPurgatory$ExpiredRequestReaper) [2014-05-08 21:07:42,343] INFO ExpiredRequestReaper-2 Expired request after 11ms: 5854 (kafka.server.RequestPurgatory$ExpiredRequestReaper) Combined with our buggy purgatory request impls that can sometimes hit their expiration this can lead to huge latency spikes. -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Resolved] (KAFKA-1445) New Producer should send all partitions that have non-empty batches when one of them is ready
[ https://issues.apache.org/jira/browse/KAFKA-1445?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jay Kreps resolved KAFKA-1445. -- Resolution: Fixed Applied. New Producer should send all partitions that have non-empty batches when one of them is ready Key: KAFKA-1445 URL: https://issues.apache.org/jira/browse/KAFKA-1445 Project: Kafka Issue Type: Bug Reporter: Guozhang Wang Fix For: 0.9.0 Attachments: KAFKA-1445.patch, KAFKA-1445.patch, KAFKA-1445_2014-05-13_11:25:13.patch, KAFKA-1445_2014-05-14_16:24:25.patch, KAFKA-1445_2014-05-14_16:28:06.patch, KAFKA-1445_2014-05-15_15:15:37.patch, KAFKA-1445_2014-05-15_15:19:10.patch One difference between the new producer and the old producer is that on the new producer the linger time is per partition, instead of global. Therefore, when the traffic is low, the sender will likely expire partitions one-by-one and send lots of small requests containing only a few partitions with little data, resulting in a largely increased request rate. One solution would be to let senders select all partitions that have non-empty batches when one of them is ready. -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Commented] (KAFKA-1445) New Producer should send all partitions that have non-empty batches when one of them is ready
[ https://issues.apache.org/jira/browse/KAFKA-1445?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13999328#comment-13999328 ] Jay Kreps commented on KAFKA-1445: -- Looks good to me, if no comments from others I will apply this. New Producer should send all partitions that have non-empty batches when one of them is ready Key: KAFKA-1445 URL: https://issues.apache.org/jira/browse/KAFKA-1445 Project: Kafka Issue Type: Bug Reporter: Guozhang Wang Fix For: 0.9.0 Attachments: KAFKA-1445.patch, KAFKA-1445.patch, KAFKA-1445_2014-05-13_11:25:13.patch, KAFKA-1445_2014-05-14_16:24:25.patch, KAFKA-1445_2014-05-14_16:28:06.patch, KAFKA-1445_2014-05-15_15:15:37.patch, KAFKA-1445_2014-05-15_15:19:10.patch One difference between the new producer and the old producer is that on the new producer the linger time is per partition, instead of global. Therefore, when the traffic is low, the sender will likely expire partitions one-by-one and send lots of small requests containing only a few partitions with little data, resulting in a largely increased request rate. One solution would be to let senders select all partitions that have non-empty batches when one of them is ready. -- This message was sent by Atlassian JIRA (v6.2#6252)
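The proposed fix can be sketched as: once any batch destined for a broker is ready, drain every non-empty batch for that broker into the same request. A minimal illustrative sketch (not the actual RecordAccumulator code):
{code}
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Sketch of the "drain everything once one batch is ready" idea.
final class DrainSketch {
    static final class Batch {
        final List<byte[]> records = new ArrayList<>();
        long createdMs;
        boolean expired(long now, long lingerMs) { return now - createdMs >= lingerMs; }
        boolean isEmpty() { return records.isEmpty(); }
    }

    // partitionBatches: all batches destined for a single broker, keyed by partition id.
    static List<Batch> drainIfAnyReady(Map<Integer, Batch> partitionBatches, long now, long lingerMs) {
        boolean anyReady = false;
        for (Batch b : partitionBatches.values())
            if (!b.isEmpty() && b.expired(now, lingerMs)) { anyReady = true; break; }

        List<Batch> drained = new ArrayList<>();
        if (anyReady) {
            // One ready batch pulls along every other non-empty batch for this broker,
            // so low traffic no longer produces one tiny request per partition.
            for (Batch b : partitionBatches.values())
                if (!b.isEmpty()) drained.add(b);
        }
        return drained;
    }
}
{code}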
[jira] [Commented] (KAFKA-1430) Purgatory redesign
[ https://issues.apache.org/jira/browse/KAFKA-1430?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13990819#comment-13990819 ] Jay Kreps commented on KAFKA-1430: -- The way we garbage collect obsolete entries was fundamentally a bit sketchy due to keeping both an LRU list and hash. Purgatory redesign -- Key: KAFKA-1430 URL: https://issues.apache.org/jira/browse/KAFKA-1430 Project: Kafka Issue Type: Improvement Components: core Affects Versions: 0.8.2 Reporter: Jun Rao We have seen 2 main issues with the Purgatory. 1. There is no atomic checkAndWatch functionality. So, a client typically first checks whether a request is satisfied or not and then register the watcher. However, by the time the watcher is registered, the registered item could already be satisfied. This item won't be satisfied until the next update happens or the delayed time expires, which means the watched item could be delayed. 2. FetchRequestPurgatory doesn't quite work. This is because the current design tries to incrementally maintain the accumulated bytes ready for fetch. However, this is difficult since the right time to check whether a fetch (for regular consumer) request is satisfied is when the high watermark moves. At that point, it's hard to figure out how many bytes we should incrementally add to each pending fetch request. The problem has been reported in KAFKA-1150 and KAFKA-703. -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Commented] (KAFKA-1430) Purgatory redesign
[ https://issues.apache.org/jira/browse/KAFKA-1430?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13990829#comment-13990829 ] Jay Kreps commented on KAFKA-1430: -- A couple of random thoughts. - Not sure if I have fully groked the implications of this. - In general I feel we are not building on a firm foundation with the purgatory code and we should somehow find a way to refactor the apis to make code that uses it more readable. I think in the absence of this it is impossible to write correct code because it is too hard to understand the flow of things. This is somewhat a separate thing from this proposal, and could be a separate ticket or effort. - Do we actually need to do anything for requests not in the last segment? My concern is that trying to maintain the full set of physical sizes is going to be very error prone and will impact the recovery logic too. One thing about these apis is that it is always acceptable to return early. So what if we maintain physical information about the last segment which we maintain through the append api and for all previous segments we just return immediately (after all you will never wait on older segments). Purgatory redesign -- Key: KAFKA-1430 URL: https://issues.apache.org/jira/browse/KAFKA-1430 Project: Kafka Issue Type: Improvement Components: core Affects Versions: 0.8.2 Reporter: Jun Rao We have seen 2 main issues with the Purgatory. 1. There is no atomic checkAndWatch functionality. So, a client typically first checks whether a request is satisfied or not and then register the watcher. However, by the time the watcher is registered, the registered item could already be satisfied. This item won't be satisfied until the next update happens or the delayed time expires, which means the watched item could be delayed. 2. FetchRequestPurgatory doesn't quite work. This is because the current design tries to incrementally maintain the accumulated bytes ready for fetch. However, this is difficult since the right time to check whether a fetch (for regular consumer) request is satisfied is when the high watermark moves. At that point, it's hard to figure out how many bytes we should incrementally add to each pending fetch request. The problem has been reported in KAFKA-1150 and KAFKA-703. -- This message was sent by Atlassian JIRA (v6.2#6252)
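Point 1 in the issue description (the missing atomic checkAndWatch) can be illustrated with a sketch in which the satisfaction check and the watcher registration happen under one lock, so a completion cannot slip in between them and leave the request waiting until it expires. This is an illustration of the idea, not the actual purgatory code:
{code}
import java.util.ArrayList;
import java.util.List;

// Illustration of an atomic check-and-watch: the satisfaction check and the
// watcher registration happen under a single lock.
final class CheckAndWatchSketch {
    interface DelayedOperation { boolean tryComplete(); void onComplete(); }

    private final Object lock = new Object();
    private final List<DelayedOperation> watchers = new ArrayList<>();

    boolean checkAndMaybeWatch(DelayedOperation op) {
        synchronized (lock) {
            if (op.tryComplete()) {       // already satisfied: complete immediately
                op.onComplete();
                return true;
            }
            watchers.add(op);             // not satisfied yet: register atomically
            return false;
        }
    }

    // Called whenever relevant state changes (e.g. the high watermark moves).
    void checkWatchers() {
        synchronized (lock) {
            watchers.removeIf(op -> {
                if (op.tryComplete()) { op.onComplete(); return true; }
                return false;
            });
        }
    }
}
{code}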
[jira] [Commented] (KAFKA-1316) Refactor Sender
[ https://issues.apache.org/jira/browse/KAFKA-1316?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13991361#comment-13991361 ] Jay Kreps commented on KAFKA-1316: -- Okay so here is my proposal. I want to refactor out a couple of helper objects from Sender, specifically NodeState, InFlightRequest and InFlightRequests. This will be packaged up inside a class that does this stuff and wraps the Selector. It will implement a partition-aware request/response layer on top of the more general selector API. Not sure what to call this, maybe something like ClusterSelector or RequestSelector or something like that. This class will model the state machine we currently go through where a request is directed to a particular node and connections are set up in a non-blocking way before a request can be made. It will also handle the request/response correlation. I think I may need to just take a stab at it to see exactly how this will work so I am just going to dive in. Refactor Sender --- Key: KAFKA-1316 URL: https://issues.apache.org/jira/browse/KAFKA-1316 Project: Kafka Issue Type: Sub-task Components: producer Reporter: Jay Kreps Assignee: Jay Kreps Currently most of the logic of the producer I/O thread is in Sender.java. However we will need to do a fair number of similar things in the new consumer. Specifically: - Track in-flight requests - Fetch metadata - Manage connection lifecycle It may be possible to refactor some of this into a helper class that can be shared with the consumer. This will require some detailed thought. -- This message was sent by Atlassian JIRA (v6.2#6252)
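A rough sketch of the in-flight request tracking described above, keeping a per-node deque so the oldest outstanding request is the one a new response correlates with; the names and structure are illustrative only:
{code}
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Sketch of per-node in-flight request tracking, as described in the comment above.
final class InFlightRequestsSketch {
    static final class InFlightRequest {
        final int correlationId;
        final long sendTimeMs;
        InFlightRequest(int correlationId, long sendTimeMs) {
            this.correlationId = correlationId;
            this.sendTimeMs = sendTimeMs;
        }
    }

    private final int maxInFlightPerNode;
    private final Map<Integer, Deque<InFlightRequest>> byNode = new HashMap<>();

    InFlightRequestsSketch(int maxInFlightPerNode) { this.maxInFlightPerNode = maxInFlightPerNode; }

    boolean canSendMore(int nodeId) {
        Deque<InFlightRequest> q = byNode.get(nodeId);
        return q == null || q.size() < maxInFlightPerNode;
    }

    void add(int nodeId, InFlightRequest request) {
        byNode.computeIfAbsent(nodeId, id -> new ArrayDeque<>()).addFirst(request);
    }

    // Responses come back in order on a connection, so the oldest in-flight
    // request (the deque's tail) is the one a new response correlates with.
    InFlightRequest completeNext(int nodeId) {
        return byNode.get(nodeId).pollLast();
    }
}
{code}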
[jira] [Resolved] (KAFKA-1428) FIS not closed after Properties.load
[ https://issues.apache.org/jira/browse/KAFKA-1428?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jay Kreps resolved KAFKA-1428. -- Resolution: Fixed Applied. Nice catch. FIS not closed after Properties.load Key: KAFKA-1428 URL: https://issues.apache.org/jira/browse/KAFKA-1428 Project: Kafka Issue Type: Bug Reporter: Jon Bringhurst Priority: Minor Attachments: KAFKA-1428.patch A FileInputStream is not being closed after using it for a Properties.load. -- This message was sent by Atlassian JIRA (v6.2#6252)
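The usual fix for this kind of leak is to load the properties through a stream that is closed even when load() throws, e.g. with try-with-resources on Java 7+; a minimal sketch:
{code}
import java.io.FileInputStream;
import java.io.IOException;
import java.util.Properties;

final class LoadPropsSketch {
    // Load a properties file and make sure the underlying stream is closed,
    // even if Properties.load throws.
    static Properties load(String path) throws IOException {
        Properties props = new Properties();
        try (FileInputStream in = new FileInputStream(path)) {
            props.load(in);
        }
        return props;
    }
}
{code}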