[jira] [Updated] (KAFKA-1414) Speedup broker startup after hard reset
[ https://issues.apache.org/jira/browse/KAFKA-1414?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Anton Karamanov updated KAFKA-1414: --- Attachment: (was: KAFKA-1414-rev2.fixed.patch) Speedup broker startup after hard reset --- Key: KAFKA-1414 URL: https://issues.apache.org/jira/browse/KAFKA-1414 Project: Kafka Issue Type: Improvement Components: log Affects Versions: 0.8.2, 0.9.0, 0.8.1.1 Reporter: Dmitry Bugaychenko Assignee: Jay Kreps Attachments: 0001-KAFKA-1414-Speedup-broker-startup-after-hard-reset-a.patch, KAFKA-1414-rev1.patch, KAFKA-1414-rev2.patch, parallel-dir-loading-0.8.patch, parallel-dir-loading-trunk-fixed-threadpool.patch, parallel-dir-loading-trunk-threadpool.patch, parallel-dir-loading-trunk.patch After a hard reset due to power failure, the broker takes far too much time recovering unflushed segments in a single thread. This could be easily improved by launching multiple threads (one per data directory, assuming that typically each data directory is on a dedicated drive). Locally we tried this simple patch to LogManager.loadLogs and it seems to work; however, I'm too new to Scala, so do not take it literally:
{code}
/**
 * Recover and load all logs in the given data directories
 */
private def loadLogs(dirs: Seq[File]) {
  val threads: Array[Thread] = new Array[Thread](dirs.size)
  var i: Int = 0
  val me = this
  for (dir <- dirs) {
    val thread = new Thread(new Runnable {
      def run() {
        val recoveryPoints = me.recoveryPointCheckpoints(dir).read
        /* load the logs */
        val subDirs = dir.listFiles()
        if (subDirs != null) {
          val cleanShutDownFile = new File(dir, Log.CleanShutdownFile)
          if (cleanShutDownFile.exists())
            info("Found clean shutdown file. Skipping recovery for all logs in data directory '%s'".format(dir.getAbsolutePath))
          for (dir <- subDirs) {
            if (dir.isDirectory) {
              info("Loading log '" + dir.getName + "'")
              val topicPartition = Log.parseTopicPartitionName(dir.getName)
              val config = topicConfigs.getOrElse(topicPartition.topic, defaultConfig)
              val log = new Log(dir, config, recoveryPoints.getOrElse(topicPartition, 0L), scheduler, time)
              val previous = addLogWithLock(topicPartition, log)
              if (previous != null)
                throw new IllegalArgumentException("Duplicate log directories found: %s, %s!".format(log.dir.getAbsolutePath, previous.dir.getAbsolutePath))
            }
          }
          cleanShutDownFile.delete()
        }
      }
    })
    thread.start()
    threads(i) = thread
    i = i + 1
  }
  for (thread <- threads) {
    thread.join()
  }
}

def addLogWithLock(topicPartition: TopicAndPartition, log: Log): Log = {
  logCreationOrDeletionLock synchronized {
    this.logs.put(topicPartition, log)
  }
}
{code}
-- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Updated] (KAFKA-1414) Speedup broker startup after hard reset
[ https://issues.apache.org/jira/browse/KAFKA-1414?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Anton Karamanov updated KAFKA-1414: --- Attachment: KAFKA-1414-rev2.fixed.patch Speedup broker startup after hard reset --- Key: KAFKA-1414 URL: https://issues.apache.org/jira/browse/KAFKA-1414 Project: Kafka Issue Type: Improvement Components: log Affects Versions: 0.8.2, 0.9.0, 0.8.1.1 Reporter: Dmitry Bugaychenko Assignee: Jay Kreps Attachments: 0001-KAFKA-1414-Speedup-broker-startup-after-hard-reset-a.patch, KAFKA-1414-rev1.patch, KAFKA-1414-rev2.fixed.patch, KAFKA-1414-rev2.patch, parallel-dir-loading-0.8.patch, parallel-dir-loading-trunk-fixed-threadpool.patch, parallel-dir-loading-trunk-threadpool.patch, parallel-dir-loading-trunk.patch -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Updated] (KAFKA-1414) Speedup broker startup after hard reset
[ https://issues.apache.org/jira/browse/KAFKA-1414?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Anton Karamanov updated KAFKA-1414: --- Attachment: freebie.patch I would also like to throw in that little [patch|^freebie.patch]. It (arguably) improves and (definitely) simplifies the use of the {{Utils.runnable}} helper, which is used in my patch and a couple of other places. By changing its signature we would be able to use it as: {code} Utils.runnable { // do something } {code} instead of: {code} Utils.runnable { () => { // do something }} {code} which clarifies the syntax a bit, making it more compelling to use. The patch can be applied directly on top of the primary patch for this task. Speedup broker startup after hard reset --- Key: KAFKA-1414 URL: https://issues.apache.org/jira/browse/KAFKA-1414 Project: Kafka Issue Type: Improvement Components: log Affects Versions: 0.8.2, 0.9.0, 0.8.1.1 Reporter: Dmitry Bugaychenko Assignee: Jay Kreps Attachments: 0001-KAFKA-1414-Speedup-broker-startup-after-hard-reset-a.patch, KAFKA-1414-rev1.patch, KAFKA-1414-rev2.fixed.patch, KAFKA-1414-rev2.patch, freebie.patch, parallel-dir-loading-0.8.patch, parallel-dir-loading-trunk-fixed-threadpool.patch, parallel-dir-loading-trunk-threadpool.patch, parallel-dir-loading-trunk.patch -- This message was sent by Atlassian JIRA (v6.2#6252)
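The {{Utils.runnable}} change described above is not shown in the email, but a minimal standalone sketch of the idea (assuming the helper currently takes a `() => Unit` function, as the "instead of" snippet suggests) would use a by-name parameter so callers can pass a bare block:

```scala
// Sketch only: the Utils object here is a standalone stand-in for
// kafka.utils.Utils; the "before" signature is inferred from the email.
object Utils {
  // before (roughly): def runnable(fun: () => Unit): Runnable
  // after: a by-name parameter, so callers drop the `() =>` wrapper
  def runnable(fun: => Unit): Runnable = new Runnable {
    def run(): Unit = fun // the by-name argument is re-evaluated on each run()
  }
}

var ran = false
val r = Utils.runnable { ran = true } // no explicit `() =>` needed
assert(!ran) // by-name: the block does not execute until run() is called
r.run()
assert(ran)
```

The design point is that a by-name parameter delays evaluation exactly like a `Function0`, but the call site reads like a control structure.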
[jira] [Commented] (KAFKA-1510) Force offset commits at a minimum interval when migrating consumer offsets from zookeeper to kafka
[ https://issues.apache.org/jira/browse/KAFKA-1510?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14066294#comment-14066294 ] Nicolae Marasoiu commented on KAFKA-1510: - Hi, it sounds clear and simple enough; I am going to try this. I will probably come back with some questions on the low-level details. Why is offset management moving from zookeeper to kafka? To ease the consumer implementation and encourage consumers in more languages? Is kafka managing the offsets through zookeeper as well, behind the scenes, or is it using its own consensus mechanism to store them in an HA manner? Force offset commits at a minimum interval when migrating consumer offsets from zookeeper to kafka -- Key: KAFKA-1510 URL: https://issues.apache.org/jira/browse/KAFKA-1510 Project: Kafka Issue Type: Bug Affects Versions: 0.8.2 Reporter: Joel Koshy Labels: newbie Fix For: 0.8.2 When migrating consumer offsets from ZooKeeper to kafka, we have to turn on dual-commit (i.e., the consumers will commit offsets to both zookeeper and kafka) in addition to setting offsets.storage to kafka. However, we only commit offsets if they have changed (since the last commit). For low-volume topics, or for topics that receive data in bursts, offsets may not move for a long period of time. Therefore we may want to force the commit (even if offsets have not changed) when migrating (i.e., when dual-commit is enabled) - we can add a minimum interval threshold (say, force commit after every 10 auto-commits) as well as on rebalance and shutdown. Also, I think it is safe to switch the default for offsets.storage from zookeeper to kafka and set the default to dual-commit (for people who have not migrated yet). We have deployed this to the largest consumers at linkedin and have not seen any issues so far (except for the migration caveat that this jira will resolve). -- This message was sent by Atlassian JIRA (v6.2#6252)
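The forced-commit idea in the ticket above can be modeled in a few lines. This is an illustrative sketch only: `OffsetCommitter` and its parameter are made-up names, not Kafka's actual classes or config keys. Commit when offsets changed since the last commit, or unconditionally every N auto-commit ticks while dual-commit is enabled:

```scala
// Hypothetical model of "force commit after every N auto-commits".
class OffsetCommitter(forceCommitInterval: Int) {
  private var ticksSinceLastCommit = 0
  private var lastCommitted: Map[String, Long] = Map.empty

  /** Returns true if this auto-commit tick should actually send a commit. */
  def shouldCommit(current: Map[String, Long], dualCommit: Boolean): Boolean = {
    ticksSinceLastCommit += 1
    val changed = current != lastCommitted
    // While migrating (dual-commit on), force a periodic commit so that
    // low-volume or bursty topics still get their offsets copied to Kafka.
    val forced = dualCommit && ticksSinceLastCommit >= forceCommitInterval
    if (changed || forced) {
      lastCommitted = current
      ticksSinceLastCommit = 0
      true
    } else false
  }
}

val committer = new OffsetCommitter(forceCommitInterval = 3)
assert(committer.shouldCommit(Map("t-0" -> 5L), dualCommit = true))  // changed
assert(!committer.shouldCommit(Map("t-0" -> 5L), dualCommit = true)) // unchanged, tick 1
assert(!committer.shouldCommit(Map("t-0" -> 5L), dualCommit = true)) // unchanged, tick 2
assert(committer.shouldCommit(Map("t-0" -> 5L), dualCommit = true))  // tick 3: forced
```

The same check would also fire on rebalance and shutdown, per the ticket.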
[jira] [Commented] (KAFKA-1535) return all live brokers in TopicMetadataResponse
[ https://issues.apache.org/jira/browse/KAFKA-1535?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14066307#comment-14066307 ] Nicolae Marasoiu commented on KAFKA-1535: - awesome, I see you applied the patch and committed into trunk :) I see there are also pull requests: are pull requests practiced only for the bigger features, to enable granular and pointed discussions? return all live brokers in TopicMetadataResponse Key: KAFKA-1535 URL: https://issues.apache.org/jira/browse/KAFKA-1535 Project: Kafka Issue Type: Improvement Components: core Affects Versions: 0.8.2 Reporter: Jun Rao Assignee: Jay Kreps Labels: newbie Attachments: KAFKA-1535__return_all_live_brokers_in_TopicMetadataResponse_.patch Currently, we only return the brokers that have assigned replicas for a topic in TopicMetadataResponse. The new producer will use those brokers for refreshing metadata. Now suppose that we stop all those brokers, copy all local data to some new hosts and then restart those hosts (with the original broker id). There is no way for the new producer to automatically get the information about the new brokers since all old brokers are gone. -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Commented] (KAFKA-1282) Disconnect idle socket connection in Selector
[ https://issues.apache.org/jira/browse/KAFKA-1282?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14066315#comment-14066315 ] Nicolae Marasoiu commented on KAFKA-1282: - Right, the limitation is more critical on the client side of a client-server connection, due to the port count limitation and/or socket/file count restrictions of the client environment. On the other hand, the brokers could close the connections too in such a condition, rather than relying on the clients (producers) to protect them. However, is there any other reason to reduce the socket connection count? To make the NIO select lighter on the server with fewer connections? I think epoll is quite relaxed about this. I would like to work on this, but I would also like to understand the original problem(s)/concern(s), to see whether there are more suitable solutions to the particular concern. Disconnect idle socket connection in Selector - Key: KAFKA-1282 URL: https://issues.apache.org/jira/browse/KAFKA-1282 Project: Kafka Issue Type: Bug Components: producer Affects Versions: 0.8.2 Reporter: Jun Rao Labels: newbie++ Fix For: 0.9.0 To reduce the number of socket connections, it would be useful for the new producer to close socket connections that are idle. We can introduce a new producer config for the idle time. -- This message was sent by Atlassian JIRA (v6.2#6252)
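One common way to implement the idle-disconnect idea above is to track per-connection last-activity times in an access-ordered LinkedHashMap, so the least recently used connection is always first, and reap expired entries on each poll. This is a rough illustration only, not Kafka's actual Selector code; `IdleTracker` and its methods are made-up names:

```scala
import java.util.{LinkedHashMap => JLinkedHashMap}

// Hypothetical sketch: connections idle longer than maxIdleMs are reaped.
class IdleTracker(maxIdleMs: Long) {
  // accessOrder = true: iteration starts at the least recently touched entry
  private val lastActive = new JLinkedHashMap[String, Long](16, 0.75f, true)

  def touch(connectionId: String, nowMs: Long): Unit =
    lastActive.put(connectionId, nowMs)

  /** Remove and return connections idle longer than maxIdleMs, oldest first. */
  def expired(nowMs: Long): List[String] = {
    val it = lastActive.entrySet().iterator()
    val out = scala.collection.mutable.ListBuffer.empty[String]
    var foundFresh = false
    while (it.hasNext && !foundFresh) {
      val e = it.next()
      if (nowMs - e.getValue > maxIdleMs) { out += e.getKey; it.remove() }
      else foundFresh = true // access order: remaining entries are all fresher
    }
    out.toList
  }
}

val tracker = new IdleTracker(maxIdleMs = 1000L)
tracker.touch("a", 0L)
tracker.touch("b", 500L)
tracker.touch("a", 900L)                     // refresh "a"
assert(tracker.expired(1600L) == List("b"))  // only "b" has been idle > 1000 ms
```

Because the map is access-ordered, the reap loop can stop at the first non-expired entry instead of scanning every connection.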
[jira] [Commented] (KAFKA-1509) Restart of destination broker after unreplicated partition move leaves partitions without leader
[ https://issues.apache.org/jira/browse/KAFKA-1509?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14066319#comment-14066319 ] Nicolae Marasoiu commented on KAFKA-1509: - Is a fix still needed for this, do you know? Restart of destination broker after unreplicated partition move leaves partitions without leader Key: KAFKA-1509 URL: https://issues.apache.org/jira/browse/KAFKA-1509 Project: Kafka Issue Type: Bug Affects Versions: 0.8.1.1 Reporter: Albert Strasheim Labels: newbie++ Attachments: controller2.log This should be reasonably easy to reproduce. Make a Kafka cluster with a few machines. Create a topic with partitions on these machines. No replication. Bring up one more Kafka node. Move some or all of the partitions onto this new broker: kafka-reassign-partitions.sh --generate --zookeeper zk:2181 --topics-to-move-json-file move.json --broker-list new broker kafka-reassign-partitions.sh --zookeeper 36cfqd1.in.cfops.it:2181 --reassignment-json-file reassign.json --execute Wait until broker is the leader for all the partitions you moved. Send some data to the partitions. It all works. Shut down the broker that just received the data. Start it back up.
{code}
Topic:test  PartitionCount:2  ReplicationFactor:1  Configs:
  Topic: test  Partition: 0  Leader: -1  Replicas: 7  Isr:
  Topic: test  Partition: 1  Leader: -1  Replicas: 7  Isr:
{code}
Leader for topic test never gets elected even though this node is the only node that knows about the topic.
Some logs: {code} Jun 26 23:18:07 localhost kafka: INFO [Socket Server on Broker 7], Started (kafka.network.SocketServer) Jun 26 23:18:07 localhost kafka: INFO [Socket Server on Broker 7], Started (kafka.network.SocketServer) Jun 26 23:18:07 localhost kafka: INFO [ControllerEpochListener on 7]: Initialized controller epoch to 53 and zk version 52 (kafka.controller.ControllerEpochListener) Jun 26 23:18:07 localhost kafka: INFO Will not load MX4J, mx4j-tools.jar is not in the classpath (kafka.utils.Mx4jLoader$) Jun 26 23:18:07 localhost kafka: INFO Will not load MX4J, mx4j-tools.jar is not in the classpath (kafka.utils.Mx4jLoader$) Jun 26 23:18:07 localhost kafka: INFO [Controller 7]: Controller starting up (kafka.controller.KafkaController) Jun 26 23:18:07 localhost kafka: INFO conflict in /controller data: {version:1,brokerid:7,timestamp:1403824687354} stored data: {version:1,brokerid:4,timestamp:1403297911725} (kafka.utils.ZkUtils$) Jun 26 23:18:07 localhost kafka: INFO conflict in /controller data: {version:1,brokerid:7,timestamp:1403824687354} stored data: {version:1,brokerid:4,timestamp:1403297911725} (kafka.utils.ZkUtils$) Jun 26 23:18:07 localhost kafka: INFO [Controller 7]: Controller startup complete (kafka.controller.KafkaController) Jun 26 23:18:07 localhost kafka: INFO Registered broker 7 at path /brokers/ids/7 with address xxx:9092. (kafka.utils.ZkUtils$) Jun 26 23:18:07 localhost kafka: INFO Registered broker 7 at path /brokers/ids/7 with address xxx:9092. 
(kafka.utils.ZkUtils$) Jun 26 23:18:07 localhost kafka: INFO [Kafka Server 7], started (kafka.server.KafkaServer) Jun 26 23:18:07 localhost kafka: INFO [Kafka Server 7], started (kafka.server.KafkaServer) Jun 26 23:18:07 localhost kafka: TRACE Broker 7 cached leader info (LeaderAndIsrInfo:(Leader:3,ISR:3,LeaderEpoch:14,ControllerEpoch:53),ReplicationFactor:1),AllReplicas:3) for partition [requests,0] in response to UpdateMetadata request sent by controller 4 epoch 53 with correlation id 70 (state.change.logger) Jun 26 23:18:07 localhost kafka: TRACE Broker 7 cached leader info (LeaderAndIsrInfo:(Leader:1,ISR:1,LeaderEpoch:11,ControllerEpoch:53),ReplicationFactor:1),AllReplicas:1) for partition [requests,13] in response to UpdateMetadata request sent by controller 4 epoch 53 with correlation id 70 (state.change.logger) Jun 26 23:18:07 localhost kafka: TRACE Broker 7 cached leader info (LeaderAndIsrInfo:(Leader:1,ISR:1,LeaderEpoch:4,ControllerEpoch:53),ReplicationFactor:2),AllReplicas:1,5) for partition [requests_ipv6,5] in response to UpdateMetadata request sent by controller 4 epoch 53 with correlation id 70 (state.change.logger) Jun 26 23:18:07 localhost kafka: TRACE Broker 7 cached leader info (LeaderAndIsrInfo:(Leader:4,ISR:4,LeaderEpoch:13,ControllerEpoch:53),ReplicationFactor:2),AllReplicas:4,5) for partition [requests_stored,7] in response to UpdateMetadata request sent by controller 4 epoch 53 with correlation id 70 (state.change.logger) Jun 26 23:18:07 localhost kafka: TRACE Broker 7 cached leader info
Re: [DISCUSS] Kafka Security Specific Features
Hello Raja/Joe, When I turn on security, I still get an out of memory error on the producer. Is this something to do with keys? Is there any other way I can connect to the broker? *producer log* [2014-07-17 15:38:14,186] ERROR OOME with size 352518400 (kafka.network.BoundedByteBufferReceive) java.lang.OutOfMemoryError: Java heap space *broker log* INFO begin ssl handshake for localhost/127.0.0.1:50199//127.0.0.1:9092 On Thu, Jul 17, 2014 at 6:07 PM, Pramod Deshmukh dpram...@gmail.com wrote: Correct, I don't see any exceptions when I turn off security. Consumer is able to consume the message. I still see a warning for the topic property. [2014-07-17 18:04:38,360] WARN Property topic is not valid (kafka.utils.VerifiableProperties) On Thu, Jul 17, 2014 at 5:49 PM, Rajasekar Elango rela...@salesforce.com wrote: Can you try with turning off security to check if this error happens only in secure mode? Thanks, Raja. On Thu, Jul 17, 2014 at 3:51 PM, Pramod Deshmukh dpram...@gmail.com wrote: Thanks Raja, it was helpful. Now I am able to start zookeeper and broker in secure mode ready for SSL handshake. I get *java.lang.OutOfMemoryError: Java heap space* on the producer. I am using the default configuration and keystore. Is there anything missing? *Start broker:* *bin/kafka-server-start.sh config/server.properties* *broker.log:* [2014-07-17 15:34:46,281] INFO zookeeper state changed (SyncConnected) (org.I0Itec.zkclient.ZkClient) [2014-07-17 15:34:46,523] INFO Loading log 'secure.test-0' (kafka.log.LogManager) [2014-07-17 15:34:46,558] INFO Recovering unflushed segment 0 in log secure.test-0. (kafka.log.Log) [2014-07-17 15:34:46,571] INFO Completed load of log secure.test-0 with log end offset 0 (kafka.log.Log) [2014-07-17 15:34:46,582] INFO Starting log cleanup with a period of 6 ms. (kafka.log.LogManager) [2014-07-17 15:34:46,587] INFO Starting log flusher with a default period of 9223372036854775807 ms. 
(kafka.log.LogManager) [2014-07-17 15:34:46,614] INFO Initializing secure authentication (kafka.network.security.SecureAuth$) [2014-07-17 15:34:46,678] INFO Secure authentication initialization has been successfully completed (kafka.network.security.SecureAuth$) [2014-07-17 15:34:46,691] INFO Awaiting socket connections on 0.0.0.0:9092 . (kafka.network.Acceptor) [2014-07-17 15:34:46,692] INFO [Socket Server on Broker 0], Started (kafka.network.SocketServer) [2014-07-17 15:34:46,794] INFO Will not load MX4J, mx4j-tools.jar is not in the classpath (kafka.utils.Mx4jLoader$) [2014-07-17 15:34:46,837] INFO 0 successfully elected as leader (kafka.server.ZookeeperLeaderElector) [2014-07-17 15:34:47,057] INFO Registered broker 0 at path /brokers/ids/0 with address 10.1.100.130:9092. (kafka.utils.ZkUtils$) [2014-07-17 15:34:47,059] INFO New leader is 0 (kafka.server.ZookeeperLeaderElector$LeaderChangeListener) *[2014-07-17 15:34:47,068] INFO [Kafka Server 0], started (kafka.server.KafkaServer)* *[2014-07-17 15:34:47,383] INFO begin ssl handshake for /10.1.100.130:9092//10.1.100.130:51685 http://10.1.100.130:9092//10.1.100.130:51685 (kafka.network.security.SSLSocketChannel)* *[2014-07-17 15:34:47,392] INFO begin ssl handshake for 10.1.100.130/10.1.100.130:51685//10.1.100.130:9092 http://10.1.100.130/10.1.100.130:51685//10.1.100.130:9092 (kafka.network.security.SSLSocketChannel)* *[2014-07-17 15:34:47,465] INFO finished ssl handshake for 10.1.100.130/10.1.100.130:51685//10.1.100.130:9092 http://10.1.100.130/10.1.100.130:51685//10.1.100.130:9092 (kafka.network.security.SSLSocketChannel)* *[2014-07-17 15:34:47,465] INFO finished ssl handshake for /10.1.100.130:9092//10.1.100.130:51685 http://10.1.100.130:9092//10.1.100.130:51685 (kafka.network.security.SSLSocketChannel)* *[2014-07-17 15:34:47,617] INFO [ReplicaFetcherManager on broker 0] Removed fetcher for partitions (kafka.server.ReplicaFetcherManager)* *[2014-07-17 15:34:47,627] INFO [ReplicaFetcherManager on broker 0] Added 
fetcher for partitions List() (kafka.server.ReplicaFetcherManager)* *[2014-07-17 15:34:47,656] INFO [ReplicaFetcherManager on broker 0] Removed fetcher for partitions [secure.test,0] (kafka.server.ReplicaFetcherManager)* [2014-07-17 15:37:15,970] INFO begin ssl handshake for 10.1.100.130/10.1.100.130:51689//10.1.100.130:9092 (kafka.network.security.SSLSocketChannel) [2014-07-17 15:37:16,075] INFO begin ssl handshake for 10.1.100.130/10.1.100.130:51690//10.1.100.130:9092 (kafka.network.security.SSLSocketChannel) [2014-07-17 15:37:16,434] INFO begin ssl handshake for 10.1.100.130/10.1.100.130:51691//10.1.100.130:9092 (kafka.network.security.SSLSocketChannel) [2014-07-17 15:37:16,530] INFO begin ssl handshake for 10.1.100.130/10.1.100.130:51692//10.1.100.130:9092 (kafka.network.security.SSLSocketChannel) [2014-07-17
[jira] [Commented] (KAFKA-328) Write unit test for kafka server startup and shutdown API
[ https://issues.apache.org/jira/browse/KAFKA-328?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14066423#comment-14066423 ] BalajiSeshadri commented on KAFKA-328: -- Sure will do. Can you guys please verify my patch for KAFKA-1476. Balaji Write unit test for kafka server startup and shutdown API -- Key: KAFKA-328 URL: https://issues.apache.org/jira/browse/KAFKA-328 Project: Kafka Issue Type: Bug Reporter: Neha Narkhede Assignee: BalajiSeshadri Labels: newbie Background discussion in KAFKA-320 People often try to embed KafkaServer in an application that ends up calling startup() and shutdown() repeatedly and sometimes in odd ways. To ensure this works correctly we have to be very careful about cleaning up resources. This is a good practice for making unit tests reliable anyway. A good first step would be to add some unit tests on startup and shutdown to cover various cases: 1. A Kafka server can startup if it is not already starting up, if it is not currently being shutdown, or if it hasn't been already started 2. A Kafka server can shutdown if it is not already shutting down, if it is not currently starting up, or if it hasn't been already shutdown. -- This message was sent by Atlassian JIRA (v6.2#6252)
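The startup/shutdown invariants listed above can be captured, in miniature, by a toy state machine. This is illustrative only and deliberately simplified; `ToyServer` is not KafkaServer's actual implementation: startup() is legal only before the server has started, and shutdown() only while it is running:

```scala
// Toy model of the lifecycle rules a unit test would exercise
// (hypothetical class; real KafkaServer tracks more states).
sealed trait State
case object NotStarted extends State
case object Started extends State
case object ShutDown extends State

class ToyServer {
  private var state: State = NotStarted
  def startup(): Unit = state match {
    case NotStarted => state = Started
    case other      => throw new IllegalStateException(s"cannot start from $other")
  }
  def shutdown(): Unit = state match {
    case Started => state = ShutDown
    case other   => throw new IllegalStateException(s"cannot shut down from $other")
  }
}

val server = new ToyServer
server.startup()
// starting an already-started server must fail
var rejected = false
try server.startup() catch { case _: IllegalStateException => rejected = true }
assert(rejected)
server.shutdown()
```

A real test suite would assert these transitions on an embedded KafkaServer, verifying that resources (sockets, threads, log handles) are released after each shutdown.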
[jira] [Updated] (KAFKA-1536) Change the status of the JIRA to Patch Available in the kafka-review-tool
[ https://issues.apache.org/jira/browse/KAFKA-1536?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Manikumar Reddy updated KAFKA-1536: --- Attachment: (was: KAFKA-1536.patch) Change the status of the JIRA to Patch Available in the kafka-review-tool --- Key: KAFKA-1536 URL: https://issues.apache.org/jira/browse/KAFKA-1536 Project: Kafka Issue Type: Bug Reporter: Guozhang Wang Assignee: Neha Narkhede Fix For: 0.9.0 Attachments: KAFKA-1536.patch, KAFKA-1536.patch, KAFKA-1536_2014-07-18_20:36:29.patch When using the kafka-review-tool to upload a patch to a certain jira, the status remains OPEN. It makes searching for JIRAs that need review a bit hard. It would be better to make the tool also change the status of the jira. -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Commented] (KAFKA-1536) Change the status of the JIRA to Patch Available in the kafka-review-tool
[ https://issues.apache.org/jira/browse/KAFKA-1536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14066424#comment-14066424 ] Manikumar Reddy commented on KAFKA-1536: Updated reviewboard https://reviews.apache.org/r/23440/diff/ against branch origin/trunk Change the status of the JIRA to Patch Available in the kafka-review-tool --- Key: KAFKA-1536 URL: https://issues.apache.org/jira/browse/KAFKA-1536 Project: Kafka Issue Type: Bug Reporter: Guozhang Wang Assignee: Neha Narkhede Fix For: 0.9.0 Attachments: KAFKA-1536.patch, KAFKA-1536.patch, KAFKA-1536_2014-07-18_20:36:29.patch -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Updated] (KAFKA-1536) Change the status of the JIRA to Patch Available in the kafka-review-tool
[ https://issues.apache.org/jira/browse/KAFKA-1536?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Manikumar Reddy updated KAFKA-1536: --- Attachment: KAFKA-1536_2014-07-18_20:41:40.patch Change the status of the JIRA to Patch Available in the kafka-review-tool --- Key: KAFKA-1536 URL: https://issues.apache.org/jira/browse/KAFKA-1536 Project: Kafka Issue Type: Bug Reporter: Guozhang Wang Assignee: Manikumar Reddy Fix For: 0.9.0 Attachments: KAFKA-1536.patch, KAFKA-1536.patch, KAFKA-1536_2014-07-18_20:41:40.patch -- This message was sent by Atlassian JIRA (v6.2#6252)
Re: Review Request 23440: addressing reviewer comments
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/23440/ --- (Updated July 18, 2014, 3:13 p.m.) Review request for kafka. Summary (updated) - addressing reviewer comments Bugs: KAFKA-1536 https://issues.apache.org/jira/browse/KAFKA-1536 Repository: kafka Description --- JIRA status set to Patch Available in kafka-patch-review script Diffs (updated) - kafka-patch-review.py dc45549f886440f1721c60aab9aa0a4af9b4cbef Diff: https://reviews.apache.org/r/23440/diff/ Testing --- Thanks, Manikumar Reddy O
[jira] [Updated] (KAFKA-1536) Change the status of the JIRA to Patch Available in the kafka-review-tool
[ https://issues.apache.org/jira/browse/KAFKA-1536?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Manikumar Reddy updated KAFKA-1536: --- Assignee: Manikumar Reddy (was: Neha Narkhede) Status: Open (was: Patch Available) for testing this patch Change the status of the JIRA to Patch Available in the kafka-review-tool --- Key: KAFKA-1536 URL: https://issues.apache.org/jira/browse/KAFKA-1536 Project: Kafka Issue Type: Bug Reporter: Guozhang Wang Assignee: Manikumar Reddy Fix For: 0.9.0 Attachments: KAFKA-1536.patch, KAFKA-1536.patch, KAFKA-1536_2014-07-18_20:41:40.patch -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Updated] (KAFKA-1536) Change the status of the JIRA to Patch Available in the kafka-review-tool
[ https://issues.apache.org/jira/browse/KAFKA-1536?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Manikumar Reddy updated KAFKA-1536: --- Status: Patch Available (was: Open) Change the status of the JIRA to Patch Available in the kafka-review-tool --- Key: KAFKA-1536 URL: https://issues.apache.org/jira/browse/KAFKA-1536 Project: Kafka Issue Type: Bug Reporter: Guozhang Wang Assignee: Manikumar Reddy Fix For: 0.9.0 Attachments: KAFKA-1536.patch, KAFKA-1536.patch, KAFKA-1536_2014-07-18_20:41:40.patch -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Updated] (KAFKA-1536) Change the status of the JIRA to Patch Available in the kafka-review-tool
[ https://issues.apache.org/jira/browse/KAFKA-1536?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Manikumar Reddy updated KAFKA-1536: --- Attachment: (was: KAFKA-1536_2014-07-18_20:36:29.patch) Change the status of the JIRA to Patch Available in the kafka-review-tool --- Key: KAFKA-1536 URL: https://issues.apache.org/jira/browse/KAFKA-1536 Project: Kafka Issue Type: Bug Reporter: Guozhang Wang Assignee: Manikumar Reddy Fix For: 0.9.0 Attachments: KAFKA-1536.patch, KAFKA-1536.patch, KAFKA-1536_2014-07-18_20:41:40.patch -- This message was sent by Atlassian JIRA (v6.2#6252)
Re: KAFKA-1150
Hi Simon, Just commented on the ticket. Guozhang On Fri, Jul 18, 2014 at 4:56 AM, Simon Cooper simon.coo...@featurespace.co.uk wrote: There's been no recent movement on KAFKA-1150. We’re significantly affected by this bug, trying to achieve small-millisecond transfer latencies on a replicated system with quite bursty message frequencies. What are the chances of this bug being fixed in 0.8.2? Could it be marked for fixing in 0.8.2? Is there any additional information that I can provide? Unfortunately, I currently cannot submit patches for this bug due to company policy on OSS... Thanks, SimonC -- -- Guozhang
[jira] [Commented] (KAFKA-1150) Fetch on a replicated topic does not return as soon as possible
[ https://issues.apache.org/jira/browse/KAFKA-1150?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14066435#comment-14066435 ] Guozhang Wang commented on KAFKA-1150: -- Simon, I am currently working on KAFKA-1430, which will hopefully fix both problems reported here. You can probably take a look at the ticket for now and let me know if you think anything is still missing. Fetch on a replicated topic does not return as soon as possible --- Key: KAFKA-1150 URL: https://issues.apache.org/jira/browse/KAFKA-1150 Project: Kafka Issue Type: Bug Components: core, replication Affects Versions: 0.8.0 Reporter: Andrey Balmin Assignee: Neha Narkhede Attachments: Test.java I see a huge performance difference between replicated and non-replicated topics. On my laptop, running two brokers, I see producer-to-consumer latency of under 1 ms for topics with one replica. However, with two replicas the same latency equals the max fetch delay. Here is a simple test I just did: one producer thread in a loop sending one message and sleeping for 2500 ms, and one consumer thread looping on the long poll with a max fetch delay of 1000 ms. Here is what happens with no replication: Produced 1 key: key1 at time: 15:33:52.822 Consumed up to 1 at time: 15:33:52.822 Consumed up to 1 at time: 15:33:53.823 Consumed up to 1 at time: 15:33:54.825 Produced 2 key: key2 at time: 15:33:55.324 Consumed up to 2 at time: 15:33:55.324 Consumed up to 2 at time: 15:33:56.326 Consumed up to 2 at time: 15:33:57.328 Produced 3 key: key3 at time: 15:33:57.827 Consumed up to 3 at time: 15:33:57.827 There are no delays between the message being produced and consumed -- this is the behavior I expected. 
Here is the same test, but for a topic with two replicas: Consumed up to 0 at time: 15:50:29.575 Produced 1 key: key1 at time: 15:50:29.575 Consumed up to 1 at time: 15:50:30.577 Consumed up to 1 at time: 15:50:31.579 Consumed up to 1 at time: 15:50:32.078 Produced 2 key: key2 at time: 15:50:32.078 Consumed up to 2 at time: 15:50:33.081 Consumed up to 2 at time: 15:50:34.081 Consumed up to 2 at time: 15:50:34.581 Produced 3 key: key3 at time: 15:50:34.581 Consumed up to 3 at time: 15:50:35.584 Notice how the fetch always returns as soon as the produce request is issued, but without the new message, which consistently arrives ~1002 ms later. Below is the request log snippet for this part: Produced 2 key: key2 at time: 15:50:32.078 Consumed up to 2 at time: 15:50:33.081 You can see the first FetchRequest returns at the same time as the replica FetchRequest, but this fetch response is *empty* -- the message is not committed yet, so it cannot be returned. The message is committed at 15:50:32,079. However, the next FetchRequest (that does return the message) comes in at 15:50:32,078, but completes only at 15:50:33,081. Why is it waiting for the full 1000 ms, instead of returning right away? 
[2013-11-25 15:50:32,077] TRACE Processor 1 received request : Name: ProducerRequest; Version: 0; CorrelationId: 5; ClientId: pro; RequiredAcks: 1; AckTimeoutMs: 20 ms; TopicAndPartition: [test_topic,0] - 2078 (kafka.network.RequestChannel$) [2013-11-25 15:50:32,078] TRACE Completed request:Name: FetchRequest; Version: 0; CorrelationId: 7; ClientId: con; ReplicaId: -1; MaxWait: 1000 ms; MinBytes: 1 bytes; RequestInfo: [test_topic,0] - PartitionFetchInfo(129,1024000) from client /0:0:0:0:0:0:0:1%0:63264;totalTime:499,queueTime:0,localTime:0,remoteTime:499,sendTime:0 (kafka.request.logger) [2013-11-25 15:50:32,078] TRACE Completed request:Name: FetchRequest; Version: 0; CorrelationId: 3463; ClientId: ReplicaFetcherThread-0-0; ReplicaId: 1; MaxWait: 500 ms; MinBytes: 1 bytes; RequestInfo: [test_topic,0] - PartitionFetchInfo(129,1048576) from client /127.0.0.1:63056;totalTime:499,queueTime:1,localTime:0,remoteTime:498,sendTime:0 (kafka.request.logger) [2013-11-25 15:50:32,078] TRACE Processor 1 received request : Name: FetchRequest; Version: 0; CorrelationId: 8; ClientId: con; ReplicaId: -1; MaxWait: 1000 ms; MinBytes: 1 bytes; RequestInfo: [test_topic,0] - PartitionFetchInfo(129,1024000) (kafka.network.RequestChannel$) [2013-11-25 15:50:32,078] TRACE Completed request:Name: ProducerRequest; Version: 0; CorrelationId: 5; ClientId: pro; RequiredAcks: 1; AckTimeoutMs: 20 ms; TopicAndPartition: [test_topic,0] - 2078 from client /0:0:0:0:0:0:0:1%0:63266;totalTime:1,queueTime:0,localTime:1,remoteTime:0,sendTime:0 (kafka.request.logger) [2013-11-25 15:50:32,079] TRACE Processor 0 received request : Name: FetchRequest; Version: 0; CorrelationId: 3464; ClientId: ReplicaFetcherThread-0-0; ReplicaId: 1;
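The timing the reporter expects -- a long-poll fetch that returns as soon as data is available rather than waiting out the full MaxWait -- can be sketched with an in-memory queue standing in for the fetch API. This is only an illustration of the expected long-poll behavior (the class name and timings are made up), not the attached Test.java:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class LongPollSketch {
    public static void main(String[] args) throws Exception {
        // In-memory stand-in for a partition log; poll(timeout) mimics a
        // fetch with MaxWait: it returns early when data arrives, or only
        // after the full wait when nothing is produced.
        BlockingQueue<String> log = new LinkedBlockingQueue<>();

        Thread producer = new Thread(() -> {
            try {
                Thread.sleep(500); // stand-in for the produce gap in the test
                log.put("key1");
            } catch (InterruptedException ignored) { }
        });
        producer.start();

        long start = System.currentTimeMillis();
        // Long poll with a 1000 ms max wait, like MaxWait in the FetchRequest.
        String msg = log.poll(1000, TimeUnit.MILLISECONDS);
        long elapsed = System.currentTimeMillis() - start;

        // A correct long poll returns as soon as the message is available
        // (here roughly when the producer puts it), not after the full max wait.
        System.out.println("got " + msg + " after ~" + elapsed + " ms");
        producer.join();
    }
}
```

The bug in the ticket is exactly that the replicated case behaves like the timeout path of this sketch even when a committed message is already available.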
[jira] [Commented] (KAFKA-1535) return all live brokers in TopicMetadataResponse
[ https://issues.apache.org/jira/browse/KAFKA-1535?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14066439#comment-14066439 ] Jay Kreps commented on KAFKA-1535: -- To date we have not really used pull requests. Apache has something about JIRA being a way to manage copyright assignment. I think recently they may have started supporting pull requests, so we should probably document and better understand that workflow. But at the moment I think we are still just doing patches and JIRA like it is the last century. :-) return all live brokers in TopicMetadataResponse Key: KAFKA-1535 URL: https://issues.apache.org/jira/browse/KAFKA-1535 Project: Kafka Issue Type: Improvement Components: core Affects Versions: 0.8.2 Reporter: Jun Rao Assignee: Jay Kreps Labels: newbie Attachments: KAFKA-1535__return_all_live_brokers_in_TopicMetadataResponse_.patch Currently, we only return the brokers that have assigned replicas for a topic in TopicMetadataResponse. The new producer will use those brokers for refreshing metadata. Now suppose that we stop all those brokers, copy all local data to some new hosts and then restart those hosts (with the original broker id). There is no way for the new producer to automatically get the information about the new brokers since all old brokers are gone. -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Commented] (KAFKA-1150) Fetch on a replicated topic does not return as soon as possible
[ https://issues.apache.org/jira/browse/KAFKA-1150?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14066440#comment-14066440 ] Jay Kreps commented on KAFKA-1150: -- Yeah, [~thecoop1984], can you verify that the patch on that ticket actually fixes the problem you saw? 
[jira] [Commented] (KAFKA-1451) Broker stuck due to leader election race
[ https://issues.apache.org/jira/browse/KAFKA-1451?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14066456#comment-14066456 ] Neha Narkhede commented on KAFKA-1451: -- Just checking the existence is not enough, since there is a risk of not electing a controller at all if all brokers do the same and the node disappears. The following will work: 1. Register a watch. 2. Check existence and elect if no controller exists. Step 1 ensures that if the node disappears, an election will take place. Broker stuck due to leader election race - Key: KAFKA-1451 URL: https://issues.apache.org/jira/browse/KAFKA-1451 Project: Kafka Issue Type: Bug Components: core Affects Versions: 0.8.1.1 Reporter: Maciek Makowski Priority: Minor Labels: newbie h3. Symptoms The broker does not become available due to being stuck in an infinite loop while electing a leader. This can be recognised by the following line being repeatedly written to server.log: {code} [2014-05-14 04:35:09,187] INFO I wrote this conflicted ephemeral node [{"version":1,"brokerid":1,"timestamp":"1400060079108"}] at /controller a while back in a different session, hence I will backoff for this node to be deleted by Zookeeper and retry (kafka.utils.ZkUtils$) {code} h3. Steps to Reproduce In a single Kafka 0.8.1.1 node, single ZooKeeper 3.4.6 node setup (it will likely behave the same with the ZK version included in the Kafka distribution): # start both zookeeper and kafka (in any order) # stop zookeeper # stop kafka # start kafka # start zookeeper h3. Likely Cause {{ZookeeperLeaderElector}} subscribes to data changes on startup, and then triggers an election. 
If the deletion of the ephemeral {{/controller}} node associated with the broker's previous ZooKeeper session happens after the subscription to changes in the new session, the election will be invoked twice, once from {{startup}} and once from {{handleDataDeleted}}:
* {{startup}}: acquire {{controllerLock}}
* {{startup}}: subscribe to data changes
* zookeeper: delete {{/controller}} since the session that created it timed out
* {{handleDataDeleted}}: {{/controller}} was deleted
* {{handleDataDeleted}}: wait on {{controllerLock}}
* {{startup}}: elect -- writes {{/controller}}
* {{startup}}: release {{controllerLock}}
* {{handleDataDeleted}}: acquire {{controllerLock}}
* {{handleDataDeleted}}: elect -- attempts to write {{/controller}} and then gets into an infinite loop as a result of the conflict
{{createEphemeralPathExpectConflictHandleZKBug}} assumes that the existing znode was written from a different session, which is not true in this case; it was written from the same session. That adds to the confusion. h3. Suggested Fix In {{ZookeeperLeaderElector.startup}} first run {{elect}} and then subscribe to data changes. -- This message was sent by Atlassian JIRA (v6.2#6252)
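The watch-then-check ordering Neha describes above can be sketched against a tiny in-memory stand-in for the /controller znode. The class and method names here are illustrative only, not the actual ZookeeperLeaderElector or ZkClient API:

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicReference;

// Minimal in-memory stand-in for the /controller znode.
class ControllerNode {
    private final AtomicReference<String> value = new AtomicReference<>();
    private final List<Runnable> deletionWatches = new CopyOnWriteArrayList<>();

    void registerDeletionWatch(Runnable watch) { deletionWatches.add(watch); }

    // Like a conditional ephemeral create: succeeds only if no controller exists.
    boolean createIfAbsent(String brokerId) { return value.compareAndSet(null, brokerId); }

    boolean exists() { return value.get() != null; }

    void delete() {
        value.set(null);
        for (Runnable w : deletionWatches) w.run(); // fire watches, as ZK would
    }

    String get() { return value.get(); }
}

public class ElectionSketch {
    public static void main(String[] args) {
        ControllerNode node = new ControllerNode();
        String me = "broker-1";

        // Step 1: register the watch first, so any deletion from this point
        // on always triggers a re-election.
        node.registerDeletionWatch(() -> node.createIfAbsent(me));

        // Step 2: check existence and elect only if no controller exists.
        // Checking existence alone could leave the cluster with no controller
        // if the node disappears right after the check.
        if (!node.exists()) node.createIfAbsent(me);

        System.out.println("controller = " + node.get());
        node.delete(); // simulate session expiry deleting the ephemeral node
        System.out.println("controller after deletion = " + node.get()); // re-elected by the watch
    }
}
```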
[jira] [Commented] (KAFKA-1282) Disconnect idle socket connection in Selector
[ https://issues.apache.org/jira/browse/KAFKA-1282?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14066461#comment-14066461 ] Jay Kreps commented on KAFKA-1282: -- The goal is just to reduce server connection count. In our environment there might be a single Kafka producer in each process we run publishing to a small Kafka cluster (say ~20 servers). However there are tens of thousands of client processes. Connections can end up going unused when leadership migrates and we should eventually close these out rather than retaining them indefinitely. As you say it is not critical as the server seems to do a good job of dealing with high connection counts, but it seems like a good thing to do. I agree that doing this on the server might be better. This does mean it is possible that the server will attempt to close the socket while the client is attempting to send something. But if the timeout is 10 mins, it is unlikely that this will happen often (i.e. if nothing was sent in the last 10 mins, it will not likely happen in the 0.5 ms it takes to do the close). The advantage of doing it on the server is that it will work for all clients. This change would be in core/.../kafka/network/SocketServer.scala. The only gotcha is that we likely need to avoid iterating over all connections to avoid latency impact (there could be 100k connections). One way to do this would be to use java.util.LinkedHashMap to implement an LRU hash map of the SelectionKeys, and access this every time the selection key comes up in a select operation. (There are a ton of details in LinkedHashMap--needs to be access order, etc). Then every 5-10 select loop iterations we would iterate the map expiring connections until we come to a connection that doesn't need expiring, then stop. 
Disconnect idle socket connection in Selector - Key: KAFKA-1282 URL: https://issues.apache.org/jira/browse/KAFKA-1282 Project: Kafka Issue Type: Bug Components: producer Affects Versions: 0.8.2 Reporter: Jun Rao Labels: newbie++ Fix For: 0.9.0 To reduce # socket connections, it would be useful for the new producer to close socket connections that are idle. We can introduce a new producer config for the idle time. -- This message was sent by Atlassian JIRA (v6.2#6252)
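The access-ordered LinkedHashMap idea Jay describes might look roughly like this. It is a hypothetical sketch (names invented, sockets replaced by connection-id strings), not the SocketServer.scala change itself:

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

// LRU bookkeeping for connections: touched on every select event, scanned
// from the eldest end until the first non-idle connection, so the scan cost
// is proportional to the number of expired connections, not to all 100k.
public class IdleConnectionReaper {
    private final long idleTimeoutMs;
    // accessOrder=true keeps entries ordered by last access, eldest first.
    private final LinkedHashMap<String, Long> lastActive =
            new LinkedHashMap<>(16, 0.75f, true);

    IdleConnectionReaper(long idleTimeoutMs) { this.idleTimeoutMs = idleTimeoutMs; }

    // Called whenever a connection's key shows up in a select() result.
    void touch(String connectionId, long nowMs) { lastActive.put(connectionId, nowMs); }

    // Called every few select-loop iterations: expire from the eldest end and
    // stop at the first connection that is still fresh.
    int expire(long nowMs) {
        int closed = 0;
        Iterator<Map.Entry<String, Long>> it = lastActive.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> eldest = it.next();
            if (nowMs - eldest.getValue() < idleTimeoutMs) break; // rest are fresher
            it.remove(); // in the real server this would close the socket
            closed++;
        }
        return closed;
    }

    public static void main(String[] args) {
        IdleConnectionReaper reaper = new IdleConnectionReaper(1000);
        reaper.touch("conn-a", 0);
        reaper.touch("conn-b", 500);
        System.out.println("expired: " + reaper.expire(1200)); // conn-a idle, conn-b not
    }
}
```

Because the map is access-ordered, a connection that sends anything is moved back to the fresh end, so only genuinely idle connections accumulate at the eldest end.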
Re: [DISCUSS] Kafka Security Specific Features
Hi Pramod, Can you increase KAFKA_HEAP_OPTS to, let's say, -Xmx1G in kafka-console-producer.sh to see if that gets you further along in your testing? Thanks! /*** Joe Stein Founder, Principal Consultant Big Data Open Source Security LLC http://www.stealth.ly Twitter: @allthingshadoop http://www.twitter.com/allthingshadoop / On Fri, Jul 18, 2014 at 10:24 AM, Pramod Deshmukh dpram...@gmail.com wrote: Hello Raja/Joe, When I turn on security, I still get an out of memory error on the producer. Is this something to do with keys? Is there any other way I can connect to the broker? *producer log* [2014-07-17 15:38:14,186] ERROR OOME with size 352518400 (kafka.network.BoundedByteBufferReceive) java.lang.OutOfMemoryError: Java heap space *broker log* INFO begin ssl handshake for localhost/127.0.0.1:50199//127.0.0.1:9092 On Thu, Jul 17, 2014 at 6:07 PM, Pramod Deshmukh dpram...@gmail.com wrote: Correct, I don't see any exceptions when I turn off security. The consumer is able to consume the message. I still see a warning for the topic property. [2014-07-17 18:04:38,360] WARN Property topic is not valid (kafka.utils.VerifiableProperties) On Thu, Jul 17, 2014 at 5:49 PM, Rajasekar Elango rela...@salesforce.com wrote: Can you try with turning off security to check if this error happens only in secure mode? Thanks, Raja. On Thu, Jul 17, 2014 at 3:51 PM, Pramod Deshmukh dpram...@gmail.com wrote: Thanks Raja, it was helpful. Now I am able to start zookeeper and broker in secure mode ready for SSL handshake. I get *java.lang.OutOfMemoryError: Java heap space* on the producer. I am using the default configuration and keystore. Is there anything missing? *Start broker:* *bin/kafka-server-start.sh config/server.properties* *broker.log:* [2014-07-17 15:34:46,281] INFO zookeeper state changed (SyncConnected) (org.I0Itec.zkclient.ZkClient) [2014-07-17 15:34:46,523] INFO Loading log 'secure.test-0' (kafka.log.LogManager) [2014-07-17 15:34:46,558] INFO Recovering unflushed segment 0 in log secure.test-0. 
(kafka.log.Log) [2014-07-17 15:34:46,571] INFO Completed load of log secure.test-0 with log end offset 0 (kafka.log.Log) [2014-07-17 15:34:46,582] INFO Starting log cleanup with a period of 6 ms. (kafka.log.LogManager) [2014-07-17 15:34:46,587] INFO Starting log flusher with a default period of 9223372036854775807 ms. (kafka.log.LogManager) [2014-07-17 15:34:46,614] INFO Initializing secure authentication (kafka.network.security.SecureAuth$) [2014-07-17 15:34:46,678] INFO Secure authentication initialization has been successfully completed (kafka.network.security.SecureAuth$) [2014-07-17 15:34:46,691] INFO Awaiting socket connections on 0.0.0.0:9092 . (kafka.network.Acceptor) [2014-07-17 15:34:46,692] INFO [Socket Server on Broker 0], Started (kafka.network.SocketServer) [2014-07-17 15:34:46,794] INFO Will not load MX4J, mx4j-tools.jar is not in the classpath (kafka.utils.Mx4jLoader$) [2014-07-17 15:34:46,837] INFO 0 successfully elected as leader (kafka.server.ZookeeperLeaderElector) [2014-07-17 15:34:47,057] INFO Registered broker 0 at path /brokers/ids/0 with address 10.1.100.130:9092. 
(kafka.utils.ZkUtils$) [2014-07-17 15:34:47,059] INFO New leader is 0 (kafka.server.ZookeeperLeaderElector$LeaderChangeListener) *[2014-07-17 15:34:47,068] INFO [Kafka Server 0], started (kafka.server.KafkaServer)* *[2014-07-17 15:34:47,383] INFO begin ssl handshake for /10.1.100.130:9092//10.1.100.130:51685 (kafka.network.security.SSLSocketChannel)* *[2014-07-17 15:34:47,392] INFO begin ssl handshake for 10.1.100.130/10.1.100.130:51685//10.1.100.130:9092 (kafka.network.security.SSLSocketChannel)* *[2014-07-17 15:34:47,465] INFO finished ssl handshake for 10.1.100.130/10.1.100.130:51685//10.1.100.130:9092 (kafka.network.security.SSLSocketChannel)* *[2014-07-17 15:34:47,465] INFO finished ssl handshake for /10.1.100.130:9092//10.1.100.130:51685 (kafka.network.security.SSLSocketChannel)* *[2014-07-17 15:34:47,617] INFO [ReplicaFetcherManager on broker 0] Removed fetcher for partitions (kafka.server.ReplicaFetcherManager)* *[2014-07-17 15:34:47,627] INFO [ReplicaFetcherManager on broker 0] Added fetcher for partitions List() (kafka.server.ReplicaFetcherManager)* *[2014-07-17 15:34:47,656] INFO [ReplicaFetcherManager on broker 0] Removed fetcher for partitions [secure.test,0]
[jira] [Commented] (KAFKA-1476) Get a list of consumer groups
[ https://issues.apache.org/jira/browse/KAFKA-1476?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14066467#comment-14066467 ] Neha Narkhede commented on KAFKA-1476: -- Thanks for the patch, [~balaji.sesha...@dish.com]. I think it may be worth trying to combine consumer related tooling into a single tool, much like the topics tool. I can imagine it having a --groups and --offsets option. So, --groups --list will list all groups and --groups --describe --group group-name will describe the group. Similarly --offsets --group group_name will list the offsets. Get a list of consumer groups - Key: KAFKA-1476 URL: https://issues.apache.org/jira/browse/KAFKA-1476 Project: Kafka Issue Type: Wish Components: tools Affects Versions: 0.8.1.1 Reporter: Ryan Williams Labels: newbie Fix For: 0.9.0 Attachments: KAFKA-1476.patch It would be useful to have a way to get a list of consumer groups currently active via some tool/script that ships with kafka. This would be helpful so that the system tools can be explored more easily. For example, when running the ConsumerOffsetChecker, it requires a group option bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --topic test --group ? But, when just getting started with kafka, using the console producer and consumer, it is not clear what value to use for the group option. If a list of consumer groups could be listed, then it would be clear what value to use. Background: http://mail-archives.apache.org/mod_mbox/kafka-users/201405.mbox/%3cCAOq_b1w=slze5jrnakxvak0gu9ctdkpazak1g4dygvqzbsg...@mail.gmail.com%3e -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Commented] (KAFKA-1476) Get a list of consumer groups
[ https://issues.apache.org/jira/browse/KAFKA-1476?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14066471#comment-14066471 ] Jay Kreps commented on KAFKA-1476: -- This looks good to me. One thing we should think about: does this make sense as a standalone command? We have something of a history of making a new command for every possible thing you could do. Often it is a little more usable if we group these together a bit into logically related items. For example, I could imagine having a command bin/kafka-consumers.sh --list-groups or something like that, which also performed other consumer-related commands. This sort of makes the tools a little more usable for users. [~guozhang], [~toddpalino], [~nehanarkhede] what do you guys think? As we build the consumer coordinator, what admin operations are we going to want tooling for? How does this relate to the consumer offset checker, should that be combined? Let's think this through and then just name this thing appropriately even if it only has 10% of the functionality we envision at the moment. 
[jira] [Commented] (KAFKA-1414) Speedup broker startup after hard reset
[ https://issues.apache.org/jira/browse/KAFKA-1414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14066477#comment-14066477 ] Jun Rao commented on KAFKA-1414: Thanks for patch v4. Some more comments.
40. LogManager:
40.1 The logic in loadLogs is still not quite right. Even if the shutdown is clean, we still need to load the log. The initialization logic in Log knows whether to perform recovery or not. Also, we can't initialize dirLogs from logsByDir since it's empty during startup. So, we have to get it from dir.listFiles().
40.2 The following logging in both loadLogs() and shutdown() is not quite right: error("There was an error in one of the threads during logs loading: {}".format(e.getCause)) This is the slf4j style used in the new clients. The server side still uses log4j. So, it should be: error("There was an error in one of the threads during logs loading", e.getCause)
40.3 Would it be clearer to name dirLogs as logsPerDir and dirJobs as jobsPerDir?
40.4 Remove the unused import ExecutorService.
41. server.properties:
41.1 Typo: shuch.
41.2 Also, would the following description be better? # The number of threads to be used when performing io intensive operations such as # log recovery and log flushing during startup and shutdown.
42. Your patch for Utils.runnable looks good. Could you include it in the next patch? 
Speedup broker startup after hard reset --- Key: KAFKA-1414 URL: https://issues.apache.org/jira/browse/KAFKA-1414 Project: Kafka Issue Type: Improvement Components: log Affects Versions: 0.8.2, 0.9.0, 0.8.1.1 Reporter: Dmitry Bugaychenko Assignee: Jay Kreps Attachments: 0001-KAFKA-1414-Speedup-broker-startup-after-hard-reset-a.patch, KAFKA-1414-rev1.patch, KAFKA-1414-rev2.fixed.patch, KAFKA-1414-rev2.patch, freebie.patch, parallel-dir-loading-0.8.patch, parallel-dir-loading-trunk-fixed-threadpool.patch, parallel-dir-loading-trunk-threadpool.patch, parallel-dir-loading-trunk.patch After a hard reset due to power failure, the broker takes far too much time recovering unflushed segments in a single thread. This could easily be improved by launching multiple threads (one per data directory, assuming that typically each data directory is on a dedicated drive). Locally we tried this simple patch to LogManager.loadLogs and it seems to work; however, I'm too new to Scala, so do not take it literally:
{code}
/**
 * Recover and load all logs in the given data directories
 */
private def loadLogs(dirs: Seq[File]) {
  val threads: Array[Thread] = new Array[Thread](dirs.size)
  var i: Int = 0
  val me = this
  for (dir <- dirs) {
    val thread = new Thread(new Runnable {
      def run() {
        val recoveryPoints = me.recoveryPointCheckpoints(dir).read
        /* load the logs */
        val subDirs = dir.listFiles()
        if (subDirs != null) {
          val cleanShutDownFile = new File(dir, Log.CleanShutdownFile)
          if (cleanShutDownFile.exists())
            info("Found clean shutdown file. Skipping recovery for all logs in data directory '%s'".format(dir.getAbsolutePath))
          for (dir <- subDirs) {
            if (dir.isDirectory) {
              info("Loading log '" + dir.getName + "'")
              val topicPartition = Log.parseTopicPartitionName(dir.getName)
              val config = topicConfigs.getOrElse(topicPartition.topic, defaultConfig)
              val log = new Log(dir, config, recoveryPoints.getOrElse(topicPartition, 0L), scheduler, time)
              val previous = addLogWithLock(topicPartition, log)
              if (previous != null)
                throw new IllegalArgumentException("Duplicate log directories found: %s, %s!".format(log.dir.getAbsolutePath, previous.dir.getAbsolutePath))
            }
          }
          cleanShutDownFile.delete()
        }
      }
    })
    thread.start()
    threads(i) = thread
    i = i + 1
  }
  for (thread <- threads) {
    thread.join()
  }
}

def addLogWithLock(topicPartition: TopicAndPartition, log: Log): Log = {
  logCreationOrDeletionLock synchronized {
    this.logs.put(topicPartition, log)
  }
}
{code}
-- This message was sent by Atlassian JIRA (v6.2#6252)
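For reference, the thread-pool variant discussed in the review comments (a bounded pool of I/O threads rather than one raw Thread per directory) might take roughly this shape. This is an illustrative Java sketch with invented names, not the actual LogManager patch:

```java
import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ParallelLoadSketch {
    // Illustrative stand-in for recovering all partition logs in one data directory.
    static void loadLogsInDir(File dir) {
        System.out.println("loaded " + dir.getPath());
    }

    // One task per data directory, on the assumption that each directory sits
    // on its own drive so the recovery work is I/O-parallel.
    public static void loadLogs(List<File> dirs, int ioThreads) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(ioThreads);
        try {
            List<Future<?>> jobs = new ArrayList<>();
            for (File dir : dirs)
                jobs.add(pool.submit(() -> loadLogsInDir(dir)));
            for (Future<?> job : jobs)
                job.get(); // wait for all directories; rethrows the first failure
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        loadLogs(List.of(new File("data1"), new File("data2")), 2);
    }
}
```

Collecting the Futures and calling get() on each is what surfaces a per-directory failure during startup, which is the behavior 40.2 above is adjusting the logging for.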
[jira] [Commented] (KAFKA-1282) Disconnect idle socket connection in Selector
[ https://issues.apache.org/jira/browse/KAFKA-1282?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14066481#comment-14066481 ] Nicolae Marasoiu commented on KAFKA-1282: - Beautiful, I can't wait to work this out, so I take this to code right?:) Disconnect idle socket connection in Selector - Key: KAFKA-1282 URL: https://issues.apache.org/jira/browse/KAFKA-1282 Project: Kafka Issue Type: Bug Components: producer Affects Versions: 0.8.2 Reporter: Jun Rao Labels: newbie++ Fix For: 0.9.0 To reduce # socket connections, it would be useful for the new producer to close socket connections that are idle. We can introduce a new producer config for the idle time. -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Commented] (KAFKA-1476) Get a list of consumer groups
[ https://issues.apache.org/jira/browse/KAFKA-1476?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14066488#comment-14066488 ] Todd Palino commented on KAFKA-1476: I tend to agree, [~jkreps]. We currently have kafka-topics.sh that groups a number of topic-centric functions in a single command, and I think that's a good model to continue with. On the consumer side, I can think of the following things we should be able to do:
* List groups
* Describe group (should include the hosts in the group)
* List topics by group (for a given group, what topics does it consume)
* Describe topic by group (for a given group and topic, list partitions, the host owning each partition, and the committed offset)
* Set offsets (for a given group and topic, explicitly set the offsets; should allow setting to smallest, largest, and custom, which is explicitly setting the value for each partition)
* List groups by topic (given a topic name, what groups consume it)
All of these functions have to work with both Zookeeper and group management within the brokers (once implemented). I want to know which one the data comes from as part of the results, but I don't want to have to specify it in advance. As far as the offset checker goes, I could go either way on that. Ultimately, it combines information from two different areas (consumer and broker), which means it doesn't fit cleanly in either one even if we all agree that it's really a consumer function. I think it's fine if it stays where it is for now. 
JIRA problem
I can't seem to assign JIRAs to anyone but a few committer people. Does anyone know what causes this or how to fix it? Is this happening to anyone else? -Jay
Re: JIRA problem
You have to add those people as contributors in the JIRA project administration. Click on “roles” in the KAFKA project administration and find the field “contributors”. Jarcec On Jul 18, 2014, at 9:32 AM, Jay Kreps jay.kr...@gmail.com wrote: I can't seem to assign JIRAs to anyone but a few committer people. Does anyone know what causes this or how to fix it? Is this happening to anyone else? -Jay
[jira] [Assigned] (KAFKA-1535) return all live brokers in TopicMetadataResponse
[ https://issues.apache.org/jira/browse/KAFKA-1535?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jun Rao reassigned KAFKA-1535: -- Assignee: nicu marasoiu (was: Jay Kreps) Nicu, Yes, your observation is correct. The way that we update liveBrokers in MetadataCache.updateCache() doesn't seem right. We only add newly received live brokers to the list. However, there could be existing brokers in that list that are now dead. Those dead brokers shouldn't be returned to the clients. We should probably just take the new live broker list and cache it. Do you want to do a followup patch in this jira? return all live brokers in TopicMetadataResponse Key: KAFKA-1535 URL: https://issues.apache.org/jira/browse/KAFKA-1535 Project: Kafka Issue Type: Improvement Components: core Affects Versions: 0.8.2 Reporter: Jun Rao Assignee: nicu marasoiu Labels: newbie Attachments: KAFKA-1535__return_all_live_brokers_in_TopicMetadataResponse_.patch Currently, we only return the brokers that have assigned replicas for a topic in TopicMetadataResponse. The new producer will use those brokers for refreshing metadata. Now suppose that we stop all those brokers, copy all local data to some new hosts and then restart those hosts (with the original broker id). There is no way for the new producer to automatically get the information about the new brokers since all old brokers are gone. -- This message was sent by Atlassian JIRA (v6.2#6252)
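The fix Jun suggests -- taking the controller's new live-broker list and caching it wholesale rather than merging into the existing list -- can be sketched as follows. The types are simplified stand-ins, not the real MetadataCache:

```java
import java.util.Map;
import java.util.Set;

public class BrokerCacheSketch {
    // Simplified stand-in for MetadataCache's live-broker map (id -> host:port).
    private volatile Map<Integer, String> liveBrokers = Map.of();

    // Buggy shape described in the comment: merging only ADDS newly live
    // brokers, so brokers that died since the last update are never dropped:
    //   this.liveBrokers.putAll(newAliveBrokers);

    // Fixed shape: treat the controller's list as the source of truth and
    // replace the cache wholesale, so dead brokers fall out.
    void updateCache(Map<Integer, String> aliveBrokersFromController) {
        liveBrokers = Map.copyOf(aliveBrokersFromController);
    }

    Set<Integer> liveBrokerIds() { return liveBrokers.keySet(); }
}
```

With the replacement semantics, a second update carrying only the currently alive brokers automatically evicts any broker that was alive in the previous update but is now dead.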
Re: JIRA problem
I would assume that you will have to go to this page: https://issues.apache.org/jira/plugins/servlet/project-config/KAFKA/roles I don’t have access there, I’ve just replaced project name SQOOP with KAFKA :-) Jarcec On Jul 18, 2014, at 9:35 AM, Jarek Jarcec Cecho jar...@apache.org wrote: You have to add those people as a contributors in JIRA project administration. Click on “roles” in the KAFKA project administration and find field “contributors”. Jarcec On Jul 18, 2014, at 9:32 AM, Jay Kreps jay.kr...@gmail.com wrote: I can't seem to assign JIRAs to anyone but a few committer people. Does anyone know what causes this or how to fix it? Is this happening to anyone else? -Jay
Re: JIRA problem
Shouldn't we make sure that the people in the contributor group have signed and sent in their ICLA http://www.apache.org/licenses/icla.txt much like we do for confluence? This helps to control also that all contributors have signed an ICLA too. /*** Joe Stein Founder, Principal Consultant Big Data Open Source Security LLC http://www.stealth.ly Twitter: @allthingshadoop http://www.twitter.com/allthingshadoop / On Fri, Jul 18, 2014 at 12:35 PM, Jarek Jarcec Cecho jar...@apache.org wrote: You have to add those people as a contributors in JIRA project administration. Click on “roles” in the KAFKA project administration and find field “contributors”. Jarcec On Jul 18, 2014, at 9:32 AM, Jay Kreps jay.kr...@gmail.com wrote: I can't seem to assign JIRAs to anyone but a few committer people. Does anyone know what causes this or how to fix it? Is this happening to anyone else? -Jay
Re: JIRA problem
To my best knowledge ICLA is required only for committers - people that do have commit bit on ASF repositories. Contributors can’t commit their patch themselves and hence they have to attach their patch to JIRA where they implicitly agree with all the legal stuff. There is some license agreement about that, but I can’t find it right now. Jarcec On Jul 18, 2014, at 9:45 AM, Joe Stein joe.st...@stealth.ly wrote: Shouldn't we make sure that the people in the contributor group have signed and sent in their ICLA http://www.apache.org/licenses/icla.txt much like we do for confluence? This helps to control also that all contributors have signed an ICLA too. /*** Joe Stein Founder, Principal Consultant Big Data Open Source Security LLC http://www.stealth.ly Twitter: @allthingshadoop http://www.twitter.com/allthingshadoop / On Fri, Jul 18, 2014 at 12:35 PM, Jarek Jarcec Cecho jar...@apache.org wrote: You have to add those people as a contributors in JIRA project administration. Click on “roles” in the KAFKA project administration and find field “contributors”. Jarcec On Jul 18, 2014, at 9:32 AM, Jay Kreps jay.kr...@gmail.com wrote: I can't seem to assign JIRAs to anyone but a few committer people. Does anyone know what causes this or how to fix it? Is this happening to anyone else? -Jay
Re: JIRA problem
I am not sure if an ICLA is required for contributing patches. Requiring that may make it harder for people who want to contribute. Currently, only Kafka PMC members can change the contributors list. Thanks, Jun On Fri, Jul 18, 2014 at 9:45 AM, Joe Stein joe.st...@stealth.ly wrote: Shouldn't we make sure that the people in the contributor group have signed and sent in their ICLA http://www.apache.org/licenses/icla.txt much like we do for confluence? This helps to control also that all contributors have signed an ICLA too. /*** Joe Stein Founder, Principal Consultant Big Data Open Source Security LLC http://www.stealth.ly Twitter: @allthingshadoop http://www.twitter.com/allthingshadoop / On Fri, Jul 18, 2014 at 12:35 PM, Jarek Jarcec Cecho jar...@apache.org wrote: You have to add those people as a contributors in JIRA project administration. Click on “roles” in the KAFKA project administration and find field “contributors”. Jarcec On Jul 18, 2014, at 9:32 AM, Jay Kreps jay.kr...@gmail.com wrote: I can't seem to assign JIRAs to anyone but a few committer people. Does anyone know what causes this or how to fix it? Is this happening to anyone else? -Jay
[jira] [Commented] (KAFKA-1510) Force offset commits at a minimum interval when migrating consumer offsets from zookeeper to kafka
[ https://issues.apache.org/jira/browse/KAFKA-1510?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14066532#comment-14066532 ] Guozhang Wang commented on KAFKA-1510: -- Hi Nicolae, Thanks for taking this ticket. You can take a look at the offset management design proposal for the motivations of moving it away from ZK. https://cwiki.apache.org/confluence/display/KAFKA/Inbuilt+Consumer+Offset+Management Force offset commits at a minimum interval when migrating consumer offsets from zookeeper to kafka -- Key: KAFKA-1510 URL: https://issues.apache.org/jira/browse/KAFKA-1510 Project: Kafka Issue Type: Bug Affects Versions: 0.8.2 Reporter: Joel Koshy Labels: newbie Fix For: 0.8.2 When migrating consumer offsets from ZooKeeper to kafka, we have to turn on dual-commit (i.e., the consumers will commit offsets to both zookeeper and kafka) in addition to setting offsets.storage to kafka. However, when we commit offsets we only commit offsets if they have changed (since the last commit). For low-volume topics or for topics that receive data in bursts offsets may not move for a long period of time. Therefore we may want to force the commit (even if offsets have not changed) when migrating (i.e., when dual-commit is enabled) - we can add a minimum interval threshold (say force commit after every 10 auto-commits) as well as on rebalance and shutdown. Also, I think it is safe to switch the default for offsets.storage from zookeeper to kafka and set the default to dual-commit (for people who have not migrated yet). We have deployed this to the largest consumers at linkedin and have not seen any issues so far (except for the migration caveat that this jira will resolve). -- This message was sent by Atlassian JIRA (v6.2#6252)
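The forced-commit idea in the description can be sketched as a small counter: skip unchanged offsets as usual, but commit anyway every N auto-commits while dual-commit migration is enabled. `CommitPolicy` and its method are hypothetical names for illustration, not the actual ZookeeperConsumerConnector code:

```scala
// Sketch: force a commit every `forceEveryN` auto-commit ticks during
// migration, even if no offsets moved (low-volume/bursty topics).
class CommitPolicy(forceEveryN: Int) {
  private var skippedCommits = 0

  /** Decide whether this auto-commit tick should actually commit. */
  def shouldCommit(offsetsChanged: Boolean, dualCommitEnabled: Boolean): Boolean = {
    if (offsetsChanged) { skippedCommits = 0; true }
    else if (dualCommitEnabled && skippedCommits + 1 >= forceEveryN) {
      skippedCommits = 0; true            // force the commit while migrating
    } else { skippedCommits += 1; false } // safe to skip when not migrating
  }
}
```

The ticket also suggests forcing a commit on rebalance and shutdown, which would simply bypass this check.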
[jira] [Commented] (KAFKA-1535) return all live brokers in TopicMetadataResponse
[ https://issues.apache.org/jira/browse/KAFKA-1535?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14066538#comment-14066538 ] nicu marasoiu commented on KAFKA-1535: -- Sure do want:) Yes, that was my suggestion as well, cache the current list, replacing it. Not sure which structure will remain the best in this scenario: currently mutable Map but perhaps directly immutable list or map being replaced is more functional/pure and performance friendly as well, will check it up:) return all live brokers in TopicMetadataResponse Key: KAFKA-1535 URL: https://issues.apache.org/jira/browse/KAFKA-1535 Project: Kafka Issue Type: Improvement Components: core Affects Versions: 0.8.2 Reporter: Jun Rao Assignee: nicu marasoiu Labels: newbie Attachments: KAFKA-1535__return_all_live_brokers_in_TopicMetadataResponse_.patch Currently, we only return the brokers that have assigned replicas for a topic in TopicMetadataResponse. The new producer will use those brokers for refreshing metadata. Now suppose that we stop all those brokers, copy all local data to some new hosts and then restart those hosts (with the original broker id). There is no way for the new producer to automatically get the information about the new brokers since all old brokers are gone. -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Assigned] (KAFKA-1282) Disconnect idle socket connection in Selector
[ https://issues.apache.org/jira/browse/KAFKA-1282?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] nicu marasoiu reassigned KAFKA-1282: Assignee: nicu marasoiu Disconnect idle socket connection in Selector - Key: KAFKA-1282 URL: https://issues.apache.org/jira/browse/KAFKA-1282 Project: Kafka Issue Type: Bug Components: producer Affects Versions: 0.8.2 Reporter: Jun Rao Assignee: nicu marasoiu Labels: newbie++ Fix For: 0.9.0 To reduce # socket connections, it would be useful for the new producer to close socket connections that are idle. We can introduce a new producer config for the idle time. -- This message was sent by Atlassian JIRA (v6.2#6252)
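The idle-disconnect idea could look roughly like the sketch below: track when each connection was last used in LRU order and close anything idle past a configurable timeout (a producer config along the lines of `connections.max.idle.ms`). The reaper class and its API here are hypothetical, not the actual Selector code:

```scala
import scala.collection.mutable

// Sketch: LinkedHashMap keeps insertion order; re-inserting on every use
// makes it LRU order, so idle connections cluster at the front.
class IdleConnectionReaper(maxIdleMs: Long, now: () => Long, close: String => Unit) {
  private val lastUsed = mutable.LinkedHashMap.empty[String, Long]

  /** Record activity on a connection (called on every send/receive). */
  def touch(connectionId: String): Unit = {
    lastUsed.remove(connectionId) // re-insert to move it to the back
    lastUsed += connectionId -> now()
  }

  /** Called from the poll loop: close everything idle past the deadline. */
  def maybeCloseIdle(): Seq[String] = {
    val deadline = now() - maxIdleMs
    val expired = lastUsed.takeWhile { case (_, t) => t <= deadline }.keys.toSeq
    expired.foreach { id => lastUsed.remove(id); close(id) }
    expired
  }
}
```

Because entries are in LRU order, the scan can stop at the first non-expired connection instead of walking every open connection on each poll.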
[jira] [Commented] (KAFKA-1549) dead brokers coming in the TopicMetadataResponse
[ https://issues.apache.org/jira/browse/KAFKA-1549?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14066548#comment-14066548 ] nicu marasoiu commented on KAFKA-1549: -- test if i receive mails on this jira task dead brokers coming in the TopicMetadataResponse Key: KAFKA-1549 URL: https://issues.apache.org/jira/browse/KAFKA-1549 Project: Kafka Issue Type: Bug Environment: trunk Reporter: nicu marasoiu Assignee: nicu marasoiu The way that we update liveBrokers in MetadataCache.updateCache() doesn't seem right. We only add newly received live brokers to the list. However, there could be existing brokers in that list that are now dead. Those dead brokers shouldn't be returned to the clients. We should probably just take the new live broker list and cache it. -- This message was sent by Atlassian JIRA (v6.2#6252)
Re: [DISCUSS] Kafka Security Specific Features
Thanks Joe, I don't see any Out of memory error. Now I get an exception when the producer fetches metadata for a topic. Here is how I created the topic and ran the producer:

pdeshmukh$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic secureTopic
Created topic secureTopic.

pdeshmukh$ bin/kafka-topics.sh --list --zookeeper localhost:2181
secure.test
secureTopic

Ran the producer, tried both localhost:9092:true and localhost:9092:

pdeshmukh$ bin/kafka-console-producer.sh --broker-list localhost:9092:true --topic secureTopic
[2014-07-18 13:12:29,817] WARN Property topic is not valid (kafka.utils.VerifiableProperties)
Hare Krishna
[2014-07-18 13:12:45,256] WARN Fetching topic metadata with correlation id 0 for topics [Set(secureTopic)] from broker [id:0,host:localhost,port:9092,secure:true] failed (kafka.client.ClientUtils$)
java.io.EOFException: Received -1 when reading from channel, socket has likely been closed.
        at kafka.utils.Utils$.read(Utils.scala:381)
        at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:67)
        at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
        at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
        at kafka.network.BlockingChannel.receive(BlockingChannel.scala:102)
        at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:79)
        at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:76)
        at kafka.producer.SyncProducer.send(SyncProducer.scala:117)
        at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
        at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
        at kafka.producer.async.DefaultEventHandler$$anonfun$handle$1.apply$mcV$sp(DefaultEventHandler.scala:67)
        at kafka.utils.Utils$.swallow(Utils.scala:172)
        at kafka.utils.Logging$class.swallowError(Logging.scala:106)
        at kafka.utils.Utils$.swallowError(Utils.scala:45)
        at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:67)
        at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104)
        at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:87)
        at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:67)
        at scala.collection.immutable.Stream.foreach(Stream.scala:526)
        at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:66)
        at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:44)
[2014-07-18 13:12:45,258] ERROR fetching topic metadata for topics [Set(secureTopic)] from broker [ArrayBuffer(id:0,host:localhost,port:9092,secure:true)] failed (kafka.utils.Utils$)
kafka.common.KafkaException: fetching topic metadata for topics [Set(secureTopic)] from broker [ArrayBuffer(id:0,host:localhost,port:9092,secure:true)] failed
        at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:72)
        at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
        at kafka.producer.async.DefaultEventHandler$$anonfun$handle$1.apply$mcV$sp(DefaultEventHandler.scala:67)
        at kafka.utils.Utils$.swallow(Utils.scala:172)
        at kafka.utils.Logging$class.swallowError(Logging.scala:106)
        at kafka.utils.Utils$.swallowError(Utils.scala:45)
        at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:67)
        at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104)
        at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:87)
        at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:67)
        at scala.collection.immutable.Stream.foreach(Stream.scala:526)
        at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:66)
        at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:44)
Caused by: java.io.EOFException: Received -1 when reading from channel, socket has likely been closed.
        at kafka.utils.Utils$.read(Utils.scala:381)
        at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:67)
        at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
        at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
        at kafka.network.BlockingChannel.receive(BlockingChannel.scala:102)
        at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:79)
        at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:76)
        at kafka.producer.SyncProducer.send(SyncProducer.scala:117)
        at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
        ... 12 more
[2014-07-18 13:12:45,337] WARN Fetching topic metadata with correlation id 1 for topics [Set(secureTopic)] from broker [id:0,host:localhost,port:9092,secure:true] failed (kafka.client.ClientUtils$)
[2014-07-18 13:12:46,282] ERROR Failed to send requests for topics secureTopic
[jira] [Commented] (KAFKA-1509) Restart of destination broker after unreplicated partition move leaves partitions without leader
[ https://issues.apache.org/jira/browse/KAFKA-1509?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14066567#comment-14066567 ] Guozhang Wang commented on KAFKA-1509: -- Yes, this is still a valid issue, but it could also be a tricky one. I looked through the controller code: basically, when a new broker starts up, the controller needs to use the offline-partition elector to elect new leaders for the offline partitions hosted on that broker, so they become online partitions again. But this process is somehow not executed; instead, the periodic preferred-leader elector ran later and failed, since the new broker was not yet in the ISR. This could be correlated with some bugs in the delete-topic logic, but more investigation is needed to find the right fix for this issue. Restart of destination broker after unreplicated partition move leaves partitions without leader Key: KAFKA-1509 URL: https://issues.apache.org/jira/browse/KAFKA-1509 Project: Kafka Issue Type: Bug Affects Versions: 0.8.1.1 Reporter: Albert Strasheim Labels: newbie++ Attachments: controller2.log This should be reasonably easy to reproduce. Make a Kafka cluster with a few machines. Create a topic with partitions on these machines. No replication. Bring up one more Kafka node. Move some or all of the partitions onto this new broker: kafka-reassign-partitions.sh --generate --zookeeper zk:2181 --topics-to-move-json-file move.json --broker-list new broker kafka-reassign-partitions.sh --zookeeper 36cfqd1.in.cfops.it:2181 --reassignment-json-file reassign.json --execute Wait until the broker is the leader for all the partitions you moved. Send some data to the partitions. It all works. Shut down the broker that just received the data. Start it back up.
{code}
Topic:test  PartitionCount:2  ReplicationFactor:1  Configs:
    Topic: test  Partition: 0  Leader: -1  Replicas: 7  Isr:
    Topic: test  Partition: 1  Leader: -1  Replicas: 7  Isr:
{code}
Leader for topic test never gets elected even though this node is the only node that knows about the topic. Some logs:
{code}
Jun 26 23:18:07 localhost kafka: INFO [Socket Server on Broker 7], Started (kafka.network.SocketServer)
Jun 26 23:18:07 localhost kafka: INFO [Socket Server on Broker 7], Started (kafka.network.SocketServer)
Jun 26 23:18:07 localhost kafka: INFO [ControllerEpochListener on 7]: Initialized controller epoch to 53 and zk version 52 (kafka.controller.ControllerEpochListener)
Jun 26 23:18:07 localhost kafka: INFO Will not load MX4J, mx4j-tools.jar is not in the classpath (kafka.utils.Mx4jLoader$)
Jun 26 23:18:07 localhost kafka: INFO Will not load MX4J, mx4j-tools.jar is not in the classpath (kafka.utils.Mx4jLoader$)
Jun 26 23:18:07 localhost kafka: INFO [Controller 7]: Controller starting up (kafka.controller.KafkaController)
Jun 26 23:18:07 localhost kafka: INFO conflict in /controller data: {version:1,brokerid:7,timestamp:1403824687354} stored data: {version:1,brokerid:4,timestamp:1403297911725} (kafka.utils.ZkUtils$)
Jun 26 23:18:07 localhost kafka: INFO conflict in /controller data: {version:1,brokerid:7,timestamp:1403824687354} stored data: {version:1,brokerid:4,timestamp:1403297911725} (kafka.utils.ZkUtils$)
Jun 26 23:18:07 localhost kafka: INFO [Controller 7]: Controller startup complete (kafka.controller.KafkaController)
Jun 26 23:18:07 localhost kafka: INFO Registered broker 7 at path /brokers/ids/7 with address xxx:9092. (kafka.utils.ZkUtils$)
Jun 26 23:18:07 localhost kafka: INFO Registered broker 7 at path /brokers/ids/7 with address xxx:9092. (kafka.utils.ZkUtils$)
Jun 26 23:18:07 localhost kafka: INFO [Kafka Server 7], started (kafka.server.KafkaServer)
Jun 26 23:18:07 localhost kafka: INFO [Kafka Server 7], started (kafka.server.KafkaServer)
Jun 26 23:18:07 localhost kafka: TRACE Broker 7 cached leader info (LeaderAndIsrInfo:(Leader:3,ISR:3,LeaderEpoch:14,ControllerEpoch:53),ReplicationFactor:1),AllReplicas:3) for partition [requests,0] in response to UpdateMetadata request sent by controller 4 epoch 53 with correlation id 70 (state.change.logger)
Jun 26 23:18:07 localhost kafka: TRACE Broker 7 cached leader info (LeaderAndIsrInfo:(Leader:1,ISR:1,LeaderEpoch:11,ControllerEpoch:53),ReplicationFactor:1),AllReplicas:1) for partition [requests,13] in response to UpdateMetadata request sent by controller 4 epoch 53 with correlation id 70 (state.change.logger)
Jun 26 23:18:07 localhost kafka: TRACE Broker 7 cached leader info (LeaderAndIsrInfo:(Leader:1,ISR:1,LeaderEpoch:4,ControllerEpoch:53),ReplicationFactor:2),AllReplicas:1,5) for partition
Re: JIRA problem
I think having people fax in permission slips to edit the wiki or get bugs assigned to them is a bit hostile to potential contributors. Is this a legal Apache restriction that we have to abide by or just the way our permissions defaulted? Can we change this? Would people be opposed to changing it? -Jay On Fri, Jul 18, 2014 at 9:54 AM, Jun Rao jun...@gmail.com wrote: I am not sure if an ICLA is required for contributing patches. Requiring that may make it harder for people who want to contribute. Currently, only Kafka PMC members can change the contributors list. Thanks, Jun On Fri, Jul 18, 2014 at 9:45 AM, Joe Stein joe.st...@stealth.ly wrote: Shouldn't we make sure that the people in the contributor group have signed and sent in their ICLA http://www.apache.org/licenses/icla.txt much like we do for confluence? This helps to control also that all contributors have signed an ICLA too. /*** Joe Stein Founder, Principal Consultant Big Data Open Source Security LLC http://www.stealth.ly Twitter: @allthingshadoop http://www.twitter.com/allthingshadoop / On Fri, Jul 18, 2014 at 12:35 PM, Jarek Jarcec Cecho jar...@apache.org wrote: You have to add those people as a contributors in JIRA project administration. Click on “roles” in the KAFKA project administration and find field “contributors”. Jarcec On Jul 18, 2014, at 9:32 AM, Jay Kreps jay.kr...@gmail.com wrote: I can't seem to assign JIRAs to anyone but a few committer people. Does anyone know what causes this or how to fix it? Is this happening to anyone else? -Jay
Re: JIRA problem
I do see separate concerns here, so let me describe how we’re addressing them in other projects where I’m involved. It doesn’t mean that Kafka needs to follow them; I'm just sharing my experience.
1) Submitting a patch to JIRA: No paperwork required; all legalities are resolved implicitly by the fact that the contributor submitted the patch to JIRA. There is a small wrinkle in that a new contributor can’t assign the item to himself - this is the way JIRA is configured, and you need to talk with Infra about changing that. I’ve tried to change that at some point in the past for my projects and failed. Feel free to ask again though :-)
2) Editing the wiki: We’re giving edit privileges to anyone who asks for it. So far it has worked very well for us.
3) Promoting a contributor to committer: Does require an ICLA to be signed and faxed to the ASF.
Jarcec On Jul 18, 2014, at 10:25 AM, Jay Kreps jay.kr...@gmail.com wrote: I think having people fax in permission slips to edit the wiki or get bugs assigned to them is a bit hostile to potential contributors. Is this a legal Apache restriction that we have to abide by or just the way our permissions defaulted? Can we change this? Would people be opposed to changing it? -Jay On Fri, Jul 18, 2014 at 9:54 AM, Jun Rao jun...@gmail.com wrote: I am not sure if an ICLA is required for contributing patches. Requiring that may make it harder for people who want to contribute. Currently, only Kafka PMC members can change the contributors list. Thanks, Jun On Fri, Jul 18, 2014 at 9:45 AM, Joe Stein joe.st...@stealth.ly wrote: Shouldn't we make sure that the people in the contributor group have signed and sent in their ICLA http://www.apache.org/licenses/icla.txt much like we do for confluence? This helps to control also that all contributors have signed an ICLA too. 
/*** Joe Stein Founder, Principal Consultant Big Data Open Source Security LLC http://www.stealth.ly Twitter: @allthingshadoop http://www.twitter.com/allthingshadoop / On Fri, Jul 18, 2014 at 12:35 PM, Jarek Jarcec Cecho jar...@apache.org wrote: You have to add those people as a contributors in JIRA project administration. Click on “roles” in the KAFKA project administration and find field “contributors”. Jarcec On Jul 18, 2014, at 9:32 AM, Jay Kreps jay.kr...@gmail.com wrote: I can't seem to assign JIRAs to anyone but a few committer people. Does anyone know what causes this or how to fix it? Is this happening to anyone else? -Jay
Re: JIRA problem
Jay, I agree with you. This goes back to the spam fiasco that occurred on confluence and the pendulum swinging because of it. Re-reading that email there doesn't seem anything specific requiring us to have them sign the ICLA but it looks like that is a requirement if we wanted INFRA to assign the person in the asf-cla group... so we could just have our own contributor group and manage that in confluence however we wanted? /*** Joe Stein Founder, Principal Consultant Big Data Open Source Security LLC http://www.stealth.ly Twitter: @allthingshadoop http://www.twitter.com/allthingshadoop / On Fri, Jul 18, 2014 at 1:25 PM, Jay Kreps jay.kr...@gmail.com wrote: I think having people fax in permission slips to edit the wiki or get bugs assigned to them is a bit hostile to potential contributors. Is this a legal Apache restriction that we have to abide by or just the way our permissions defaulted? Can we change this? Would people be opposed to changing it? -Jay On Fri, Jul 18, 2014 at 9:54 AM, Jun Rao jun...@gmail.com wrote: I am not sure if an ICLA is required for contributing patches. Requiring that may make it harder for people who want to contribute. Currently, only Kafka PMC members can change the contributors list. Thanks, Jun On Fri, Jul 18, 2014 at 9:45 AM, Joe Stein joe.st...@stealth.ly wrote: Shouldn't we make sure that the people in the contributor group have signed and sent in their ICLA http://www.apache.org/licenses/icla.txt much like we do for confluence? This helps to control also that all contributors have signed an ICLA too. /*** Joe Stein Founder, Principal Consultant Big Data Open Source Security LLC http://www.stealth.ly Twitter: @allthingshadoop http://www.twitter.com/allthingshadoop / On Fri, Jul 18, 2014 at 12:35 PM, Jarek Jarcec Cecho jar...@apache.org wrote: You have to add those people as a contributors in JIRA project administration. Click on “roles” in the KAFKA project administration and find field “contributors”. 
Jarcec On Jul 18, 2014, at 9:32 AM, Jay Kreps jay.kr...@gmail.com wrote: I can't seem to assign JIRAs to anyone but a few committer people. Does anyone know what causes this or how to fix it? Is this happening to anyone else? -Jay
[jira] [Commented] (KAFKA-1476) Get a list of consumer groups
[ https://issues.apache.org/jira/browse/KAFKA-1476?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14066583#comment-14066583 ] Jay Kreps commented on KAFKA-1476: -- Cool, so [~balaji.seshadri] would you be willing to revamp this a bit? Two approaches: 1. Just change the class name to ConsumerCommand and shell script to kafka-consumers.sh even though it only implements the list groups functionality at the moment. We can file a follow-up bug to implement some of the additional features which anyone can pick up. 2. Implement some of the proposed features now on this ticket. Most of them should be pretty straight-forward... Either way is fine. Get a list of consumer groups - Key: KAFKA-1476 URL: https://issues.apache.org/jira/browse/KAFKA-1476 Project: Kafka Issue Type: Wish Components: tools Affects Versions: 0.8.1.1 Reporter: Ryan Williams Labels: newbie Fix For: 0.9.0 Attachments: KAFKA-1476.patch It would be useful to have a way to get a list of consumer groups currently active via some tool/script that ships with kafka. This would be helpful so that the system tools can be explored more easily. For example, when running the ConsumerOffsetChecker, it requires a group option bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --topic test --group ? But, when just getting started with kafka, using the console producer and consumer, it is not clear what value to use for the group option. If a list of consumer groups could be listed, then it would be clear what value to use. Background: http://mail-archives.apache.org/mod_mbox/kafka-users/201405.mbox/%3cCAOq_b1w=slze5jrnakxvak0gu9ctdkpazak1g4dygvqzbsg...@mail.gmail.com%3e -- This message was sent by Atlassian JIRA (v6.2#6252)
Re: JIRA problem
Joe--that sounds great, let's do that! So I guess we should put something in the wiki about how to get added to that group? Jarek--thanks for the summary, that is very helpful. I will follow-up with the infra people on letting people assign JIRAs. -jay On Fri, Jul 18, 2014 at 10:36 AM, Joe Stein joe.st...@stealth.ly wrote: Jay, I agree with you. This goes back to the spam fiasco that occurred on confluence and the pendulum swinging because of it. Re-reading that email there doesn't seem anything specific requiring us to have them sign the ICLA but it looks like that is a requirement if we wanted INFRA to assign the person in the asf-cla group... so we could just have our own contributor group and manage that in confluence however we wanted? /*** Joe Stein Founder, Principal Consultant Big Data Open Source Security LLC http://www.stealth.ly Twitter: @allthingshadoop http://www.twitter.com/allthingshadoop / On Fri, Jul 18, 2014 at 1:25 PM, Jay Kreps jay.kr...@gmail.com wrote: I think having people fax in permission slips to edit the wiki or get bugs assigned to them is a bit hostile to potential contributors. Is this a legal Apache restriction that we have to abide by or just the way our permissions defaulted? Can we change this? Would people be opposed to changing it? -Jay On Fri, Jul 18, 2014 at 9:54 AM, Jun Rao jun...@gmail.com wrote: I am not sure if an ICLA is required for contributing patches. Requiring that may make it harder for people who want to contribute. Currently, only Kafka PMC members can change the contributors list. Thanks, Jun On Fri, Jul 18, 2014 at 9:45 AM, Joe Stein joe.st...@stealth.ly wrote: Shouldn't we make sure that the people in the contributor group have signed and sent in their ICLA http://www.apache.org/licenses/icla.txt much like we do for confluence? This helps to control also that all contributors have signed an ICLA too. 
/*** Joe Stein Founder, Principal Consultant Big Data Open Source Security LLC http://www.stealth.ly Twitter: @allthingshadoop http://www.twitter.com/allthingshadoop / On Fri, Jul 18, 2014 at 12:35 PM, Jarek Jarcec Cecho jar...@apache.org wrote: You have to add those people as a contributors in JIRA project administration. Click on “roles” in the KAFKA project administration and find field “contributors”. Jarcec On Jul 18, 2014, at 9:32 AM, Jay Kreps jay.kr...@gmail.com wrote: I can't seem to assign JIRAs to anyone but a few committer people. Does anyone know what causes this or how to fix it? Is this happening to anyone else? -Jay
[jira] [Commented] (KAFKA-1476) Get a list of consumer groups
[ https://issues.apache.org/jira/browse/KAFKA-1476?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14066613#comment-14066613 ] Guozhang Wang commented on KAFKA-1476: --
1. List groups: return the list of group ids.
2. List groups (topic_name): return the list of group ids at least one member of which is consuming the topic.
3. Describe group (group_id): return the current generation id, members of the group (consumer_ids), subscribed topics or topic/partitions of each consumer, and their current offsets.
4. Set group consumption offset (consumer_id, generation_id, [partition, offset]): set the offsets of the consumed partitions for a given consumer, with the right generation id.
1) would work with ZK only, 2) requires ZK and the coordinator, and 3) and 4) require the coordinator only. Also, 3) would potentially replace the current ConsumerOffsetChecker. Get a list of consumer groups - Key: KAFKA-1476 URL: https://issues.apache.org/jira/browse/KAFKA-1476 Project: Kafka Issue Type: Wish Components: tools Affects Versions: 0.8.1.1 Reporter: Ryan Williams Labels: newbie Fix For: 0.9.0 Attachments: KAFKA-1476.patch It would be useful to have a way to get a list of consumer groups currently active via some tool/script that ships with kafka. This would be helpful so that the system tools can be explored more easily. For example, when running the ConsumerOffsetChecker, it requires a group option bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --topic test --group ? But, when just getting started with kafka, using the console producer and consumer, it is not clear what value to use for the group option. If a list of consumer groups could be listed, then it would be clear what value to use. Background: http://mail-archives.apache.org/mod_mbox/kafka-users/201405.mbox/%3cCAOq_b1w=slze5jrnakxvak0gu9ctdkpazak1g4dygvqzbsg...@mail.gmail.com%3e -- This message was sent by Atlassian JIRA (v6.2#6252)
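Proposal 1) needs only ZooKeeper, since consumers using zookeeper offset storage register their group ids as children of the /consumers path. A minimal sketch with the plain ZooKeeper Java client follows; connection and error handling are simplified, and `ListGroupsSketch` is an illustrative name, not part of the proposed tool:

```scala
import org.apache.zookeeper.ZooKeeper
import scala.collection.JavaConverters._

// Sketch of "list groups" against ZK only: each consumer group has a
// znode /consumers/<group_id>, so the group ids are the path's children.
object ListGroupsSketch {
  def listGroups(zkConnect: String): Seq[String] = {
    val zk = new ZooKeeper(zkConnect, 30000, null) // no watcher for a one-shot read
    try zk.getChildren("/consumers", false).asScala.toSeq
    finally zk.close()
  }
}
```

Proposals 2) through 4) would go through the coordinator instead, since generation ids, member lists, and kafka-stored offsets are not visible in ZK.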
Re: JIRA problem
I once noticed Samza also had a similar problem and Jacob resolved it by talking to the Apache people. We can probably ask him how he gets it done. On Fri, Jul 18, 2014 at 10:36 AM, Joe Stein joe.st...@stealth.ly wrote: Jay, I agree with you. This goes back to the spam fiasco that occurred on confluence and the pendulum swinging because of it. Re-reading that email there doesn't seem anything specific requiring us to have them sign the ICLA but it looks like that is a requirement if we wanted INFRA to assign the person in the asf-cla group... so we could just have our own contributor group and manage that in confluence however we wanted? /*** Joe Stein Founder, Principal Consultant Big Data Open Source Security LLC http://www.stealth.ly Twitter: @allthingshadoop http://www.twitter.com/allthingshadoop / On Fri, Jul 18, 2014 at 1:25 PM, Jay Kreps jay.kr...@gmail.com wrote: I think having people fax in permission slips to edit the wiki or get bugs assigned to them is a bit hostile to potential contributors. Is this a legal Apache restriction that we have to abide by or just the way our permissions defaulted? Can we change this? Would people be opposed to changing it? -Jay On Fri, Jul 18, 2014 at 9:54 AM, Jun Rao jun...@gmail.com wrote: I am not sure if an ICLA is required for contributing patches. Requiring that may make it harder for people who want to contribute. Currently, only Kafka PMC members can change the contributors list. Thanks, Jun On Fri, Jul 18, 2014 at 9:45 AM, Joe Stein joe.st...@stealth.ly wrote: Shouldn't we make sure that the people in the contributor group have signed and sent in their ICLA http://www.apache.org/licenses/icla.txt much like we do for confluence? This helps to control also that all contributors have signed an ICLA too. 
/*** Joe Stein Founder, Principal Consultant Big Data Open Source Security LLC http://www.stealth.ly Twitter: @allthingshadoop http://www.twitter.com/allthingshadoop / On Fri, Jul 18, 2014 at 12:35 PM, Jarek Jarcec Cecho jar...@apache.org wrote: You have to add those people as a contributors in JIRA project administration. Click on “roles” in the KAFKA project administration and find field “contributors”. Jarcec On Jul 18, 2014, at 9:32 AM, Jay Kreps jay.kr...@gmail.com wrote: I can't seem to assign JIRAs to anyone but a few committer people. Does anyone know what causes this or how to fix it? Is this happening to anyone else? -Jay -- -- Guozhang
Re: JIRA problem
I have been assigning people on confluence (they granted us access at that time after some back and forth); we already have permission to do that, no reason to talk to INFRA. Everyone in the PMC should have access. /*** Joe Stein Founder, Principal Consultant Big Data Open Source Security LLC http://www.stealth.ly Twitter: @allthingshadoop http://www.twitter.com/allthingshadoop / On Fri, Jul 18, 2014 at 1:43 PM, Jay Kreps jay.kr...@gmail.com wrote: Joe--that sounds great, let's do that! So I guess we should put something in the wiki about how to get added to that group? Jarek--thanks for the summary, that is very helpful. I will follow up with the infra people on letting people assign JIRAs. -jay
[jira] [Commented] (KAFKA-1476) Get a list of consumer groups
[ https://issues.apache.org/jira/browse/KAFKA-1476?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14066695#comment-14066695 ] Balaji Seshadri commented on KAFKA-1476: I have completed 1 and 2, and will work on 3/4. Can you give me some pointers on how to query the coordinator? Maybe I should also add a printed usage for 1 and 2.
[jira] [Issue Comment Deleted] (KAFKA-1476) Get a list of consumer groups
[ https://issues.apache.org/jira/browse/KAFKA-1476?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Balaji Seshadri updated KAFKA-1476: --- Comment: was deleted (was: ~jay.kreps )
[jira] [Commented] (KAFKA-1476) Get a list of consumer groups
[ https://issues.apache.org/jira/browse/KAFKA-1476?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14066699#comment-14066699 ] Guozhang Wang commented on KAFKA-1476: -- You are already ahead of us, since the coordinator is still under development :) We just wanted to think in advance about what kinds of consumer queries we would like tooling for, now and in the future; they can be easily implemented with the proposed design.
[jira] [Commented] (KAFKA-1476) Get a list of consumer groups
[ https://issues.apache.org/jira/browse/KAFKA-1476?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14066698#comment-14066698 ] Balaji Seshadri commented on KAFKA-1476: ~jay.kreps
[jira] [Commented] (KAFKA-1476) Get a list of consumer groups
[ https://issues.apache.org/jira/browse/KAFKA-1476?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14066720#comment-14066720 ] Balaji Seshadri commented on KAFKA-1476: Which shell script should I change? I wrote only the class, which I can change.
[jira] [Commented] (KAFKA-1476) Get a list of consumer groups
[ https://issues.apache.org/jira/browse/KAFKA-1476?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14066776#comment-14066776 ] BalajiSeshadri commented on KAFKA-1476: --- Please find patch attached after renaming the class.
[jira] [Updated] (KAFKA-1476) Get a list of consumer groups
[ https://issues.apache.org/jira/browse/KAFKA-1476?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] BalajiSeshadri updated KAFKA-1476: -- Status: Patch Available (was: In Progress)
[jira] [Updated] (KAFKA-1476) Get a list of consumer groups
[ https://issues.apache.org/jira/browse/KAFKA-1476?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] BalajiSeshadri updated KAFKA-1476: -- Assignee: BalajiSeshadri Status: Patch Available (was: Open) Attaching updated patch
[jira] [Updated] (KAFKA-1476) Get a list of consumer groups
[ https://issues.apache.org/jira/browse/KAFKA-1476?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] BalajiSeshadri updated KAFKA-1476: -- Attachment: KAFKA-1476-RENAME.patch
[jira] [Updated] (KAFKA-1549) dead brokers coming in the TopicMetadataResponse
[ https://issues.apache.org/jira/browse/KAFKA-1549?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] nicu marasoiu updated KAFKA-1549: - Description: JunRao confirming my observation that brokers are only added to the metadataCache, never removed: The way that we update liveBrokers in MetadataCache.updateCache() doesn't seem right. We only add newly received live brokers to the list. However, there could be existing brokers in that list that are now dead. Those dead brokers shouldn't be returned to the clients. We should probably just take the new live broker list and cache it. (was: The way that we update liveBrokers in MetadataCache.updateCache() doesn't seem right. We only add newly received live brokers to the list. However, there could be existing brokers in that list that are now dead. Those dead brokers shouldn't be returned to the clients. We should probably just take the new live broker list and cache it.) dead brokers coming in the TopicMetadataResponse Key: KAFKA-1549 URL: https://issues.apache.org/jira/browse/KAFKA-1549 Project: Kafka Issue Type: Bug Affects Versions: 0.8.2 Environment: trunk Reporter: nicu marasoiu Assignee: nicu marasoiu -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Updated] (KAFKA-1549) dead brokers coming in the TopicMetadataResponse
[ https://issues.apache.org/jira/browse/KAFKA-1549?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] nicu marasoiu updated KAFKA-1549: - Affects Version/s: 0.8.2
[jira] [Assigned] (KAFKA-1510) Force offset commits at a minimum interval when migrating consumer offsets from zookeeper to kafka
[ https://issues.apache.org/jira/browse/KAFKA-1510?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] nicu marasoiu reassigned KAFKA-1510: Assignee: nicu marasoiu Force offset commits at a minimum interval when migrating consumer offsets from zookeeper to kafka -- Key: KAFKA-1510 URL: https://issues.apache.org/jira/browse/KAFKA-1510 Project: Kafka Issue Type: Bug Affects Versions: 0.8.2 Reporter: Joel Koshy Assignee: nicu marasoiu Labels: newbie Fix For: 0.8.2 When migrating consumer offsets from ZooKeeper to kafka, we have to turn on dual-commit (i.e., the consumers will commit offsets to both zookeeper and kafka) in addition to setting offsets.storage to kafka. However, when we commit offsets we only commit offsets if they have changed (since the last commit). For low-volume topics or for topics that receive data in bursts offsets may not move for a long period of time. Therefore we may want to force the commit (even if offsets have not changed) when migrating (i.e., when dual-commit is enabled) - we can add a minimum interval threshold (say force commit after every 10 auto-commits) as well as on rebalance and shutdown. Also, I think it is safe to switch the default for offsets.storage from zookeeper to kafka and set the default to dual-commit (for people who have not migrated yet). We have deployed this to the largest consumers at linkedin and have not seen any issues so far (except for the migration caveat that this jira will resolve). -- This message was sent by Atlassian JIRA (v6.2#6252)
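The commit rule proposed above (commit when offsets changed, but also force a commit every N auto-commit ticks while dual-commit migration is enabled) can be sketched as a small policy class. This is a minimal illustration under assumed names (CommitPolicy, shouldCommit), not Kafka's actual ZookeeperConsumerConnector code.

```java
// Sketch of the proposed force-commit rule for offset migration.
public class CommitPolicy {
    private final int forceEvery;     // e.g. force after every 10 auto-commits
    private int ticksSinceCommit = 0;

    public CommitPolicy(int forceEvery) {
        this.forceEvery = forceEvery;
    }

    // Called once per auto-commit tick; returns whether to actually commit.
    public boolean shouldCommit(boolean offsetsChanged, boolean dualCommitEnabled) {
        ticksSinceCommit++;
        // While migrating (dual-commit on), commit even unchanged offsets
        // once enough ticks have passed, so kafka-backed offsets catch up
        // for low-volume or bursty topics.
        boolean force = dualCommitEnabled && ticksSinceCommit >= forceEvery;
        if (offsetsChanged || force) {
            ticksSinceCommit = 0;
            return true;
        }
        return false;
    }
}
```

With forceEvery = 10, nine ticks with unchanged offsets skip the commit and the tenth commits anyway; a forced commit on rebalance and shutdown would be handled at those call sites.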
[jira] [Commented] (KAFKA-1549) dead brokers coming in the TopicMetadataResponse
[ https://issues.apache.org/jira/browse/KAFKA-1549?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14066784#comment-14066784 ] nicu marasoiu commented on KAFKA-1549: -- Essentially, I will replace the list/map of brokers on every update. This will likely allow the structure to be immutable.
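The merge-versus-replace distinction behind this fix can be shown with plain maps (this is a sketch, not the real MetadataCache code): merging only ever adds brokers and so leaks dead ones to clients, while replacing the map wholesale keeps exactly the brokers from the latest update, and the replacement can be an immutable snapshot.

```java
import java.util.*;

public class BrokerCache {
    // Broker id -> host, two caches updated with the two strategies.
    private Map<Integer, String> merged = new HashMap<>(); // current (buggy) behavior
    private Map<Integer, String> replaced = Map.of();      // proposed behavior

    // Buggy: brokers absent from the new live list are never removed.
    void updateMerged(Map<Integer, String> liveBrokers) {
        merged.putAll(liveBrokers);
    }

    // Fixed: just take the new live broker list and cache it (immutably).
    void updateReplaced(Map<Integer, String> liveBrokers) {
        replaced = Map.copyOf(liveBrokers);
    }

    Set<Integer> mergedIds() { return merged.keySet(); }
    Set<Integer> replacedIds() { return replaced.keySet(); }
}
```

After updates with live brokers {1, 2, 3} and then {1, 2}, the merged cache still reports broker 3 (now dead) while the replaced cache does not.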
[jira] [Commented] (KAFKA-1535) return all live brokers in TopicMetadataResponse
[ https://issues.apache.org/jira/browse/KAFKA-1535?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14066791#comment-14066791 ] nicu marasoiu commented on KAFKA-1535: -- created KAFKA-1549 with Major priority. I also have KAFKA-1282 (which I like most and I would start first), KAFKA-1510, both Major priority. What should be the order of doing them (I think they are all major right now) ? I would tackle 1282, 1549 then 1510. return all live brokers in TopicMetadataResponse Key: KAFKA-1535 URL: https://issues.apache.org/jira/browse/KAFKA-1535 Project: Kafka Issue Type: Improvement Components: core Affects Versions: 0.8.2 Reporter: Jun Rao Assignee: nicu marasoiu Labels: newbie Attachments: KAFKA-1535__return_all_live_brokers_in_TopicMetadataResponse_.patch Currently, we only return the brokers that have assigned replicas for a topic in TopicMetadataResponse. The new producer will use those brokers for refreshing metadata. Now suppose that we stop all those brokers, copy all local data to some new hosts and then restart those hosts (with the original broker id). There is no way for the new producer to automatically get the information about the new brokers since all old brokers are gone. -- This message was sent by Atlassian JIRA (v6.2#6252)
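The KAFKA-1535 change described in this issue body can be illustrated with plain collections (a sketch under assumed names, not the real TopicMetadataResponse code): returning every live broker instead of only the replica holders lets a producer refresh metadata even after all the original replica hosts are gone.

```java
import java.util.*;
import java.util.stream.*;

public class MetadataBrokers {
    // Broker id -> host, and topic -> replica broker ids (illustrative data).
    static final Map<Integer, String> LIVE_BROKERS =
        Map.of(1, "a", 2, "b", 3, "c");
    static final Map<String, List<Integer>> REPLICA_ASSIGNMENT =
        Map.of("test", List.of(1, 2));

    // Old behavior: only brokers that host a replica of the requested topic.
    static Set<Integer> brokersForTopicOnly(String topic) {
        return REPLICA_ASSIGNMENT.getOrDefault(topic, List.of()).stream()
            .filter(LIVE_BROKERS::containsKey)
            .collect(Collectors.toSet());
    }

    // Proposed behavior: all live brokers, regardless of assignment.
    static Set<Integer> allLiveBrokers() {
        return LIVE_BROKERS.keySet();
    }
}
```

In the scenario from the description, brokers 1 and 2 are replaced by new hosts: the old response would leave the producer with no reachable brokers, while the all-live-brokers response still includes broker 3.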
[jira] [Commented] (KAFKA-1510) Force offset commits at a minimum interval when migrating consumer offsets from zookeeper to kafka
[ https://issues.apache.org/jira/browse/KAFKA-1510?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14066792#comment-14066792 ] Joel Koshy commented on KAFKA-1510: --- I think it should be sufficient to force commit only on shutdown while dual-commit is enabled. i.e., no need to force commit at intervals.
[jira] [Comment Edited] (KAFKA-1535) return all live brokers in TopicMetadataResponse
[ https://issues.apache.org/jira/browse/KAFKA-1535?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14066791#comment-14066791 ] nicu marasoiu edited comment on KAFKA-1535 at 7/18/14 7:50 PM: --- created KAFKA-1549 with Major priority. I also have KAFKA-1282 (which I like most and I would start first), and KAFKA-1510, both Major priority. What should be the order of doing them (I think they are all major right now) ? I would tackle 1282, 1549 then 1510. was (Author: nmarasoi): created KAFKA-1549 with Major priority. I also have KAFKA-1282 (which I like most and I would start first), KAFKA-1510, both Major priority. What should be the order of doing them (I think they are all major right now) ? I would tackle 1282, 1549 then 1510.
[jira] [Created] (KAFKA-1550) Patch review tool should use git format-patch to generate patch
Dong Lin created KAFKA-1550: --- Summary: Patch review tool should use git format-patch to generate patch Key: KAFKA-1550 URL: https://issues.apache.org/jira/browse/KAFKA-1550 Project: Kafka Issue Type: Bug Reporter: Dong Lin Assignee: Dong Lin Priority: Minor kafka-patch-review.py uses git-diff to generate patch for jira ticket. The resulting patch includes local uncommitted changes and is therefore different from the patch published to reviewboard. kafka-patch-review.py updates the jira ticket with attaching patch even if the reviewboard specified in --rb option is not found. -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Updated] (KAFKA-1550) Patch review tool should use git format-patch to generate patch
[ https://issues.apache.org/jira/browse/KAFKA-1550?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dong Lin updated KAFKA-1550: Description: 1) kafka-patch-review.py uses git-diff to generate patch for jira ticket. The resulting patch includes local uncommitted changes and is therefore different from the patch published to reviewboard. 2) kafka-patch-review.py updates the jira ticket with attaching patch even if the reviewboard update fails 3) os.popen is deprecated. Use subprocess.popen instead and was: kafka-patch-review.py uses git-diff to generate patch for jira ticket. The resulting patch includes local uncommitted changes and is therefore different from the patch published to reviewboard. kafka-patch-review.py updates the jira ticket with attaching patch even if the reviewboard specified in --rb option is not found.
[jira] [Updated] (KAFKA-1550) Patch review tool should use git format-patch to generate patch
[ https://issues.apache.org/jira/browse/KAFKA-1550?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dong Lin updated KAFKA-1550: Description: 1) kafka-patch-review.py uses git-diff to generate patch for jira ticket. The resulting patch includes local uncommitted changes and is therefore different from the patch published to reviewboard. 2) kafka-patch-review.py updates the jira ticket with attaching patch even if the reviewboard update fails 3) os.popen is deprecated. Use subprocess.popen instead. was: 1) kafka-patch-review.py uses git-diff to generate patch for jira ticket. The resulting patch includes local uncommitted changes and is therefore different from the patch published to reviewboard. 2) kafka-patch-review.py updates the jira ticket with attaching patch even if the reviewboard update fails 3) os.popen is deprecated. Use subprocess.popen instead and
Review Request 23692: Patch for KAFKA-1550
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/23692/ --- Review request for kafka. Bugs: KAFKA-1550 https://issues.apache.org/jira/browse/KAFKA-1550 Repository: kafka Description --- KAFKA-1550 Patch review tool should use git format-patch to generate patch Diffs - kafka-patch-review.py dc45549f886440f1721c60aab9aa0a4af9b4cbef Diff: https://reviews.apache.org/r/23692/diff/ Testing --- Thanks, Dong Lin
[jira] [Commented] (KAFKA-1550) Patch review tool should use git format-patch to generate patch
[ https://issues.apache.org/jira/browse/KAFKA-1550?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14066871#comment-14066871 ] Dong Lin commented on KAFKA-1550: - Created reviewboard https://reviews.apache.org/r/23692/diff/ against branch origin/trunk
[jira] [Updated] (KAFKA-1550) Patch review tool should use git format-patch to generate patch
[ https://issues.apache.org/jira/browse/KAFKA-1550?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dong Lin updated KAFKA-1550:

Attachment: KAFKA-1550.patch

Patch review tool should use git format-patch to generate patch
---
Key: KAFKA-1550
URL: https://issues.apache.org/jira/browse/KAFKA-1550
Project: Kafka
Issue Type: Bug
Reporter: Dong Lin
Assignee: Dong Lin
Priority: Minor
Attachments: KAFKA-1550.patch
Original Estimate: 24h
Remaining Estimate: 24h

1) kafka-patch-review.py uses git diff to generate the patch for the JIRA ticket. The resulting patch includes local uncommitted changes and is therefore different from the patch published to reviewboard.
2) kafka-patch-review.py updates the JIRA ticket by attaching the patch even if the reviewboard update fails.
3) os.popen is deprecated. Use subprocess.Popen instead.

-- This message was sent by Atlassian JIRA (v6.2#6252)
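Point 2 of the ticket is essentially an ordering bug: the tool should only attach the patch to the JIRA ticket after the reviewboard post has succeeded. A minimal sketch of that control flow, written in Java for illustration -- the `ReviewBoardClient` and `JiraClient` interfaces here are hypothetical stand-ins, not the actual Python tool's API:

```java
import java.util.ArrayList;
import java.util.List;

public class PatchReviewFlow {
    // Illustrative interfaces; the real tool shells out to rbt/jira instead.
    interface ReviewBoardClient { boolean postReview(String patch); }
    interface JiraClient { void attachPatch(String ticket, String patch); }

    private final ReviewBoardClient rb;
    private final JiraClient jira;

    PatchReviewFlow(ReviewBoardClient rb, JiraClient jira) {
        this.rb = rb;
        this.jira = jira;
    }

    /** Returns true only if both steps completed. */
    boolean publish(String ticket, String patch) {
        // Post to reviewboard first; a failure here must not leave the JIRA
        // ticket holding a patch that was never published for review.
        if (!rb.postReview(patch)) {
            return false;
        }
        jira.attachPatch(ticket, patch);
        return true;
    }

    public static void main(String[] args) {
        List<String> attached = new ArrayList<>();
        JiraClient jira = (ticket, patch) -> attached.add(ticket + ":" + patch);

        // reviewboard failure: the JIRA ticket is left untouched.
        PatchReviewFlow failing = new PatchReviewFlow(p -> false, jira);
        System.out.println(failing.publish("KAFKA-1550", "fix.patch")); // false
        System.out.println(attached.size()); // 0

        // reviewboard success: the patch is attached.
        PatchReviewFlow working = new PatchReviewFlow(p -> true, jira);
        System.out.println(working.publish("KAFKA-1550", "fix.patch")); // true
        System.out.println(attached.size()); // 1
    }
}
```

The same gating applies whichever command runner is used; switching from os.popen to subprocess.Popen makes the reviewboard step's exit status easy to check before touching JIRA.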
Improving the Kafka client ecosystem
A question was asked in another thread about what was an effective way to contribute to the Kafka project for people who weren't very enthusiastic about writing Java/Scala code. I wanted to advocate for an area I think is really important and not as good as it could be--the client ecosystem.

I think our goal is to make Kafka effective as a general-purpose, centralized data subscription system. This vision only really works if all your applications are able to integrate easily, whatever language they are in. We have a number of pretty good non-Java producers, but we have been lacking the features on the server side to make writing non-Java consumers easy. We are fixing that as part of the consumer work going on right now (which moves a lot of the functionality in the Java consumer to the server side). But apart from this I think there may be a lot more we can do to make the client ecosystem better.

Here are some concrete ideas. If anyone has additional ideas, please reply to this thread and share them. If you are interested in picking any of these up, please do.

1. The most obvious way to improve the ecosystem is to help work on clients. This doesn't necessarily mean writing new clients, since in many cases we already have a client in a given language. Anything we can do to incentivize fewer, better clients rather than many half-working clients, we should do. However, we are working now on the server-side consumer coordination, so it should soon be possible to write much simpler consumers.

2. It would be great if someone put together a mailing list just for client developers to share tips, tricks, problems, and so on. We can make sure all the main contributors are on this too. I think this could be a forum for directing improvements in this area.

3. Help improve the documentation on how to implement a client. We have tried to make the protocol spec not just a dry document but also have it share best practices, rationale, and intentions. I think this could be even better: there is really a range of options, from a very simple quick implementation to a more complex, highly optimized version, and it would be good to document those options and tradeoffs.

4. Come up with a standard way of documenting the features of clients. In an ideal world it would be possible to get the same information (author, language, feature set, download link, source code, etc.) for all clients. It would also be great to standardize the documentation for each client--for example, one or two basic examples that are repeated for every client in a standardized way. This would let someone who is not a Java developer come to the Kafka site, click on the link for their language, and view examples of interacting with Kafka in the language they know, using the client they would eventually use.

5. Build a Kafka Client Compatibility Kit (KCCK) :-) The idea is this: anyone who wants to implement a client would implement a simple command-line program with a set of standardized options. The compatibility kit would be a standard set of scripts that run the client through this command-line driver and validate its behavior. E.g. for a producer it would test that the client can correctly send messages, that ordering is retained, and that the client correctly handles reconnection, metadata refresh, and compression. The output would be a list of features that passed and are certified, and perhaps basic performance information. This would be an easy way to help client developers write correct clients, as well as a standardized comparison that says which clients work correctly.

-Jay
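One of the KCCK checks described in item 5 -- that message ordering is retained -- can be expressed as pure validation logic over what a driver sent and what came back. The sketch below is illustrative only: `orderingRetained` and its tolerance for retry duplicates are assumptions of this example, not part of any actual Kafka test kit:

```java
import java.util.Arrays;
import java.util.List;

public class OrderingCheck {
    /**
     * Returns true if 'consumed' contains the produced messages in the same
     * relative order (duplicates from producer retries are tolerated,
     * reordering is not).
     */
    static boolean orderingRetained(List<String> produced, List<String> consumed) {
        int next = 0; // index of the next produced message we expect to see
        for (String msg : consumed) {
            if (next < produced.size() && msg.equals(produced.get(next))) {
                next++; // matched in order
            } else if (next > 0 && msg.equals(produced.get(next - 1))) {
                // duplicate of the previous message (e.g. a retry): allowed
            } else {
                return false; // unexpected or out-of-order message
            }
        }
        return next == produced.size(); // every produced message was seen
    }

    public static void main(String[] args) {
        List<String> sent = Arrays.asList("m1", "m2", "m3");
        System.out.println(orderingRetained(sent, Arrays.asList("m1", "m2", "m3")));       // true
        System.out.println(orderingRetained(sent, Arrays.asList("m1", "m2", "m2", "m3"))); // true (retry dup)
        System.out.println(orderingRetained(sent, Arrays.asList("m2", "m1", "m3")));       // false
    }
}
```

A real kit would feed `produced` from the standardized command-line driver and `consumed` from a verifying consumer, per partition.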
Re: Review Request 23442: Patch for KAFKA-1330
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/23442/#review48139 ---

Ran unit tests twice. Saw the following failure twice.

kafka.api.IntegrationTestHarness junit.framework.TestSuite$1.warning FAILED
    junit.framework.AssertionFailedError: Class kafka.api.IntegrationTestHarness has no public constructor TestCase(String name) or TestCase()
        at junit.framework.Assert.fail(Assert.java:47)
        at junit.framework.TestSuite$1.runTest(TestSuite.java:93)

Saw the following failure once.

kafka.api.ConsumerTest testSimpleConsumer FAILED
    java.lang.AssertionError: expected:<4178> but was:<0>
        at org.junit.Assert.fail(Assert.java:69)
        at org.junit.Assert.failNotEquals(Assert.java:314)
        at org.junit.Assert.assertEquals(Assert.java:94)
        at org.junit.Assert.assertEquals(Assert.java:104)
        at kafka.api.ConsumerTest$$anonfun$testSimpleConsumer$1.apply$mcVI$sp(ConsumerTest.scala:31)
        at scala.collection.immutable.Range$ByOne$class.foreach$mVc$sp(Range.scala:282)
        at scala.collection.immutable.Range$$anon$2.foreach$mVc$sp(Range.scala:265)
        at kafka.api.ConsumerTest.testSimpleConsumer(ConsumerTest.scala:27)

core/src/test/scala/integration/kafka/api/ConsumerTest.scala
https://reviews.apache.org/r/23442/#comment84471
    License header.

- Jun Rao

On July 12, 2014, 8:06 p.m., Jay Kreps wrote:

--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/23442/ ---

(Updated July 12, 2014, 8:06 p.m.)

Review request for kafka.

Bugs: KAFKA-1330
    https://issues.apache.org/jira/browse/KAFKA-1330

Repository: kafka

Description
---
Draft version of the new consumer.
Diffs - clients/src/main/java/org/apache/kafka/clients/KafkaClient.java 29658d4a15f112dc0af5ce517eaab93e6f00134b clients/src/main/java/org/apache/kafka/clients/NetworkClient.java d8f9ce663ee24d2b0852c974136741280c39f8f8 clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java 227f5646ee708af1b861c15237eda2140cfd4900 clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java 46efc0c8483acacf42b2984ac3f3b9e0a4566187 clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecord.java 436d8a479166eda29f2672b50fc99f288bbe3fa9 clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java fe93afa24fc20b03830f1d190a276041d15bd3b9 clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java c3aad3b4d6b677f759583f309061193f2f109250 clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java 4aa5b01d611631db72df47d50bbe30edb8c478db clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java a016269512b6d6d6e0fd3fab997e9c8265024eb4 clients/src/main/java/org/apache/kafka/common/Cluster.java c62707ab3aba26771fc4b993df28bf8c44f32309 clients/src/main/java/org/apache/kafka/common/network/Selectable.java b68bbf00ab8eba6c5867d346c91188142593ca6e clients/src/main/java/org/apache/kafka/common/network/Selector.java 93f2f1c7b229205fc846b4e8bba527dd15355eb0 clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java 6fe7573973832615976defa37fe0dfbb8f911939 clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java 044b03061802ee5e8ea4f1995fb0988e1a70e9a7 clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java 8cecba50bf067713184208552af36469962cd628 clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java 040e5b91005edb8f015afdfa76fd94e0bf3cb4ca clients/src/main/java/org/apache/kafka/common/requests/ConsumerMetadataRequest.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/requests/ConsumerMetadataResponse.java PRE-CREATION 
clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/utils/Utils.java 50af60198a3f20933d0e8cf89c3b95d89ee73f35 clients/src/test/java/org/apache/kafka/clients/MockClient.java aae8d4a1e98279470587d397cc779a9baf6fee6c clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java 2f98192b064d1ce7c0779e901293edb8c3801915 clients/src/test/java/org/apache/kafka/clients/producer/PartitionerTest.java f06e28ce21e80c1265258ad3ac7900b99e61493d clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java 5c5e3d40819e41cab7b52a0eeaee5f2e7317b7b3 clients/src/test/java/org/apache/kafka/test/MockSelector.java d61de529173314c044261ad9662bec735d67e97f
Re: Review Request 23593: Fix KAFKA-1533: Address Jun's comments
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/23593/#review48162 --- Neha also had a comment in the previous RB. clients/src/main/java/org/apache/kafka/clients/NetworkClient.java https://reviews.apache.org/r/23593/#comment84473 The issue is that this will always delay the very first message by metadata refresh backoff time. Would it be better to only update this value after line 356 and after line 367? - Jun Rao On July 16, 2014, 11:38 p.m., Guozhang Wang wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/23593/ --- (Updated July 16, 2014, 11:38 p.m.) Review request for kafka. Bugs: KAFKA-1533 https://issues.apache.org/jira/browse/KAFKA-1533 Repository: kafka Description --- 1. Add the metadataRefreshAttemptMS in NetworkClient for backing off; 2. Refactor Producer API tests using KafkaTestHarness; 3. Change default backoff time to 100ms for test utils Diffs - clients/src/main/java/org/apache/kafka/clients/NetworkClient.java d8f9ce663ee24d2b0852c974136741280c39f8f8 clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java 4aa5b01d611631db72df47d50bbe30edb8c478db core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala 15fd5bcbaf175a0f7d7cf0b142e63f705ca9b6ae core/src/test/scala/integration/kafka/api/ProducerSendTest.scala 34a7db4b4ea2b720476c2b1f22a623a997faffbc core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala 194dd70919a5f301d3131c56594e40a0ebb27311 core/src/test/scala/unit/kafka/utils/TestUtils.scala 3faa884f8eb83c7c00baab416d0acfb488dc39c1 Diff: https://reviews.apache.org/r/23593/diff/ Testing --- Thanks, Guozhang Wang
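Jun's concern comes down to when the last-attempt timestamp is initialized and updated: stamping it before any refresh has been tried delays the first metadata fetch (and so the first message) by the full backoff. A standalone sketch of the two behaviors -- field and method names here are illustrative, not the real NetworkClient's:

```java
public class MetadataBackoff {
    private final long backoffMs;
    private long lastRefreshAttemptMs;

    MetadataBackoff(long backoffMs, long nowMs, boolean stampAtConstruction) {
        this.backoffMs = backoffMs;
        // The problematic variant stamps "now" before any attempt has happened;
        // the alternative pretends the last attempt was a full backoff ago, so
        // the first real refresh is allowed immediately.
        this.lastRefreshAttemptMs = stampAtConstruction ? nowMs : nowMs - backoffMs;
    }

    boolean mayRefresh(long nowMs) {
        return nowMs - lastRefreshAttemptMs >= backoffMs;
    }

    void recordAttempt(long nowMs) {
        lastRefreshAttemptMs = nowMs; // update only when a refresh is actually tried
    }

    public static void main(String[] args) {
        long t0 = 1000L;
        MetadataBackoff eager = new MetadataBackoff(100L, t0, true);
        MetadataBackoff lazy = new MetadataBackoff(100L, t0, false);
        System.out.println(eager.mayRefresh(t0)); // false: first refresh delayed by backoff
        System.out.println(lazy.mayRefresh(t0));  // true: first refresh goes out immediately
    }
}
```

This is the shape of the fix Jun suggests: move the timestamp update to the points where a refresh is actually attempted, rather than initializing it up front.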
[jira] [Commented] (KAFKA-1414) Speedup broker startup after hard reset
[ https://issues.apache.org/jira/browse/KAFKA-1414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14067014#comment-14067014 ] Jun Rao commented on KAFKA-1414:

I can't think of better names. So that sounds good.

Speedup broker startup after hard reset
---
Key: KAFKA-1414
URL: https://issues.apache.org/jira/browse/KAFKA-1414
Project: Kafka
Issue Type: Improvement
Components: log
Affects Versions: 0.8.2, 0.9.0, 0.8.1.1
Reporter: Dmitry Bugaychenko
Assignee: Jay Kreps
Attachments: 0001-KAFKA-1414-Speedup-broker-startup-after-hard-reset-a.patch, KAFKA-1414-rev1.patch, KAFKA-1414-rev2.fixed.patch, KAFKA-1414-rev2.patch, freebie.patch, parallel-dir-loading-0.8.patch, parallel-dir-loading-trunk-fixed-threadpool.patch, parallel-dir-loading-trunk-threadpool.patch, parallel-dir-loading-trunk.patch

After a hard reset due to power failure, the broker takes far too much time recovering unflushed segments in a single thread. This could easily be improved by launching multiple threads (one per data directory, assuming that typically each data directory is on a dedicated drive). Locally we tried this simple patch to LogManager.loadLogs and it seems to work; however, I'm too new to Scala, so do not take it literally:

{code}
/**
 * Recover and load all logs in the given data directories
 */
private def loadLogs(dirs: Seq[File]) {
  val threads: Array[Thread] = new Array[Thread](dirs.size)
  var i: Int = 0
  val me = this
  for (dir <- dirs) {
    val thread = new Thread(new Runnable {
      def run() {
        val recoveryPoints = me.recoveryPointCheckpoints(dir).read
        /* load the logs */
        val subDirs = dir.listFiles()
        if (subDirs != null) {
          val cleanShutDownFile = new File(dir, Log.CleanShutdownFile)
          if (cleanShutDownFile.exists())
            info("Found clean shutdown file. Skipping recovery for all logs in data directory '%s'".format(dir.getAbsolutePath))
          for (dir <- subDirs) {
            if (dir.isDirectory) {
              info("Loading log '" + dir.getName + "'")
              val topicPartition = Log.parseTopicPartitionName(dir.getName)
              val config = topicConfigs.getOrElse(topicPartition.topic, defaultConfig)
              val log = new Log(dir, config, recoveryPoints.getOrElse(topicPartition, 0L), scheduler, time)
              val previous = addLogWithLock(topicPartition, log)
              if (previous != null)
                throw new IllegalArgumentException(
                  "Duplicate log directories found: %s, %s!".format(log.dir.getAbsolutePath, previous.dir.getAbsolutePath))
            }
          }
          cleanShutDownFile.delete()
        }
      }
    })
    thread.start()
    threads(i) = thread
    i = i + 1
  }
  for (thread <- threads) {
    thread.join()
  }
}

def addLogWithLock(topicPartition: TopicAndPartition, log: Log): Log = {
  logCreationOrDeletionLock synchronized {
    this.logs.put(topicPartition, log)
  }
}
{code}

-- This message was sent by Atlassian JIRA (v6.2#6252)
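The "threadpool" variants among the attachments point at the same idea expressed with an executor rather than hand-managed threads, which also gives clean failure propagation through futures. A standalone Java sketch of that shape -- the per-directory recovery work is simulated, and `loadDir` is a hypothetical placeholder for what the real patch does (reading recovery points, scanning topic-partition subdirectories):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ParallelDirLoader {
    static String loadDir(String dir) {
        // Placeholder for per-directory recovery work (recovery-point reads,
        // log segment scans, index rebuilds, ...).
        return "loaded:" + dir;
    }

    static List<String> loadAll(List<String> dirs) throws Exception {
        // One worker per data directory, matching the one-thread-per-drive idea.
        ExecutorService pool = Executors.newFixedThreadPool(dirs.size());
        try {
            List<Future<String>> futures = new ArrayList<>();
            for (String dir : dirs)
                futures.add(pool.submit(() -> loadDir(dir)));
            List<String> results = new ArrayList<>();
            for (Future<String> f : futures)
                results.add(f.get()); // join; rethrows any per-directory failure
            return results;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(loadAll(List.of("/data1", "/data2", "/data3")));
    }
}
```

Compared with the raw-thread sketch in the ticket, an exception thrown while loading one directory surfaces from `Future.get` instead of dying silently inside a thread.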
Review Request 23697: Fix KAFKA-1533: Address Jun and Neha's comments
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/23697/ --- Review request for kafka. Bugs: KAFKA-1533 https://issues.apache.org/jira/browse/KAFKA-1533 Repository: kafka Description --- 1. Add the metadataRefreshAttemptMs in NetworkClient for backing off; 2. Refactor Producer API tests using KafkaTestHarness; 3. Change default backoff time to 100ms for test utils Diffs - clients/src/main/java/org/apache/kafka/clients/NetworkClient.java d8f9ce663ee24d2b0852c974136741280c39f8f8 clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java 4aa5b01d611631db72df47d50bbe30edb8c478db core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala 15fd5bcbaf175a0f7d7cf0b142e63f705ca9b6ae core/src/test/scala/integration/kafka/api/ProducerSendTest.scala 34a7db4b4ea2b720476c2b1f22a623a997faffbc core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala 194dd70919a5f301d3131c56594e40a0ebb27311 core/src/test/scala/unit/kafka/utils/TestUtils.scala 3faa884f8eb83c7c00baab416d0acfb488dc39c1 Diff: https://reviews.apache.org/r/23697/diff/ Testing --- Thanks, Guozhang Wang
[jira] [Updated] (KAFKA-1533) transient unit test failure in ProducerFailureHandlingTest
[ https://issues.apache.org/jira/browse/KAFKA-1533?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang updated KAFKA-1533:

Attachment: KAFKA-1533.patch

transient unit test failure in ProducerFailureHandlingTest
---
Key: KAFKA-1533
URL: https://issues.apache.org/jira/browse/KAFKA-1533
Project: Kafka
Issue Type: Bug
Components: core
Affects Versions: 0.8.2
Reporter: Jun Rao
Attachments: KAFKA-1533.patch, KAFKA-1533.patch, KAFKA-1533.patch

Occasionally, saw the test hang on tear down. The following is the stack trace.

"Test worker" prio=5 tid=7f9246956000 nid=0x10e078000 in Object.wait() [10e075000]
   java.lang.Thread.State: WAITING (on object monitor)
        at java.lang.Object.wait(Native Method)
        - waiting on <7f4e69578> (a org.apache.zookeeper.ClientCnxn$Packet)
        at java.lang.Object.wait(Object.java:485)
        at org.apache.zookeeper.ClientCnxn.submitRequest(ClientCnxn.java:1344)
        - locked <7f4e69578> (a org.apache.zookeeper.ClientCnxn$Packet)
        at org.apache.zookeeper.ZooKeeper.delete(ZooKeeper.java:732)
        at org.I0Itec.zkclient.ZkConnection.delete(ZkConnection.java:91)
        at org.I0Itec.zkclient.ZkClient$8.call(ZkClient.java:720)
        at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:675)
        at org.I0Itec.zkclient.ZkClient.delete(ZkClient.java:716)
        at kafka.utils.ZkUtils$.deletePath(ZkUtils.scala:416)
        at kafka.utils.ZkUtils$.deregisterBrokerInZk(ZkUtils.scala:184)
        at kafka.server.KafkaHealthcheck.shutdown(KafkaHealthcheck.scala:50)
        at kafka.server.KafkaServer$$anonfun$shutdown$2.apply$mcV$sp(KafkaServer.scala:243)
        at kafka.utils.Utils$.swallow(Utils.scala:172)
        at kafka.utils.Logging$class.swallowWarn(Logging.scala:92)
        at kafka.utils.Utils$.swallowWarn(Utils.scala:45)
        at kafka.utils.Logging$class.swallow(Logging.scala:94)
        at kafka.utils.Utils$.swallow(Utils.scala:45)
        at kafka.server.KafkaServer.shutdown(KafkaServer.scala:243)
        at kafka.api.ProducerFailureHandlingTest.tearDown(ProducerFailureHandlingTest.scala:90)

-- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Commented] (KAFKA-1533) transient unit test failure in ProducerFailureHandlingTest
[ https://issues.apache.org/jira/browse/KAFKA-1533?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14067028#comment-14067028 ] Guozhang Wang commented on KAFKA-1533:

Created reviewboard https://reviews.apache.org/r/23697/ against branch origin/trunk

transient unit test failure in ProducerFailureHandlingTest
---
Key: KAFKA-1533
URL: https://issues.apache.org/jira/browse/KAFKA-1533
Project: Kafka
Issue Type: Bug
Components: core
Affects Versions: 0.8.2
Reporter: Jun Rao
Attachments: KAFKA-1533.patch, KAFKA-1533.patch, KAFKA-1533.patch

Occasionally, saw the test hang on tear down. The following is the stack trace.

"Test worker" prio=5 tid=7f9246956000 nid=0x10e078000 in Object.wait() [10e075000]
   java.lang.Thread.State: WAITING (on object monitor)
        at java.lang.Object.wait(Native Method)
        - waiting on <7f4e69578> (a org.apache.zookeeper.ClientCnxn$Packet)
        at java.lang.Object.wait(Object.java:485)
        at org.apache.zookeeper.ClientCnxn.submitRequest(ClientCnxn.java:1344)
        - locked <7f4e69578> (a org.apache.zookeeper.ClientCnxn$Packet)
        at org.apache.zookeeper.ZooKeeper.delete(ZooKeeper.java:732)
        at org.I0Itec.zkclient.ZkConnection.delete(ZkConnection.java:91)
        at org.I0Itec.zkclient.ZkClient$8.call(ZkClient.java:720)
        at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:675)
        at org.I0Itec.zkclient.ZkClient.delete(ZkClient.java:716)
        at kafka.utils.ZkUtils$.deletePath(ZkUtils.scala:416)
        at kafka.utils.ZkUtils$.deregisterBrokerInZk(ZkUtils.scala:184)
        at kafka.server.KafkaHealthcheck.shutdown(KafkaHealthcheck.scala:50)
        at kafka.server.KafkaServer$$anonfun$shutdown$2.apply$mcV$sp(KafkaServer.scala:243)
        at kafka.utils.Utils$.swallow(Utils.scala:172)
        at kafka.utils.Logging$class.swallowWarn(Logging.scala:92)
        at kafka.utils.Utils$.swallowWarn(Utils.scala:45)
        at kafka.utils.Logging$class.swallow(Logging.scala:94)
        at kafka.utils.Utils$.swallow(Utils.scala:45)
        at kafka.server.KafkaServer.shutdown(KafkaServer.scala:243)
        at kafka.api.ProducerFailureHandlingTest.tearDown(ProducerFailureHandlingTest.scala:90)

-- This message was sent by Atlassian JIRA (v6.2#6252)
Re: Improving the Kafka client ecosystem
Another important part of the ecosystem could be around the adaptors for getting data from other systems into Kafka and vice versa. So, for the ingestion part, this can include things like getting data from mysql, syslog, apache server log, etc. For the egress part, this can include putting Kafka data into HDFS, S3, etc.

Will a separate mailing list be convenient? Could we just use the Kafka mailing list?

Thanks,

Jun

On Fri, Jul 18, 2014 at 2:34 PM, Jay Kreps jay.kr...@gmail.com wrote: [quoted text of the original message trimmed]
Re: Review Request 23655: Patch for KAFKA-687
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/23655/ ---

(Updated July 18, 2014, 10:55 p.m.)

Review request for kafka.

Summary (updated)
---
Patch for KAFKA-687

Bugs: KAFKA-687
    https://issues.apache.org/jira/browse/KAFKA-687

Repository: kafka

Description (updated)
---
tweaks
Ready to submit
Add owner count gauges.

Diffs (updated)
---
core/src/main/scala/kafka/consumer/ConsumerConfig.scala 1cf2f62ba02e4aa66bfa7575865e5d57baf82212
core/src/main/scala/kafka/consumer/PartitionAllocator.scala PRE-CREATION
core/src/main/scala/kafka/consumer/TopicCount.scala c79311097c5bd6718cb6a7fc403f804a1a939353
core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala 65f518d47c7555c42c4bff39c211814831f4b8b6
core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala a20ab90165cc7ebb1cf44078efe23a53938c8df6
core/src/main/scala/kafka/utils/ZkUtils.scala dcdc1ce2b02c996294e19cf480736106aaf29511
core/src/test/scala/unit/kafka/consumer/PartitionAllocatorTest.scala PRE-CREATION

Diff: https://reviews.apache.org/r/23655/diff/

Testing
---
* I did the unit tests (including the new one) as well as the mirror maker system test suite with roundrobin. While this is being reviewed I will run the system tests with symmetric.

Thanks,

Joel Koshy
[jira] [Commented] (KAFKA-687) Rebalance algorithm should consider partitions from all topics
[ https://issues.apache.org/jira/browse/KAFKA-687?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14067051#comment-14067051 ] Joel Koshy commented on KAFKA-687:

Updated reviewboard https://reviews.apache.org/r/23655/diff/ against branch origin/trunk

Rebalance algorithm should consider partitions from all topics
---
Key: KAFKA-687
URL: https://issues.apache.org/jira/browse/KAFKA-687
Project: Kafka
Issue Type: Improvement
Affects Versions: 0.9.0
Reporter: Pablo Barrera
Assignee: Joel Koshy
Attachments: KAFKA-687.patch, KAFKA-687_2014-07-18_15:55:15.patch

The current rebalance step, as stated in the original Kafka paper [1], splits the partitions of each topic between all the consumers. So if you have 100 topics with 2 partitions each and 10 consumers, only two consumers will be used. That is, for each topic all partitions will be listed and shared between the consumers in the consumer group in order (not randomly). If the consumer group is reading from several topics at the same time, it makes sense to split all the partitions from all topics between all the consumers. Following the example, we will have 200 partitions in total, 20 per consumer, using all 10 consumers. The load per topic could be different and the division should consider this. However, even a random division should be better than the current algorithm when reading from several topics, and should not harm reading from a few topics with several partitions.

-- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Updated] (KAFKA-687) Rebalance algorithm should consider partitions from all topics
[ https://issues.apache.org/jira/browse/KAFKA-687?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Joel Koshy updated KAFKA-687:

Attachment: KAFKA-687_2014-07-18_15:55:15.patch

Rebalance algorithm should consider partitions from all topics
---
Key: KAFKA-687
URL: https://issues.apache.org/jira/browse/KAFKA-687
Project: Kafka
Issue Type: Improvement
Affects Versions: 0.9.0
Reporter: Pablo Barrera
Assignee: Joel Koshy
Attachments: KAFKA-687.patch, KAFKA-687_2014-07-18_15:55:15.patch

The current rebalance step, as stated in the original Kafka paper [1], splits the partitions of each topic between all the consumers. So if you have 100 topics with 2 partitions each and 10 consumers, only two consumers will be used. That is, for each topic all partitions will be listed and shared between the consumers in the consumer group in order (not randomly). If the consumer group is reading from several topics at the same time, it makes sense to split all the partitions from all topics between all the consumers. Following the example, we will have 200 partitions in total, 20 per consumer, using all 10 consumers. The load per topic could be different and the division should consider this. However, even a random division should be better than the current algorithm when reading from several topics, and should not harm reading from a few topics with several partitions.

-- This message was sent by Atlassian JIRA (v6.2#6252)
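The proposal in the ticket can be illustrated with a toy assignment function that pools every partition of every subscribed topic and deals them round-robin across the consumer group, instead of splitting each topic's partitions independently. This is a sketch of the idea only, not Kafka's actual PartitionAllocator:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class RoundRobinAssign {
    static Map<String, List<String>> assign(List<String> consumers,
                                            Map<String, Integer> partitionsPerTopic) {
        Map<String, List<String>> assignment = new LinkedHashMap<>();
        for (String c : consumers)
            assignment.put(c, new ArrayList<>());
        int i = 0;
        // Sort topics for a deterministic order, then deal every topic-partition
        // out across the whole group.
        for (String topic : new TreeMap<>(partitionsPerTopic).keySet())
            for (int p = 0; p < partitionsPerTopic.get(topic); p++)
                assignment.get(consumers.get(i++ % consumers.size())).add(topic + "-" + p);
        return assignment;
    }

    public static void main(String[] args) {
        // The ticket's example: 100 topics x 2 partitions, 10 consumers.
        // Per-topic splitting would use only 2 consumers; pooling gives each
        // consumer 20 partitions.
        Map<String, Integer> topics = new HashMap<>();
        for (int t = 0; t < 100; t++)
            topics.put(String.format("topic%03d", t), 2);
        Map<String, List<String>> a = assign(
            Arrays.asList("c0", "c1", "c2", "c3", "c4", "c5", "c6", "c7", "c8", "c9"), topics);
        for (List<String> parts : a.values())
            System.out.println(parts.size()); // 20 for each consumer
    }
}
```

Weighting by per-topic load, as the ticket notes, would refine this further; even this unweighted version spreads the 200 partitions over all 10 consumers.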
Re: Review Request 23655: Patch for KAFKA-687
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/23655/ --- (Updated July 18, 2014, 10:57 p.m.) Review request for kafka. Bugs: KAFKA-687 https://issues.apache.org/jira/browse/KAFKA-687 Repository: kafka Description (updated) --- The updated diff contains the mbeans for ownership counts. The comments in the code and the summary are pretty self-explanatory. Things to think about: * Naming - do symmetric/range/roundrobin make sense? * The comments briefly summarize why we needed a separate symmetric mode but let me know if that is unclear. * Rebalance time will be slightly higher - I have not measured (will do that) Diffs - core/src/main/scala/kafka/consumer/ConsumerConfig.scala 1cf2f62ba02e4aa66bfa7575865e5d57baf82212 core/src/main/scala/kafka/consumer/PartitionAllocator.scala PRE-CREATION core/src/main/scala/kafka/consumer/TopicCount.scala c79311097c5bd6718cb6a7fc403f804a1a939353 core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala 65f518d47c7555c42c4bff39c211814831f4b8b6 core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala a20ab90165cc7ebb1cf44078efe23a53938c8df6 core/src/main/scala/kafka/utils/ZkUtils.scala dcdc1ce2b02c996294e19cf480736106aaf29511 core/src/test/scala/unit/kafka/consumer/PartitionAllocatorTest.scala PRE-CREATION Diff: https://reviews.apache.org/r/23655/diff/ Testing --- * I did the unit tests (including the new one) as well as mirror maker system test suite with roundrobin. While this is being reviewed I will run the system tests with symmetric Thanks, Joel Koshy
Re: Improving the Kafka client ecosystem
Basically my thought with getting a separate mailing list was to have a place specifically to discuss issues around clients. I don't see a lot of discussion about them on the main list. I thought perhaps this was because people don't like to ask questions which are about adjacent projects/code bases. But basically whatever will lead to a robust discussion, bug tracking, etc on clients. -Jay On Fri, Jul 18, 2014 at 3:49 PM, Jun Rao jun...@gmail.com wrote: Another important part of eco-system could be around the adaptors of getting data from other systems into Kafka and vice versa. So, for the ingestion part, this can include things like getting data from mysql, syslog, apache server log, etc. For the egress part, this can include putting Kafka data into HDFS, S3, etc. Will a separate mailing list be convenient? Could we just use the Kafka mailing list? Thanks, Jun On Fri, Jul 18, 2014 at 2:34 PM, Jay Kreps jay.kr...@gmail.com wrote: A question was asked in another thread about what was an effective way to contribute to the Kafka project for people who weren't very enthusiastic about writing Java/Scala code. I wanted to kind of advocate for an area I think is really important and not as good as it could be--the client ecosystem. I think our goal is to make Kafka effective as a general purpose, centralized, data subscription system. This vision only really works if all your applications, are able to integrate easily, whatever language they are in. We have a number of pretty good non-java producers. We have been lacking the features on the server-side to make writing non-java consumers easy. We are fixing that right now as part of the consumer work going on right now (which moves a lot of the functionality in the java consumer to the server side). But apart from this I think there may be a lot more we can do to make the client ecosystem better. Here are some concrete ideas. If anyone has additional ideas please reply to this thread and share them. 
If you are interested in picking any of these up, please do. 1. The most obvious way to improve the ecosystem is to help work on clients. This doesn't necessarily mean writing new clients, since in many cases we already have a client in a given language. I think any way we can incentivize fewer, better clients rather than many half-working clients we should do. However we are working now on the server-side consumer co-ordination so it should now be possible to write much simpler consumers. 2. It would be great if someone put together a mailing list just for client developers to share tips, tricks, problems, and so on. We can make sure all the main contributors on this too. I think this could be a forum for kind of directing improvements in this area. 3. Help improve the documentation on how to implement a client. We have tried to make the protocol spec not just a dry document but also have it share best practices, rationale, and intentions. I think this could potentially be even better as there is really a range of options from a very simple quick implementation to a more complex highly optimized version. It would be good to really document some of the options and tradeoffs. 4. Come up with a standard way of documenting the features of clients. In an ideal world it would be possible to get the same information (author, language, feature set, download link, source code, etc) for all clients. It would be great to standardize the documentation for the client as well. For example having one or two basic examples that are repeated for every client in a standardized way. This would let someone come to the Kafka site who is not a java developer, and click on the link for their language and view examples of interacting with Kafka in the language they know using the client they would eventually use. 5. 
Build a Kafka Client Compatibility Kit (KCCK) :-) The idea is this: anyone who wants to implement a client would implement a simple command line program with a set of standardized options. The compatibility kit would be a standard set of scripts that ran their client using this command line driver and validated its behavior. E.g. for a producer it would test that the client can correctly send messages, that ordering is retained, and that it correctly handles reconnection, metadata refresh, and compression. The output would be a list of which features passed and are certified, plus perhaps basic performance information. This would be an easy way to help client developers write correct clients, as well as providing a standardized comparison that certifies clients work correctly. -Jay
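To make the KCCK idea concrete, here is a minimal sketch of the validation side of such a kit. Everything in it is hypothetical (the kit does not exist, and the invocation of the client's command-line driver is elided); it only shows the kind of checks the scripts would run over the messages a client produced and then consumed back:

```scala
// Hypothetical KCCK check: given the messages the standardized command-line
// driver produced and the messages it consumed back, verify the guarantees
// the kit would certify. Running the driver itself is elided.
object OrderingCheck {
  // Strongest check: content and order both preserved.
  def orderingRetained(produced: Seq[String], consumed: Seq[String]): Boolean =
    produced == consumed

  // Weaker check: no loss and no duplication, ignoring order (relevant for
  // clients that only guarantee ordering within a partition).
  def noLossNoDuplication(produced: Seq[String], consumed: Seq[String]): Boolean =
    produced.sorted == consumed.sorted
}
```

A real kit would shell out to each client's standardized driver, collect the consumed messages, and run per-feature checks like these, emitting the pass/fail report described above.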
[jira] [Commented] (KAFKA-1414) Speedup broker startup after hard reset
[ https://issues.apache.org/jira/browse/KAFKA-1414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14067065#comment-14067065 ] Anton Karamanov commented on KAFKA-1414: But that would actually require changing it everywhere in the code for consistency, including server config. Is that OK within that task? Speedup broker startup after hard reset --- Key: KAFKA-1414 URL: https://issues.apache.org/jira/browse/KAFKA-1414 Project: Kafka Issue Type: Improvement Components: log Affects Versions: 0.8.2, 0.9.0, 0.8.1.1 Reporter: Dmitry Bugaychenko Assignee: Jay Kreps Attachments: 0001-KAFKA-1414-Speedup-broker-startup-after-hard-reset-a.patch, KAFKA-1414-rev1.patch, KAFKA-1414-rev2.fixed.patch, KAFKA-1414-rev2.patch, freebie.patch, parallel-dir-loading-0.8.patch, parallel-dir-loading-trunk-fixed-threadpool.patch, parallel-dir-loading-trunk-threadpool.patch, parallel-dir-loading-trunk.patch After a hard reset due to power failure the broker takes far too much time recovering unflushed segments in a single thread. This could be easily improved by launching multiple threads (one per data directory, assuming that typically each data directory is on a dedicated drive). Locally we tried this simple patch to LogManager.loadLogs and it seems to work; however, I'm too new to Scala, so do not take it literally: {code}
/**
 * Recover and load all logs in the given data directories
 */
private def loadLogs(dirs: Seq[File]) {
  val threads: Array[Thread] = new Array[Thread](dirs.size)
  var i: Int = 0
  val me = this
  for (dir <- dirs) {
    val thread = new Thread(new Runnable {
      def run() {
        val recoveryPoints = me.recoveryPointCheckpoints(dir).read
        /* load the logs */
        val subDirs = dir.listFiles()
        if (subDirs != null) {
          val cleanShutDownFile = new File(dir, Log.CleanShutdownFile)
          if (cleanShutDownFile.exists())
            info("Found clean shutdown file. Skipping recovery for all logs in data directory '%s'".format(dir.getAbsolutePath))
          for (dir <- subDirs) {
            if (dir.isDirectory) {
              info("Loading log '" + dir.getName + "'")
              val topicPartition = Log.parseTopicPartitionName(dir.getName)
              val config = topicConfigs.getOrElse(topicPartition.topic, defaultConfig)
              val log = new Log(dir, config, recoveryPoints.getOrElse(topicPartition, 0L), scheduler, time)
              val previous = addLogWithLock(topicPartition, log)
              if (previous != null)
                throw new IllegalArgumentException("Duplicate log directories found: %s, %s!".format(log.dir.getAbsolutePath, previous.dir.getAbsolutePath))
            }
          }
          cleanShutDownFile.delete()
        }
      }
    })
    thread.start()
    threads(i) = thread
    i = i + 1
  }
  for (thread <- threads) {
    thread.join()
  }
}

def addLogWithLock(topicPartition: TopicAndPartition, log: Log): Log = {
  logCreationOrDeletionLock synchronized {
    this.logs.put(topicPartition, log)
  }
}
{code} -- This message was sent by Atlassian JIRA (v6.2#6252)
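For comparison with the raw-thread version in the patch, the same per-data-directory parallelism can be sketched with a fixed thread pool (the `-threadpool` attachment names suggest this direction). This is an illustration only: `recover` is a stand-in for the real per-directory log loading in LogManager:

```scala
import java.util.concurrent.{Executors, TimeUnit}

// Sketch: recover each data directory on its own pool thread and block until
// every directory is loaded. `recover` stands in for the per-directory log
// loading; in LogManager it would read recovery points and construct Logs.
object ParallelDirLoad {
  def loadAll(dirs: Seq[String], recover: String => Unit): Unit = {
    val pool = Executors.newFixedThreadPool(math.max(dirs.size, 1))
    try {
      val futures = dirs.map { d =>
        pool.submit(new Runnable { def run(): Unit = recover(d) })
      }
      futures.foreach(_.get()) // wait for completion and propagate any failure
    } finally {
      pool.shutdown()
      pool.awaitTermination(1, TimeUnit.MINUTES)
    }
  }
}
```

Submitting to a pool and collecting the `Future`s also surfaces exceptions from a failed directory via `get()`, which the hand-rolled `Thread`/`join()` version silently swallows.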
[jira] [Commented] (KAFKA-328) Write unit test for kafka server startup and shutdown API
[ https://issues.apache.org/jira/browse/KAFKA-328?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14067071#comment-14067071 ] BalajiSeshadri commented on KAFKA-328: -- Is this the kind of test you are expecting? {code}
@Test
def testServerStartupConsecutively() {
  val server = new KafkaServer(config)
  server.startup()
  Thread.sleep(100)
  try {
    server.startup()
  } catch {
    case ex: Exception =>
      assertTrue(ex.getMessage().contains("This scheduler has already been started!"))
  }
  server.shutdown()
}
{code} Write unit test for kafka server startup and shutdown API -- Key: KAFKA-328 URL: https://issues.apache.org/jira/browse/KAFKA-328 Project: Kafka Issue Type: Bug Reporter: Neha Narkhede Assignee: BalajiSeshadri Labels: newbie Background discussion in KAFKA-320 People often try to embed KafkaServer in an application that ends up calling startup() and shutdown() repeatedly and sometimes in odd ways. To ensure this works correctly we have to be very careful about cleaning up resources. This is a good practice for making unit tests reliable anyway. A good first step would be to add some unit tests on startup and shutdown to cover various cases: 1. A Kafka server can start up if it is not already starting up, is not currently being shut down, and hasn't already been started. 2. A Kafka server can shut down if it is not already shutting down, is not currently starting up, and hasn't already been shut down. -- This message was sent by Atlassian JIRA (v6.2#6252)
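The startup/shutdown rules listed in the description amount to a small state machine. A minimal sketch of such a guard (an illustration of the idea, not the actual KafkaServer code; the error message mirrors the scheduler message asserted in the test above) could look like:

```scala
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical guard illustrating the rules in the description: startup()
// fails if the server is already started, shutdown() fails if it is not.
// compareAndSet makes the transition atomic, so concurrent or repeated
// calls cannot both win.
class GuardedServer {
  private val started = new AtomicBoolean(false)

  def startup(): Unit = {
    if (!started.compareAndSet(false, true))
      throw new IllegalStateException("This scheduler has already been started!")
    // ... bring up components here ...
  }

  def shutdown(): Unit = {
    if (!started.compareAndSet(true, false))
      throw new IllegalStateException("Server has not been started")
    // ... tear down components here ...
  }
}
```

With this shape, the unit tests in the description reduce to asserting which transitions throw and which succeed.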
[jira] [Commented] (KAFKA-1535) return all live brokers in TopicMetadataResponse
[ https://issues.apache.org/jira/browse/KAFKA-1535?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14067074#comment-14067074 ] Jay Kreps commented on KAFKA-1535: -- [~nmarasoi] Totally up to you. return all live brokers in TopicMetadataResponse Key: KAFKA-1535 URL: https://issues.apache.org/jira/browse/KAFKA-1535 Project: Kafka Issue Type: Improvement Components: core Affects Versions: 0.8.2 Reporter: Jun Rao Assignee: nicu marasoiu Labels: newbie Attachments: KAFKA-1535__return_all_live_brokers_in_TopicMetadataResponse_.patch Currently, we only return the brokers that have assigned replicas for a topic in TopicMetadataResponse. The new producer will use those brokers for refreshing metadata. Now suppose that we stop all those brokers, copy all local data to some new hosts and then restart those hosts (with the original broker id). There is no way for the new producer to automatically get the information about the new brokers since all old brokers are gone. -- This message was sent by Atlassian JIRA (v6.2#6252)
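A sketch of the proposed change, using hypothetical types rather than the actual TopicMetadataResponse classes: the response is built from the full live-broker list instead of only the brokers holding replicas of the requested topic, so a producer always has a valid set of hosts for metadata refresh even after all replica brokers are replaced:

```scala
// Hypothetical stand-ins for the real wire-format classes.
case class Broker(id: Int, host: String, port: Int)
case class TopicMetadataResponse(brokers: Seq[Broker], topic: String, replicaBrokerIds: Seq[Int])

object MetadataSketch {
  // Before the change, brokers would effectively be
  //   liveBrokers.filter(b => assignment(topic).contains(b.id));
  // after it, every live broker is returned regardless of assignment.
  def respond(topic: String,
              assignment: Map[String, Seq[Int]],
              liveBrokers: Seq[Broker]): TopicMetadataResponse =
    TopicMetadataResponse(liveBrokers, topic, assignment.getOrElse(topic, Seq.empty))
}
```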
Review Request 23699: Fix KAFKA-1430
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/23699/ --- Review request for kafka. Bugs: KAFKA-1430 https://issues.apache.org/jira/browse/KAFKA-1430 Repository: kafka Description --- Rebased on KAFKA-1462: 1. LogSegment.read() will also return fetch info, even if the corresponding message set is empty; 2. Purgatory checking satisfactory in checkAndMaybeWatch synchronously, and will only return false if this thread successfully set the satisfactory bit to true; 3. Remove the read lock on Partition's reading of the leaderOpt and epoch and making them volatile instead since these two functions are just single read; 4. Fix some minor issues in TestEndToEndLatency; 5. Other minor fixes Diffs - core/src/main/scala/kafka/api/FetchResponse.scala d117f10f724b09d6deef0df3a138d28fc91aa13a core/src/main/scala/kafka/cluster/Partition.scala f2ca8562f833f09d96ec4bd37efcacf69cd84b2e core/src/main/scala/kafka/cluster/Replica.scala 5e659b4a5c0256431aecc200a6b914472da9ecf3 core/src/main/scala/kafka/consumer/SimpleConsumer.scala 0e64632210385ef63c2ad3445b55ac4f37a63df2 core/src/main/scala/kafka/log/FileMessageSet.scala b2652ddbe2f857028d5980e29a008b2c614694a3 core/src/main/scala/kafka/log/Log.scala b7bc5ffdecba7221b3d2e4bca2b0c703bc74fa12 core/src/main/scala/kafka/log/LogCleaner.scala 2faa196a4dc612bc634d5ff5f5f275d09073f13b core/src/main/scala/kafka/log/LogSegment.scala 0d6926ea105a99c9ff2cfc9ea6440f2f2d37bde8 core/src/main/scala/kafka/server/AbstractFetcherThread.scala 3b15254f32252cf824d7a292889ac7662d73ada1 core/src/main/scala/kafka/server/DelayedFetch.scala PRE-CREATION core/src/main/scala/kafka/server/DelayedProduce.scala PRE-CREATION core/src/main/scala/kafka/server/FetchDataInfo.scala PRE-CREATION core/src/main/scala/kafka/server/FetchRequestPurgatory.scala PRE-CREATION core/src/main/scala/kafka/server/KafkaApis.scala fd5f12ee31e78cdcda8e24a0ab3e1740b3928884 core/src/main/scala/kafka/server/LogOffsetMetadata.scala PRE-CREATION 
core/src/main/scala/kafka/server/OffsetManager.scala 0e22897cd1c7e45c58a61c3c468883611b19116d core/src/main/scala/kafka/server/ProducerRequestPurgatory.scala PRE-CREATION core/src/main/scala/kafka/server/ReplicaFetcherThread.scala 75ae1e161769a020a102009df416009bd6710f4a core/src/main/scala/kafka/server/ReplicaManager.scala 6a56a772c134dbf1e70c1bfe067223009bfdbac8 core/src/main/scala/kafka/server/RequestKey.scala PRE-CREATION core/src/main/scala/kafka/server/RequestPurgatory.scala 3d0ff1e2dbd6a5c3587cffa283db70415af83a7b core/src/main/scala/kafka/tools/TestEndToEndLatency.scala 5f8f6bc46ae6cbbe15cec596ac99d0b377d1d7ef core/src/test/scala/other/kafka/StressTestLog.scala 8fcd068b248688c40e73117dc119fa84cceb95b3 core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala 9f04bd38be639cde3e7f402845dbe6ae92e87dc2 core/src/test/scala/unit/kafka/log/FileMessageSetTest.scala cec1caecc51507ae339ebf8f3b8a028b12a1a056 core/src/test/scala/unit/kafka/log/LogManagerTest.scala d03d4c4ee5c28fb1fbab2af0b84003ec0fac36e3 core/src/test/scala/unit/kafka/log/LogSegmentTest.scala 6b7603728ae5217565d68b92dd5349e7c6508f31 core/src/test/scala/unit/kafka/log/LogTest.scala 1da1393983d4b20330e7c7f374424edd1b26f2a3 core/src/test/scala/unit/kafka/message/BaseMessageSetTestCases.scala 6db245c956d2172cde916defdb0749081bf891fd core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala 558a5d6765a2b2c2fd33dc75ed31873a133d12c9 core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala 2cd3a3faf7be2bbc22a892eec78c6e4c05545e18 core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala 0ec120a4a953114e88c575dd6b583874371a09e3 core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala 4f61f8469df99e02d6ce7aad897d10e158cca8fd core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala b1c4ce9f66aa86c68f2e6988f67546fa63ed1f9b Diff: https://reviews.apache.org/r/23699/diff/ Testing --- Thanks, Guozhang Wang
[jira] [Updated] (KAFKA-1430) Purgatory redesign
[ https://issues.apache.org/jira/browse/KAFKA-1430?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang updated KAFKA-1430: - Attachment: KAFKA-1430.patch Purgatory redesign -- Key: KAFKA-1430 URL: https://issues.apache.org/jira/browse/KAFKA-1430 Project: Kafka Issue Type: Improvement Components: core Affects Versions: 0.8.2 Reporter: Jun Rao Assignee: Guozhang Wang Attachments: KAFKA-1430.patch, KAFKA-1430.patch, KAFKA-1430.patch, KAFKA-1430.patch, KAFKA-1430_2014-06-05_17:07:37.patch, KAFKA-1430_2014-06-05_17:13:13.patch, KAFKA-1430_2014-06-05_17:40:53.patch, KAFKA-1430_2014-06-10_11:22:06.patch, KAFKA-1430_2014-06-10_11:26:02.patch, KAFKA-1430_2014-07-11_10:59:13.patch We have seen 2 main issues with the Purgatory. 1. There is no atomic checkAndWatch functionality. So, a client typically first checks whether a request is satisfied or not and then registers the watcher. However, by the time the watcher is registered, the item could already be satisfied. This item won't be satisfied until the next update happens or the delayed time expires, which means the watched item could be delayed. 2. FetchRequestPurgatory doesn't quite work. This is because the current design tries to incrementally maintain the accumulated bytes ready for fetch. However, this is difficult since the right time to check whether a fetch (for a regular consumer) request is satisfied is when the high watermark moves. At that point, it's hard to figure out how many bytes we should incrementally add to each pending fetch request. The problem has been reported in KAFKA-1150 and KAFKA-703. -- This message was sent by Atlassian JIRA (v6.2#6252)
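Issue 1 above is a classic check-then-act race. One way to close it, sketched below with hypothetical types (not the actual RequestPurgatory code), is to flip a satisfied bit with compareAndSet so exactly one path wins, and to re-check after registering the watcher so a completion that raced in between is never missed:

```scala
import java.util.concurrent.atomic.AtomicBoolean

// Sketch of an atomic check-and-watch. `satisfied` can only go false -> true
// once, so the checker and the completion path cannot both "satisfy" the item.
class DelayedItem {
  private val satisfied = new AtomicBoolean(false)

  // Completion path (e.g. run when the high watermark moves). Returns true
  // only for the single caller that actually satisfies the item.
  def trySatisfy(): Boolean = satisfied.compareAndSet(false, true)

  // Try to satisfy now; otherwise register the watcher and check once more,
  // so a completion racing between the check and the registration is caught.
  def checkAndMaybeWatch(canSatisfy: => Boolean, register: () => Unit): Boolean = {
    if (canSatisfy && trySatisfy()) return true
    register()
    canSatisfy && trySatisfy()
  }
}
```

The re-check after `register()` is what makes the operation safe without holding a lock across the whole check-and-watch: the worst case is a watcher that is registered for an already-satisfied item, which the completion path can simply purge.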
[jira] [Commented] (KAFKA-1430) Purgatory redesign
[ https://issues.apache.org/jira/browse/KAFKA-1430?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14067182#comment-14067182 ] Guozhang Wang commented on KAFKA-1430: -- Created reviewboard https://reviews.apache.org/r/23699/ against branch origin/trunk Purgatory redesign -- Key: KAFKA-1430 URL: https://issues.apache.org/jira/browse/KAFKA-1430 Project: Kafka Issue Type: Improvement Components: core Affects Versions: 0.8.2 Reporter: Jun Rao Assignee: Guozhang Wang Attachments: KAFKA-1430.patch, KAFKA-1430.patch, KAFKA-1430.patch, KAFKA-1430.patch, KAFKA-1430_2014-06-05_17:07:37.patch, KAFKA-1430_2014-06-05_17:13:13.patch, KAFKA-1430_2014-06-05_17:40:53.patch, KAFKA-1430_2014-06-10_11:22:06.patch, KAFKA-1430_2014-06-10_11:26:02.patch, KAFKA-1430_2014-07-11_10:59:13.patch -- This message was sent by Atlassian JIRA (v6.2#6252)
Review Request 23702: Patch for KAFKA-1070
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/23702/ --- Review request for kafka. Bugs: KAFKA-1070 https://issues.apache.org/jira/browse/KAFKA-1070 Repository: kafka Description --- KAFKA-1070. Auto-assign node id. Diffs - core/src/main/scala/kafka/common/InconsistentBrokerIdException.scala PRE-CREATION core/src/main/scala/kafka/server/KafkaConfig.scala 50b09edb73af1b45f88f919ac8c46ae056878c8e core/src/main/scala/kafka/server/KafkaServer.scala def1dc2a5818d45d9ee0881137ff989cec4eb9b1 core/src/main/scala/kafka/utils/ZkUtils.scala dcdc1ce2b02c996294e19cf480736106aaf29511 core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala PRE-CREATION core/src/test/scala/unit/kafka/utils/TestUtils.scala 3faa884f8eb83c7c00baab416d0acfb488dc39c1 Diff: https://reviews.apache.org/r/23702/diff/ Testing --- Thanks, Sriharsha Chintalapani
[jira] [Commented] (KAFKA-1070) Auto-assign node id
[ https://issues.apache.org/jira/browse/KAFKA-1070?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14067273#comment-14067273 ] Sriharsha Chintalapani commented on KAFKA-1070: --- Created reviewboard https://reviews.apache.org/r/23702/diff/ against branch origin/trunk Auto-assign node id --- Key: KAFKA-1070 URL: https://issues.apache.org/jira/browse/KAFKA-1070 Project: Kafka Issue Type: Bug Reporter: Jay Kreps Assignee: Sriharsha Chintalapani Labels: usability Attachments: KAFKA-1070.patch It would be nice to have Kafka brokers auto-assign node ids rather than having that be a configuration. Having a configuration is irritating because (1) you have to generate a custom config for each broker and (2) even though it is in configuration, changing the node id can cause all kinds of bad things to happen. -- This message was sent by Atlassian JIRA (v6.2#6252)
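The review request's diff adds an InconsistentBrokerIdException, which suggests the shape of the change: on startup the broker reconciles the configured id with any id previously recorded in its data directory, and generates one only on first boot. The sketch below is hypothetical (the diff also touches ZkUtils, suggesting the generated id comes from a ZooKeeper sequence; here the generator is just a parameter):

```scala
// Hypothetical sketch of broker-id resolution at startup. configuredId < 0
// means "not configured, auto-assign"; storedId is the id recorded in the
// data directory on a previous boot, if any.
case class InconsistentBrokerIdException(msg: String) extends RuntimeException(msg)

object BrokerId {
  def resolve(configuredId: Int, storedId: Option[Int], generate: () => Int): Int =
    (storedId, configuredId) match {
      case (Some(s), c) if c >= 0 && c != s =>
        // Changing the id of an existing broker is the "bad things" case.
        throw InconsistentBrokerIdException(
          s"Configured broker.id $c doesn't match stored broker.id $s")
      case (Some(s), _)        => s          // reuse the recorded id
      case (None, c) if c >= 0 => c          // first boot with an explicit id
      case (None, _)           => generate() // first boot, auto-assign
    }
}
```

Recording the resolved id back into the data directory is what makes a later accidental config change detectable rather than silently destructive.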
[jira] [Commented] (KAFKA-1070) Auto-assign node id
[ https://issues.apache.org/jira/browse/KAFKA-1070?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14067274#comment-14067274 ] Sriharsha Chintalapani commented on KAFKA-1070: --- Thanks [~jkreps] Auto-assign node id --- Key: KAFKA-1070 URL: https://issues.apache.org/jira/browse/KAFKA-1070 Project: Kafka Issue Type: Bug Reporter: Jay Kreps Assignee: Sriharsha Chintalapani Labels: usability Attachments: KAFKA-1070.patch -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Updated] (KAFKA-1070) Auto-assign node id
[ https://issues.apache.org/jira/browse/KAFKA-1070?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sriharsha Chintalapani updated KAFKA-1070: -- Attachment: KAFKA-1070.patch Auto-assign node id --- Key: KAFKA-1070 URL: https://issues.apache.org/jira/browse/KAFKA-1070 Project: Kafka Issue Type: Bug Reporter: Jay Kreps Assignee: Sriharsha Chintalapani Labels: usability Attachments: KAFKA-1070.patch -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Commented] (KAFKA-1414) Speedup broker startup after hard reset
[ https://issues.apache.org/jira/browse/KAFKA-1414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14067380#comment-14067380 ] Jun Rao commented on KAFKA-1414: Oh, I was thinking about the local var referenced in loadLogs() and shutdown(). Speedup broker startup after hard reset --- Key: KAFKA-1414 URL: https://issues.apache.org/jira/browse/KAFKA-1414 Project: Kafka Issue Type: Improvement Components: log Affects Versions: 0.8.2, 0.9.0, 0.8.1.1 Reporter: Dmitry Bugaychenko Assignee: Jay Kreps Attachments: 0001-KAFKA-1414-Speedup-broker-startup-after-hard-reset-a.patch, KAFKA-1414-rev1.patch, KAFKA-1414-rev2.fixed.patch, KAFKA-1414-rev2.patch, freebie.patch, parallel-dir-loading-0.8.patch, parallel-dir-loading-trunk-fixed-threadpool.patch, parallel-dir-loading-trunk-threadpool.patch, parallel-dir-loading-trunk.patch -- This message was sent by Atlassian JIRA (v6.2#6252)