[jira] [Commented] (KAFKA-2398) Transient test failure for SocketServerTest - Socket closed.
[ https://issues.apache.org/jira/browse/KAFKA-2398?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14652002#comment-14652002 ] Jiangjie Qin commented on KAFKA-2398: - Thanks, [~ijuma]. I saw this on Mac but not Yosemite, maybe it is still the same reason though. Transient test failure for SocketServerTest - Socket closed. Key: KAFKA-2398 URL: https://issues.apache.org/jira/browse/KAFKA-2398 Project: Kafka Issue Type: Sub-task Reporter: Jiangjie Qin See the following transient test failure for SocketServerTest. kafka.network.SocketServerTest simpleRequest FAILED java.net.SocketException: Socket closed at java.net.PlainSocketImpl.socketConnect(Native Method) at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:339) at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:200) at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:182) at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392) at java.net.Socket.connect(Socket.java:579) at java.net.Socket.connect(Socket.java:528) at java.net.Socket.<init>(Socket.java:425) at java.net.Socket.<init>(Socket.java:208) at kafka.network.SocketServerTest.connect(SocketServerTest.scala:84) at kafka.network.SocketServerTest.simpleRequest(SocketServerTest.scala:94) kafka.network.SocketServerTest tooBigRequestIsRejected FAILED java.net.SocketException: Socket closed at java.net.PlainSocketImpl.socketConnect(Native Method) at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:339) at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:200) at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:182) at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392) at java.net.Socket.connect(Socket.java:579) at java.net.Socket.connect(Socket.java:528) at java.net.Socket.<init>(Socket.java:425) at java.net.Socket.<init>(Socket.java:208) at 
kafka.network.SocketServerTest.connect(SocketServerTest.scala:84) at kafka.network.SocketServerTest.tooBigRequestIsRejected(SocketServerTest.scala:124) kafka.network.SocketServerTest testSocketsCloseOnShutdown FAILED java.net.SocketException: Socket closed at java.net.PlainSocketImpl.socketConnect(Native Method) at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:339) at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:200) at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:182) at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392) at java.net.Socket.connect(Socket.java:579) at java.net.Socket.connect(Socket.java:528) at java.net.Socket.<init>(Socket.java:425) at java.net.Socket.<init>(Socket.java:208) at kafka.network.SocketServerTest.connect(SocketServerTest.scala:84) at kafka.network.SocketServerTest.testSocketsCloseOnShutdown(SocketServerTest.scala:136) kafka.network.SocketServerTest testMaxConnectionsPerIp FAILED java.net.SocketException: Socket closed at java.net.PlainSocketImpl.socketConnect(Native Method) at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:339) at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:200) at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:182) at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392) at java.net.Socket.connect(Socket.java:579) at java.net.Socket.connect(Socket.java:528) at java.net.Socket.<init>(Socket.java:425) at java.net.Socket.<init>(Socket.java:208) at kafka.network.SocketServerTest.connect(SocketServerTest.scala:84) at kafka.network.SocketServerTest$$anonfun$1.apply(SocketServerTest.scala:170) at kafka.network.SocketServerTest$$anonfun$1.apply(SocketServerTest.scala:170) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at 
scala.collection.immutable.Range.foreach(Range.scala:141) at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) at scala.collection.AbstractTraversable.map(Traversable.scala:105) at kafka.network.SocketServerTest.testMaxConnectionsPerIp(SocketServerTest.scala:170) kafka.network.SocketServerTest
[jira] [Work started] (KAFKA-2384) Override commit message title in kafka-merge-pr.py
[ https://issues.apache.org/jira/browse/KAFKA-2384?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Work on KAFKA-2384 started by Ismael Juma. -- Override commit message title in kafka-merge-pr.py -- Key: KAFKA-2384 URL: https://issues.apache.org/jira/browse/KAFKA-2384 Project: Kafka Issue Type: Improvement Reporter: Guozhang Wang Assignee: Ismael Juma Fix For: 0.8.3 It would be more convenient to allow setting the commit message in the merging script; right now the script takes the PR title as-is, and contributors have to change it according to the submission-review guidelines before doing the merge. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-2398) Transient test failure for SocketServerTest - Socket closed.
[ https://issues.apache.org/jira/browse/KAFKA-2398?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14651917#comment-14651917 ] Ismael Juma commented on KAFKA-2398: [~granthenke] described the reason in the following mailing list thread: http://search-hadoop.com/m/uyzND1hazyw1XiDyH1 A relevant quote: "After a bit of digging I found this is due to the small default open files limit in Mac Yosemite. I am posting how to increase the limit here in case anyone else has been running into the issue." Transient test failure for SocketServerTest - Socket closed. Key: KAFKA-2398 URL: https://issues.apache.org/jira/browse/KAFKA-2398 Project: Kafka Issue Type: Sub-task Reporter: Jiangjie Qin See the following transient test failure for SocketServerTest. kafka.network.SocketServerTest simpleRequest FAILED java.net.SocketException: Socket closed at java.net.PlainSocketImpl.socketConnect(Native Method) at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:339) at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:200) at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:182) at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392) at java.net.Socket.connect(Socket.java:579) at java.net.Socket.connect(Socket.java:528) at java.net.Socket.<init>(Socket.java:425) at java.net.Socket.<init>(Socket.java:208) at kafka.network.SocketServerTest.connect(SocketServerTest.scala:84) at kafka.network.SocketServerTest.simpleRequest(SocketServerTest.scala:94) kafka.network.SocketServerTest tooBigRequestIsRejected FAILED java.net.SocketException: Socket closed at java.net.PlainSocketImpl.socketConnect(Native Method) at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:339) at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:200) at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:182) at 
java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392) at java.net.Socket.connect(Socket.java:579) at java.net.Socket.connect(Socket.java:528) at java.net.Socket.<init>(Socket.java:425) at java.net.Socket.<init>(Socket.java:208) at kafka.network.SocketServerTest.connect(SocketServerTest.scala:84) at kafka.network.SocketServerTest.tooBigRequestIsRejected(SocketServerTest.scala:124) kafka.network.SocketServerTest testSocketsCloseOnShutdown FAILED java.net.SocketException: Socket closed at java.net.PlainSocketImpl.socketConnect(Native Method) at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:339) at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:200) at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:182) at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392) at java.net.Socket.connect(Socket.java:579) at java.net.Socket.connect(Socket.java:528) at java.net.Socket.<init>(Socket.java:425) at java.net.Socket.<init>(Socket.java:208) at kafka.network.SocketServerTest.connect(SocketServerTest.scala:84) at kafka.network.SocketServerTest.testSocketsCloseOnShutdown(SocketServerTest.scala:136) kafka.network.SocketServerTest testMaxConnectionsPerIp FAILED java.net.SocketException: Socket closed at java.net.PlainSocketImpl.socketConnect(Native Method) at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:339) at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:200) at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:182) at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392) at java.net.Socket.connect(Socket.java:579) at java.net.Socket.connect(Socket.java:528) at java.net.Socket.<init>(Socket.java:425) at java.net.Socket.<init>(Socket.java:208) at kafka.network.SocketServerTest.connect(SocketServerTest.scala:84) at kafka.network.SocketServerTest$$anonfun$1.apply(SocketServerTest.scala:170) at 
kafka.network.SocketServerTest$$anonfun$1.apply(SocketServerTest.scala:170) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.immutable.Range.foreach(Range.scala:141) at
[jira] [Commented] (KAFKA-2388) subscribe(topic)/unsubscribe(topic) should either take a callback to allow user to handle exceptions or it should be synchronous.
[ https://issues.apache.org/jira/browse/KAFKA-2388?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14652058#comment-14652058 ] Jason Gustafson commented on KAFKA-2388: [~becket_qin] If you don't mind, I can take a crack at this. No worries if you were planning to do it. subscribe(topic)/unsubscribe(topic) should either take a callback to allow user to handle exceptions or it should be synchronous. - Key: KAFKA-2388 URL: https://issues.apache.org/jira/browse/KAFKA-2388 Project: Kafka Issue Type: Sub-task Reporter: Jiangjie Qin Assignee: Jiangjie Qin According to the mailing list discussion on the consumer interface, we'll replace: {code} public void subscribe(String... topics); public void subscribe(TopicPartition... partitions); public Set<TopicPartition> subscriptions(); {code} with: {code} void subscribe(List<String> topics, RebalanceCallback callback); void assign(List<TopicPartition> partitions); List<String> subscriptions(); List<TopicPartition> assignments(); {code} We don't need the unsubscribe APIs anymore. The RebalanceCallback would look like: {code} interface RebalanceCallback { void onAssignment(List<TopicPartition> partitions); void onRevocation(List<TopicPartition> partitions); // handle non-existing topics, etc. void onError(Exception e); } {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
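The proposed callback interface above can be sketched as plain Java. The interface follows the proposal; the `TopicPartition` stub and the `RecordingRebalanceCallback` implementation are hypothetical illustrations, not Kafka code:

```java
import java.util.ArrayList;
import java.util.List;

// Stand-in for Kafka's real TopicPartition class, for a self-contained sketch.
class TopicPartition {
    final String topic;
    final int partition;
    TopicPartition(String topic, int partition) { this.topic = topic; this.partition = partition; }
}

// The callback shape from the proposal above.
interface RebalanceCallback {
    void onAssignment(List<TopicPartition> partitions);
    void onRevocation(List<TopicPartition> partitions);
    // handle non-existing topics, etc.
    void onError(Exception e);
}

// Hypothetical implementation that records what the coordinator reported,
// showing where a user would commit offsets (on revocation) or seek (on assignment).
class RecordingRebalanceCallback implements RebalanceCallback {
    final List<TopicPartition> assigned = new ArrayList<>();
    final List<TopicPartition> revoked = new ArrayList<>();
    Exception lastError;

    public void onAssignment(List<TopicPartition> partitions) { assigned.addAll(partitions); }
    public void onRevocation(List<TopicPartition> partitions) { revoked.addAll(partitions); }
    public void onError(Exception e) { lastError = e; }
}
```

The point of the callback form over a synchronous subscribe is that rebalance and error notification arrive asynchronously during poll(), so the user code that reacts to them lives in one place.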
[jira] [Commented] (KAFKA-1860) File system errors are not detected unless Kafka tries to write
[ https://issues.apache.org/jira/browse/KAFKA-1860?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14652067#comment-14652067 ] Mayuresh Gharat commented on KAFKA-1860: [~guozhang] ping. File system errors are not detected unless Kafka tries to write --- Key: KAFKA-1860 URL: https://issues.apache.org/jira/browse/KAFKA-1860 Project: Kafka Issue Type: Bug Reporter: Guozhang Wang Assignee: Mayuresh Gharat Fix For: 0.9.0 Attachments: KAFKA-1860.patch When the disk (raid with caches dir) dies on a Kafka broker, typically the filesystem gets mounted into read-only mode, and hence when Kafka tries to read the disk, they'll get a FileNotFoundException with the read-only errno set (EROFS). However, as long as there is no produce request received, hence no writes attempted on the disks, Kafka will not exit on such FATAL error (when the disk starts working again, Kafka might think some files are gone while they will reappear later as raid comes back online). Instead it keeps spilling exceptions like: {code} 2015/01/07 09:47:41.543 ERROR [KafkaScheduler] [kafka-scheduler-1] [kafka-server] [] Uncaught exception in scheduled task 'kafka-recovery-point-checkpoint' java.io.FileNotFoundException: /export/content/kafka/i001_caches/recovery-point-offset-checkpoint.tmp (Read-only file system) at java.io.FileOutputStream.open(Native Method) at java.io.FileOutputStream.<init>(FileOutputStream.java:206) at java.io.FileOutputStream.<init>(FileOutputStream.java:156) at kafka.server.OffsetCheckpoint.write(OffsetCheckpoint.scala:37) {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
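One way to detect the condition described above without waiting for a produce request is to probe the log directory with a small scheduled write. This is a minimal, hypothetical sketch of such a probe (the class name and probe file are assumptions, not Kafka's actual fix):

```java
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;

// Hypothetical probe: returns true if the directory accepts writes, false if
// (for example) the filesystem has been remounted read-only (EROFS), which
// surfaces in Java as an IOException like "(Read-only file system)".
class DiskWriteProbe {
    static boolean canWrite(File dir) {
        File probe = new File(dir, ".kafka-write-probe");
        try (FileOutputStream out = new FileOutputStream(probe)) {
            out.write(1);
            out.getFD().sync(); // force the write through to the device
            return true;
        } catch (IOException e) {
            return false;
        } finally {
            probe.delete();
        }
    }
}
```

Run on a schedule against each log directory, a `false` result could be treated as the same FATAL condition a failed produce-path write triggers today, instead of repeatedly logging the checkpoint exception.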
[jira] [Commented] (KAFKA-1690) new java producer needs ssl support as a client
[ https://issues.apache.org/jira/browse/KAFKA-1690?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14652678#comment-14652678 ] Rajasekar Elango commented on KAFKA-1690: - [~harsha_ch] Thanks for documentation, I tried to run this locally and here are my observations. 1. On kafka server.properties, I had to remove quotes for SSL properties to make it work. For e.g. when I had ssl.keystore.type = "JKS" I got org.apache.kafka.common.KafkaException: java.security.KeyStoreException: "JKS" not found; when I changed it to ssl.keystore.type = JKS, it worked. I had to do this for all ssl properties. Not sure if it's just me, can you confirm whether it works with quotes? 2. Console producer worked in secure mode, but I needed to specify keystore location and password in addition to truststore, I guess the documentation needs to be updated. 3. Console consumer works in plaintext mode, but I'm not sure how to force SSL. I added --property security.protocol=SSL, but it seems to be ignored, can you provide an example? I would suggest moving all SSL related configuration to a separate ssl config file for broker and producer/consumer. Reason I ask is ssl properties contain secret information like passwords that need to be stored in a secure location. If it's part of kafka server.properties we can't keep it in source control and we need to keep the whole kafka server.properties in a secure location. So it's better to accept ssl.config.location as a property in server.properties and read all ssl properties from there. The same applies to producer/consumer: producer/consumer.properties can be in source control while security properties can be pulled from a secure location. It would also simplify running console-producer/console-consumer easily with one ssl.config.location option instead of a bunch of ssl properties. 
new java producer needs ssl support as a client --- Key: KAFKA-1690 URL: https://issues.apache.org/jira/browse/KAFKA-1690 Project: Kafka Issue Type: Sub-task Reporter: Joe Stein Assignee: Sriharsha Chintalapani Fix For: 0.8.3 Attachments: KAFKA-1690.patch, KAFKA-1690.patch, KAFKA-1690_2015-05-10_23:20:30.patch, KAFKA-1690_2015-05-10_23:31:42.patch, KAFKA-1690_2015-05-11_16:09:36.patch, KAFKA-1690_2015-05-12_16:20:08.patch, KAFKA-1690_2015-05-15_07:18:21.patch, KAFKA-1690_2015-05-20_14:54:35.patch, KAFKA-1690_2015-05-21_10:37:08.patch, KAFKA-1690_2015-06-03_18:52:29.patch, KAFKA-1690_2015-06-23_13:18:20.patch, KAFKA-1690_2015-07-20_06:10:42.patch, KAFKA-1690_2015-07-20_11:59:57.patch, KAFKA-1690_2015-07-25_12:10:55.patch -- This message was sent by Atlassian JIRA (v6.3.4#6332)
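The separate-SSL-file suggestion in the comment above would amount to something like the following. Note this is a sketch of the *proposed* layout: `ssl.config.location` is the suggested (not existing) indirection property, the paths and passwords are placeholders, and the per-key names mirror the SSL settings discussed (values unquoted, per observation 1):

```properties
# server.properties (proposed): point at an out-of-band SSL config file,
# so server.properties itself can stay in source control
ssl.config.location=/secure/path/ssl.properties

# /secure/path/ssl.properties: kept only in a secure location
ssl.keystore.type=JKS
ssl.keystore.location=/secure/path/kafka.keystore.jks
ssl.keystore.password=changeit
ssl.key.password=changeit
ssl.truststore.location=/secure/path/kafka.truststore.jks
ssl.truststore.password=changeit
```

The same split would let console-producer/console-consumer take a single ssl.config.location option instead of each SSL property individually.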
[GitHub] kafka pull request: KAFKA-2386; increase timeouts for transient te...
Github user asfgit closed the pull request at: https://github.com/apache/kafka/pull/107 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (KAFKA-2386) Transient test failure: testGenerationIdIncrementsOnRebalance
[ https://issues.apache.org/jira/browse/KAFKA-2386?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14652696#comment-14652696 ] ASF GitHub Bot commented on KAFKA-2386: --- Github user asfgit closed the pull request at: https://github.com/apache/kafka/pull/107 Transient test failure: testGenerationIdIncrementsOnRebalance - Key: KAFKA-2386 URL: https://issues.apache.org/jira/browse/KAFKA-2386 Project: Kafka Issue Type: Sub-task Reporter: Jason Gustafson Seen this in some builds. Might be caused by gc pause (or similar) which delays group join in the test. {code} kafka.coordinator.ConsumerCoordinatorResponseTest testGenerationIdIncrementsOnRebalance FAILED java.util.concurrent.TimeoutException: Futures timed out after [40 milliseconds] at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219) at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223) at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107) at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53) at scala.concurrent.Await$.result(package.scala:107) at kafka.coordinator.ConsumerCoordinatorResponseTest.joinGroup(ConsumerCoordinatorResponseTest.scala:313) at kafka.coordinator.ConsumerCoordinatorResponseTest.testGenerationIdIncrementsOnRebalance(ConsumerCoordinatorResponseTest.scala:272) {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Assigned] (KAFKA-2285) Logging trait obfuscates call site information
[ https://issues.apache.org/jira/browse/KAFKA-2285?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Grant Henke reassigned KAFKA-2285: -- Assignee: Grant Henke Logging trait obfuscates call site information -- Key: KAFKA-2285 URL: https://issues.apache.org/jira/browse/KAFKA-2285 Project: Kafka Issue Type: Improvement Components: core Affects Versions: 0.8.2.0 Reporter: E. Sammer Assignee: Grant Henke Using a logging trait, as many components in the codebase do, destroys call site information in logging messages, making debugging certain kinds of failures annoying in production systems. Most messages end up looking like: {code} 2015-06-18 07:41:11,550 (kafka-request-handler-0) [WARN - kafka.utils.Logging$class.warn(Logging.scala:83)] Partition [events,1] on broker 1: No checkpointed highwatermark is found for partition [events,1] {code} I think the mental overhead of issuing the standard incantation of {{private static final Logger logger = LoggerFactory.get(Foo.class)}} (or the even shorter Scala equivalent) for each class is outweighed by the operational overhead of mapping strings back to their original call sites. This is an easy win to improve the traceability of complex failures in production deployments. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
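The per-class logger the issue advocates preserves the call site because the logger is named for, and invoked directly from, the owning class rather than a shared Logging trait. A minimal sketch, with `java.util.logging` from the JDK standing in for the SLF4J incantation quoted above (the `ReplicaManager` class and method here are illustrative, not Kafka's code):

```java
import java.util.logging.Logger;

class ReplicaManager {
    // Per-class logger: records carry ReplicaManager's name, and the warning
    // is emitted from this class, not from kafka.utils.Logging$class.warn.
    private static final Logger logger = Logger.getLogger(ReplicaManager.class.getName());

    void warnNoHighwatermark(String topic, int partition) {
        logger.warning("No checkpointed highwatermark is found for partition ["
                + topic + "," + partition + "]");
    }
}
```

With the trait, the logging framework's caller-inspection sees the trait's warn() as the call site; with the pattern above, the log line points at the class that actually logged.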
[jira] [Commented] (KAFKA-2386) Transient test failure: testGenerationIdIncrementsOnRebalance
[ https://issues.apache.org/jira/browse/KAFKA-2386?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14652636#comment-14652636 ] ASF GitHub Bot commented on KAFKA-2386: --- GitHub user hachikuji opened a pull request: https://github.com/apache/kafka/pull/107 KAFKA-2386; increase timeouts for transient test failure in ConsumerCoordinatorResponseTests There are two race conditions in the test case testGenerationIdIncrementsOnRebalance. First, a delay before the second join group request can timeout the initial group and cause the generationId to unexpectedly reset. Second, a delay in the join group request handling will timeout the request itself and cause the test to fail. This commit doesn't address these race conditions, but increases the timeouts to make them more unlikely. If the problem reoccurs, then we'll probably need a better solution. You can merge this pull request into a Git repository by running: $ git pull https://github.com/hachikuji/kafka KAFKA-2386 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/kafka/pull/107.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #107 commit a53460a1a6637a814cbfe3731431746e56c52742 Author: Jason Gustafson ja...@confluent.io Date: 2015-07-31T23:17:36Z KAFKA-2386; increase timeouts for transient test failure in ConsumerCoordinatorResponseTest Transient test failure: testGenerationIdIncrementsOnRebalance - Key: KAFKA-2386 URL: https://issues.apache.org/jira/browse/KAFKA-2386 Project: Kafka Issue Type: Sub-task Reporter: Jason Gustafson Seen this in some builds. Might be caused by gc pause (or similar) which delays group join in the test. 
{code} kafka.coordinator.ConsumerCoordinatorResponseTest testGenerationIdIncrementsOnRebalance FAILED java.util.concurrent.TimeoutException: Futures timed out after [40 milliseconds] at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219) at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223) at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107) at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53) at scala.concurrent.Await$.result(package.scala:107) at kafka.coordinator.ConsumerCoordinatorResponseTest.joinGroup(ConsumerCoordinatorResponseTest.scala:313) at kafka.coordinator.ConsumerCoordinatorResponseTest.testGenerationIdIncrementsOnRebalance(ConsumerCoordinatorResponseTest.scala:272) {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] kafka pull request: KAFKA-2386; increase timeouts for transient te...
GitHub user hachikuji opened a pull request: https://github.com/apache/kafka/pull/107 KAFKA-2386; increase timeouts for transient test failure in ConsumerCoordinatorResponseTests There are two race conditions in the test case testGenerationIdIncrementsOnRebalance. First, a delay before the second join group request can timeout the initial group and cause the generationId to unexpectedly reset. Second, a delay in the join group request handling will timeout the request itself and cause the test to fail. This commit doesn't address these race conditions, but increases the timeouts to make them more unlikely. If the problem reoccurs, then we'll probably need a better solution. You can merge this pull request into a Git repository by running: $ git pull https://github.com/hachikuji/kafka KAFKA-2386 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/kafka/pull/107.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #107 commit a53460a1a6637a814cbfe3731431746e56c52742 Author: Jason Gustafson ja...@confluent.io Date: 2015-07-31T23:17:36Z KAFKA-2386; increase timeouts for transient test failure in ConsumerCoordinatorResponseTest --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (KAFKA-2400) Expose heartbeat frequency in new consumer configuration
[ https://issues.apache.org/jira/browse/KAFKA-2400?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14652663#comment-14652663 ] Jay Kreps commented on KAFKA-2400: -- Also we should set good defaults: - session timeout should probably default to something pretty high; this will mean a longer time to detect true failures but no false positives or churning, and those who want faster detection can tune it down appropriately (most won't care) - reasonable heartbeat frequency (300 ms?). Expose heartbeat frequency in new consumer configuration Key: KAFKA-2400 URL: https://issues.apache.org/jira/browse/KAFKA-2400 Project: Kafka Issue Type: Improvement Reporter: Jason Gustafson Assignee: Jason Gustafson Priority: Minor The consumer coordinator communicates the need to rebalance through responses to heartbeat requests sent from each member of the consumer group. The heartbeat frequency therefore controls how long normal rebalances will take. Currently, the frequency is hard-coded to 3 heartbeats per the configured session timeout, but it would be nice to expose this setting so that the user can control the impact from rebalancing. Since the consumer is currently single-threaded and heartbeats are sent in poll(), we cannot guarantee that the heartbeats will actually be sent at the configured frequency. In practice, the user may have to adjust their fetch size to ensure that poll() is called often enough to get the desired heartbeat frequency. For most users, the consumption rate is probably fast enough for this not to matter, but we should make the documentation clear on this point. In any case, we expect that most users will accept the default value. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
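As a concrete illustration of the hard-coded behaviour described above: three heartbeats per session timeout means a heartbeat interval of one third of the timeout, and the proposal is to make that interval (or the ratio) configurable. The class and method names below are hypothetical, just to make the arithmetic explicit:

```java
// Hypothetical: derive the heartbeat interval from the session timeout.
// Today the divisor is fixed at 3; the proposal would expose it (or the
// interval itself) as a consumer configuration.
class HeartbeatConfig {
    static long heartbeatIntervalMs(long sessionTimeoutMs, int heartbeatsPerSession) {
        return sessionTimeoutMs / heartbeatsPerSession;
    }
}
```

With a 30 s session timeout and the current divisor of 3, heartbeats go out every 10 s; hitting Jay's suggested ~300 ms interval at that same timeout would mean 100 heartbeats per session.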
[jira] [Commented] (KAFKA-2285) Logging trait obfuscates call site information
[ https://issues.apache.org/jira/browse/KAFKA-2285?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14652767#comment-14652767 ] Ismael Juma commented on KAFKA-2285: Also see https://issues.apache.org/jira/plugins/servlet/mobile#issue/KAFKA-1351 Logging trait obfuscates call site information -- Key: KAFKA-2285 URL: https://issues.apache.org/jira/browse/KAFKA-2285 Project: Kafka Issue Type: Improvement Components: core Affects Versions: 0.8.2.0 Reporter: E. Sammer Assignee: Grant Henke Using a logging trait, as many components in the codebase do, destroys call site information in logging messages, making debugging certain kinds of failures annoying in production systems. Most messages end up looking like: {code} 2015-06-18 07:41:11,550 (kafka-request-handler-0) [WARN - kafka.utils.Logging$class.warn(Logging.scala:83)] Partition [events,1] on broker 1: No checkpointed highwatermark is found for partition [events,1] {code} I think the mental overhead of issuing the standard incantation of {{private static final Logger logger = LoggerFactory.get(Foo.class)}} (or the even shorter Scala equivalent) for each class is outweighed by the operational overhead of mapping strings back to their original call sites. This is an easy win to improve the traceability of complex failures in production deployments. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: Review Request 36722: Patch for KAFKA-2332
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/36722/ --- (Updated Aug. 4, 2015, 1:24 a.m.) Review request for kafka. Bugs: KAFKA-2332 https://issues.apache.org/jira/browse/KAFKA-2332 Repository: kafka Description (updated) --- KAFKA-2332; Add quota metrics to old producer and consumer Please apply the patch after KAFKA-2136. Diffs - core/src/main/scala/kafka/consumer/FetchRequestAndResponseStats.scala 3df55e13001cef31ee662cc713b99baf8ab4ce0f core/src/main/scala/kafka/consumer/SimpleConsumer.scala 7ebc0405d1f309bed9943e7116051d1d8276f200 core/src/main/scala/kafka/producer/ProducerRequestStats.scala 026e93a2f1dcc0e6380fad509dc98b0cc6469a7a core/src/main/scala/kafka/producer/SyncProducer.scala dcee50113b1b1e062a56ab0f63ac6bb175be6b75 Diff: https://reviews.apache.org/r/36722/diff/ Testing --- Thanks, Dong Lin
[jira] [Commented] (KAFKA-2397) leave group request
[ https://issues.apache.org/jira/browse/KAFKA-2397?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14652686#comment-14652686 ] Jay Kreps commented on KAFKA-2397: -- Nice summary [~onurkaraman]. I agree that adding a field to heartbeat is functionally equivalent to a leave_group request/resp. The reason for preferring that was just to reduce the conceptual weight of the protocol. A second idea that I'm not sure is good: rather than having either a new request or a heartbeat it would be possible to use the TCP connection closure for this. The advantage would be ANY process death that didn't also kill the OS would then be detectable without any client participation needed. The downside is that (1) the server change would be slightly more involved, and (2) you wouldn't be able to close the connection for other reasons. The complexity of implementation is that currently only the network layer knows about socket closes. However we were already introducing a session concept for the security work which allows the KafkaApis layer to have access to cross-request state such as the authenticated identity. We could make it possible to add shutdown actions to the session that would make it possible to trigger this; or alternately we could add a way to add onSocketClose actions directly to the network layer. This same feature would actually be useful for the purgatory. Currently when a connection is closed, I don't think that requests in purgatory are removed. If the purgatory timeout is very small this is okay, but it is very common for people to ask for NO timeout, in which case each connection close potentially leaks memory. I think we kind of fixed this by just overriding the max wait time but purging purgatory on shutdown is obviously preferable. 
leave group request --- Key: KAFKA-2397 URL: https://issues.apache.org/jira/browse/KAFKA-2397 Project: Kafka Issue Type: Sub-task Components: consumer Reporter: Onur Karaman Assignee: Onur Karaman Priority: Minor Fix For: 0.8.3 Let's say every consumer in a group has session timeout s. Currently, if a consumer leaves the group, the worst case time to stabilize the group is 2s (s to detect the consumer failure + s for the rebalance window). If a consumer instead can declare they are leaving the group, the worst case time to stabilize the group would just be the s associated with the rebalance window. This is a low priority optimization! -- This message was sent by Atlassian JIRA (v6.3.4#6332)
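The worst-case numbers in the description can be made explicit with a tiny (hypothetical) model: without an explicit leave, the group pays up to one session timeout just to detect the failure before the rebalance window even starts; with an explicit leave, only the rebalance window remains.

```java
// Hypothetical model of the worst-case group stabilization time described
// above: session timeout s to detect a silent departure, plus s for the
// rebalance window; an explicit leave-group skips the detection term.
class StabilizationTime {
    static long worstCaseMs(long sessionTimeoutMs, boolean explicitLeave) {
        long failureDetection = explicitLeave ? 0 : sessionTimeoutMs;
        long rebalanceWindow = sessionTimeoutMs;
        return failureDetection + rebalanceWindow;
    }
}
```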
[jira] [Comment Edited] (KAFKA-2397) leave group request
[ https://issues.apache.org/jira/browse/KAFKA-2397?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14652686#comment-14652686 ] Jay Kreps edited comment on KAFKA-2397 at 8/3/15 10:36 PM: --- Nice summary [~onurkaraman]. I agree that adding a field to heartbeat is functionally equivalent to a leave_group request/resp. The reason for preferring enriching the heartbeat was just to reduce the conceptual weight of the protocol (each new request/response pair kind of adds to the list of things you have to understand...whereas an optional parameter can kind of be ignored). This is more esthetic than anything else. A second idea that I'm not sure is good: rather than having either a new request or a heartbeat it would be possible to use the TCP connection closure for this. The advantage would be ANY process death that didn't also kill the OS would then be detectable without any client participation needed. The downside is that (1) the server change would be slightly more involved, and (2) you wouldn't be able to close the connection for other reasons. The complexity of implementation is that currently only the network layer knows about socket closes. However we were already introducing a session concept for the security work which allows the KafkaApis layer to have access to cross-request state such as the authenticated identity. We could make it possible to add shutdown actions to the session that would make it possible to trigger this; or alternately we could add a way to add onSocketClose actions directly to the network layer. This same feature would actually be useful for the purgatory. Currently when a connection is closed, I don't think that requests in purgatory are removed. If the purgatory timeout is very small this is okay, but it is very common for people to ask for NO timeout, in which case each connection close potentially leaks memory. 
I think we kind of fixed this by just overriding the max wait time but purging purgatory on shutdown is obviously preferable. was (Author: jkreps): Nice summary [~onurkaraman]. I agree that adding a field to heartbeat is functionally equivalent to a leave_group request/resp. The reason for preferring that was just to reduce the conceptual weight of the protocol. A second idea that I'm not sure is good: rather than having either a new request or a heartbeat it would be possible to use the TCP connection closure for this. The advantage would be ANY process death that didn't also kill the OS would then be detectable without any client participation needed. The downside is that (1) the server change would be slightly more involved, and (2) you wouldn't be able to close the connection for other reasons. The complexity of implementation is that currently only the network layer knows about socket closes. However we were already introducing a session concept for the security work which allows the KakaApi layer to have access to cross-request state such as the authenticated identity. We could make it possible to add shutdown actions to the session that would make it possible to trigger this; or alternately we could add a way to add onSocketClose actions directly to the network layer. This same feature would actually be useful for the purgatory. Currently when a connection is closed, I don't think that requests in purgatory are removed. If the purgatory timeout is very small this is okay, but a very common thing for people to ask for NO timeout in which case each connection close potentially leaks memory. I think we kind of fixed this by just overriding the max wait time but purging purgatory on shutdown is obviously preferable. 
leave group request --- Key: KAFKA-2397 URL: https://issues.apache.org/jira/browse/KAFKA-2397 Project: Kafka Issue Type: Sub-task Components: consumer Reporter: Onur Karaman Assignee: Onur Karaman Priority: Minor Fix For: 0.8.3 Let's say every consumer in a group has session timeout s. Currently, if a consumer leaves the group, the worst-case time to stabilize the group is 2s (s to detect the consumer failure + s for the rebalance window). If a consumer can instead declare that it is leaving the group, the worst-case time to stabilize the group would just be the s associated with the rebalance window. This is a low priority optimization! -- This message was sent by Atlassian JIRA (v6.3.4#6332)
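The timing claim above can be spelled out as arithmetic (Java used only for illustration):

```java
// Worst-case group stabilization time after a consumer goes away,
// expressed in terms of the session timeout s.
class RebalanceTiming {
    // Without a leave-group request: up to s for the coordinator to detect
    // the failure via missed heartbeats, plus up to s for the rebalance window.
    static long worstCaseWithoutLeaveMs(long sessionTimeoutMs) {
        return sessionTimeoutMs + sessionTimeoutMs;
    }

    // With an explicit leave-group request: detection is immediate,
    // so only the rebalance window remains.
    static long worstCaseWithLeaveMs(long sessionTimeoutMs) {
        return sessionTimeoutMs;
    }
}
```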
[jira] [Resolved] (KAFKA-2386) Transient test failure: testGenerationIdIncrementsOnRebalance
[ https://issues.apache.org/jira/browse/KAFKA-2386?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gwen Shapira resolved KAFKA-2386. - Resolution: Fixed Assignee: Jason Gustafson Reviewer: Gwen Shapira Fix Version/s: 0.8.3 Transient test failure: testGenerationIdIncrementsOnRebalance - Key: KAFKA-2386 URL: https://issues.apache.org/jira/browse/KAFKA-2386 Project: Kafka Issue Type: Sub-task Reporter: Jason Gustafson Assignee: Jason Gustafson Fix For: 0.8.3 Seen this in some builds. Might be caused by gc pause (or similar) which delays group join in the test. {code} kafka.coordinator.ConsumerCoordinatorResponseTest testGenerationIdIncrementsOnRebalance FAILED java.util.concurrent.TimeoutException: Futures timed out after [40 milliseconds] at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219) at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223) at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107) at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53) at scala.concurrent.Await$.result(package.scala:107) at kafka.coordinator.ConsumerCoordinatorResponseTest.joinGroup(ConsumerCoordinatorResponseTest.scala:313) at kafka.coordinator.ConsumerCoordinatorResponseTest.testGenerationIdIncrementsOnRebalance(ConsumerCoordinatorResponseTest.scala:272) {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1690) new java producer needs ssl support as a client
[ https://issues.apache.org/jira/browse/KAFKA-1690?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14652805#comment-14652805 ] Jun Rao commented on KAFKA-1690: [~erajasekar], for 3, the SSL patch only works for the new consumer. We haven't ported ConsoleConsumer to use the new consumer yet. This is being tracked in KAFKA-2015. As for the config file, on the broker side, it may not be appropriate to have an ssl.config.location property pointing to an SSL property file. People can integrate Kafka with different config systems (through KafkaServerStartable) and not all config systems store properties in files. new java producer needs ssl support as a client --- Key: KAFKA-1690 URL: https://issues.apache.org/jira/browse/KAFKA-1690 Project: Kafka Issue Type: Sub-task Reporter: Joe Stein Assignee: Sriharsha Chintalapani Fix For: 0.8.3 Attachments: KAFKA-1690.patch, KAFKA-1690.patch, KAFKA-1690_2015-05-10_23:20:30.patch, KAFKA-1690_2015-05-10_23:31:42.patch, KAFKA-1690_2015-05-11_16:09:36.patch, KAFKA-1690_2015-05-12_16:20:08.patch, KAFKA-1690_2015-05-15_07:18:21.patch, KAFKA-1690_2015-05-20_14:54:35.patch, KAFKA-1690_2015-05-21_10:37:08.patch, KAFKA-1690_2015-06-03_18:52:29.patch, KAFKA-1690_2015-06-23_13:18:20.patch, KAFKA-1690_2015-07-20_06:10:42.patch, KAFKA-1690_2015-07-20_11:59:57.patch, KAFKA-1690_2015-07-25_12:10:55.patch -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: Gauging Interest in adding Encryption to Kafka
The client-broker protocol would have to be augmented to carry the encrypted encryption key, plus logic to handle redistribution to existing clients due to key rotation. This is a good point. HDFS has the encryption zone concept, which could be akin to a topic. The keys in HDFS are per-file; not sure what would be a good compromise here for the granularity. For simplicity, it could be at the topic level itself, but the master key is never given to the client. Regardless, internal Kafka management of messages, batching, replication, compression, compaction, performance, etc. might be some of the key deciding factors. On Mon, Aug 3, 2015 at 10:22 AM, Gwen Shapira g...@confluent.io wrote: If I understand you correctly, you are saying that the kerberos keytab that the broker uses to authenticate with the KMS will be somewhere on the broker node and can be used by a malicious admin. Yes. If the broker is doing encryption/decryption, then you need to bootstrap the broker(s) with the encryption key. The key could be on local disk as a java keystore or remote in a KMS. If it is in a remote KMS, then you will need to authenticate with the KMS via kerberos or another authentication scheme. Regardless, a Kafka admin having shell login access as the linux user used by the Kafka broker process (assuming kafka) will have access to the key and would be able to encrypt/decrypt the stored data. This might not seem like a big deal, but in some enterprises "separation of duties" or "privileged user access management (PIM)" are critical compliance requirements. More importantly, if the goal is just to store data in encrypted form on disk, then honestly, you just need to encrypt the Kafka data volume using LUKS and with restricted file permissions. This will take care of issues like the disk being stolen, etc. 
You don't need to do any changes to Kafka :-) Thanks Bosco On 8/3/15, 12:28 PM, Alejandro Abdelnur tuc...@gmail.com wrote: Doing encryption on the client has the following benefits (most of them already mentioned in the thread): * brokers don't have additional CPU load * brokers never see the data in unencrypted form (Kafka admins cannot snoop) * secure multi-tenancy (keys are 100% in the client space) * no need to secure Kafka wire transport, client-broker and broker-broker (data is already encrypted) It would be highly desirable, even if encryption is done on the client side, that encryption is 'transparent'. Similar to how HDFS encryption works, it is not the client writing/reading a topic that decides to encrypt/decrypt; the broker is the one telling that to the client and providing encrypted encryption keys for the task. The client-broker protocol would have to be augmented to carry the encrypted encryption key, plus logic to handle redistribution to existing clients due to key rotation. A nice thing about doing broker-side encryption though is that you can shut off clients at any time and they won't see unencrypted data anymore. But this means the brokers will have to deal with the client ACLs for encryption (I'd rather leave that outside of Kafka as a concern of the KMS system). You could achieve similar functionality with client-side encryption by removing the client from the ACLs in the KMS and doing a key rotation; then the client will not be able to decrypt any messages using the new key (though all previous ones using the key that the client already has will be visible to the client). Thanks. On Mon, Aug 3, 2015 at 10:22 AM, Gwen Shapira g...@confluent.io wrote: If I understand you correctly, you are saying that the kerberos keytab that the broker uses to authenticate with the KMS will be somewhere on the broker node and can be used by a malicious admin. I agree this is a valid concern. 
I am not opposed to client-side encryption; I am more concerned that the modifications this will require in the Kafka broker implementation make the idea impractical. And obviously, as in any security discussion, there are lots of details regarding key exchange, management and protection that are critical. Perhaps given a design doc, we can better evaluate the proposed tradeoffs. Gwen On Sat, Aug 1, 2015 at 10:10 AM, Don Bosco Durai bo...@apache.org wrote: Any reason you think it's better to let the clients handle it? Gwen, I agree with Todd, depending on the goal, the requirements might vary. If the goal is that if someone steals the disk, they should not be able to access the data, then encrypting at the Broker is enough. However, if the requirement is that the admin/operator should not be able to access the data, then client side is the only option. Hadoop/HDFS transparent data encryption has a similar philosophy, where the actual encryption/decryption happens at the client side. 1. Key management Hadoop common has a KMS. And there are industry standards like KMIP. If the Broker does the encrypt/decrypt, then the solution is much easier. If the client does it, then
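The envelope-encryption pattern the thread keeps circling around can be sketched with plain JDK crypto (key management, the KMS protocol, and authenticated cipher modes are all omitted here; names are illustrative): each topic gets a data key, payloads are encrypted with the data key, and only the KMS-held master key can unwrap the data key, so whoever controls the master key controls access.

```java
import javax.crypto.Cipher;
import javax.crypto.KeyGenerator;
import javax.crypto.SecretKey;

class EnvelopeCrypto {
    // Generate a fresh 128-bit AES key (used both as a per-topic data key
    // and, in this sketch, as a stand-in for the KMS master key).
    static SecretKey newKey() throws Exception {
        KeyGenerator kg = KeyGenerator.getInstance("AES");
        kg.init(128);
        return kg.generateKey();
    }

    // The master key wraps (encrypts) the data key; clients only ever see
    // this wrapped form and must ask the KMS to unwrap it.
    static byte[] wrapDataKey(SecretKey masterKey, SecretKey dataKey) throws Exception {
        Cipher c = Cipher.getInstance("AESWrap");
        c.init(Cipher.WRAP_MODE, masterKey);
        return c.wrap(dataKey);
    }

    static SecretKey unwrapDataKey(SecretKey masterKey, byte[] wrapped) throws Exception {
        Cipher c = Cipher.getInstance("AESWrap");
        c.init(Cipher.UNWRAP_MODE, masterKey);
        return (SecretKey) c.unwrap(wrapped, "AES", Cipher.SECRET_KEY);
    }

    // Message payloads are encrypted with the data key (default AES mode
    // for brevity; a real design would use an authenticated mode with an IV).
    static byte[] encrypt(SecretKey dataKey, byte[] plaintext) throws Exception {
        Cipher c = Cipher.getInstance("AES");
        c.init(Cipher.ENCRYPT_MODE, dataKey);
        return c.doFinal(plaintext);
    }

    static byte[] decrypt(SecretKey dataKey, byte[] ciphertext) throws Exception {
        Cipher c = Cipher.getInstance("AES");
        c.init(Cipher.DECRYPT_MODE, dataKey);
        return c.doFinal(ciphertext);
    }
}
```

Revoking a client then means removing it from the KMS ACL and rotating the data key, exactly as described above: old messages remain readable with the key the client already holds, new ones do not.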
[jira] [Commented] (KAFKA-2285) Logging trait obfuscates call site information
[ https://issues.apache.org/jira/browse/KAFKA-2285?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14652841#comment-14652841 ] Ismael Juma commented on KAFKA-2285: Better than a macro would be using @inline, but sadly that doesn't work for trait methods: https://issues.scala-lang.org/browse/SI-4767 We could have a logger class instead, but that's a bigger change. Logging trait obfuscates call site information -- Key: KAFKA-2285 URL: https://issues.apache.org/jira/browse/KAFKA-2285 Project: Kafka Issue Type: Improvement Components: core Affects Versions: 0.8.2.0 Reporter: E. Sammer Assignee: Grant Henke Using a logging trait, as many components in the codebase do, destroys call site information in logging messages, making debugging certain kinds of failures annoying in production systems. Most messages end up looking like: {code} 2015-06-18 07:41:11,550 (kafka-request-handler-0) [WARN - kafka.utils.Logging$class.warn(Logging.scala:83)] Partition [events,1] on broker 1: No checkpointed highwatermark is found for partition [events,1] {code} I think the mental overhead of issuing the standard incantation of {{private static final Logger logger = LoggerFactory.getLogger(Foo.class)}} (or the even shorter Scala equivalent) for each class is outweighed by the operational overhead of mapping strings back to their original call sites. This is an easy win to improve the traceability of complex failures in production deployments. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
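For illustration of the per-class incantation (java.util.logging here purely so the example is self-contained; Kafka uses slf4j): the logger carries the declaring class's name, so every record points at the real call site rather than at a shared `Logging` helper.

```java
import java.util.logging.Logger;

// Hypothetical class name; what matters is that the logger is bound to
// THIS class, so log output reads "PartitionManager" instead of the
// shared Logging trait's "kafka.utils.Logging$class.warn(...)".
class PartitionManager {
    private static final Logger logger =
            Logger.getLogger(PartitionManager.class.getName());

    void warnNoHighwatermark(String partition) {
        logger.warning("No checkpointed highwatermark is found for partition " + partition);
    }
}
```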
Re: Review Request 33049: Patch for KAFKA-2084
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/33049/#review93965 --- Looks good overall. I have a few more minor comments/suggestions. Apart from that, the patch needs a rebase. core/src/main/scala/kafka/server/ClientQuotaMetrics.scala (line 74) https://reviews.apache.org/r/33049/#comment148446 Would ClientQuotaManager be a better name? core/src/main/scala/kafka/server/ClientQuotaMetrics.scala (line 98) https://reviews.apache.org/r/33049/#comment148447 A number of places in this patch use `a : b` instead of `a: b`. Highly stylistic, and I'm beginning to not care about it, but I thought we generally prefer `a: b`. core/src/main/scala/kafka/server/KafkaApis.scala (line 643) https://reviews.apache.org/r/33049/#comment148417 Rather than string -> quota manager, should we just do short (for request key) -> quota manager and avoid the additional `nameForKey` lookups further above? core/src/main/scala/kafka/server/KafkaApis.scala (line 652) https://reviews.apache.org/r/33049/#comment148418 ``` quotaManagers.foreach { case (apiKey, quotaManager) => quotaManager.shutdown() } ``` core/src/main/scala/kafka/server/ThrottledRequest.scala (line 31) https://reviews.apache.org/r/33049/#comment148422 Would ThrottledResponse be a more accurate name for this? core/src/test/scala/integration/kafka/api/QuotasTest.scala (line 133) https://reviews.apache.org/r/33049/#comment148423 Can we look up the sensor directly? E.g., `...metrics().getSensor(RequestKeys.nameForKey(RequestKeys.FetchKey) + ThrottleTime- + consumerId1)` core/src/test/scala/integration/kafka/api/QuotasTest.scala (line 140) https://reviews.apache.org/r/33049/#comment148425 and with the above this would just be `assertNotNull` core/src/test/scala/integration/kafka/api/QuotasTest.scala (line 148) https://reviews.apache.org/r/33049/#comment148424 Uncomment? core/src/test/scala/integration/kafka/api/QuotasTest.scala (line 166) https://reviews.apache.org/r/33049/#comment148426 Similar comments as above. 
- Joel Koshy On June 30, 2015, 12:54 a.m., Aditya Auradkar wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/33049/ --- (Updated June 30, 2015, 12:54 a.m.) Review request for kafka, Joel Koshy and Jun Rao. Bugs: KAFKA-2084 https://issues.apache.org/jira/browse/KAFKA-2084 Repository: kafka Description --- Updated patch for quotas. This patch does the following: 1. Add per-client metrics for both producer and consumers 2. Add configuration for quotas 3. Compute delay times in the metrics package and return the delay times in QuotaViolationException 4. Add a DelayQueue in KafkaApi's that can be used to throttle any type of request. Implemented request throttling for produce and fetch requests. 5. Added unit and integration test cases for both producer and consumer 6. This doesn't include a system test. There is a separate ticket for that 7. Fixed KAFKA-2191 - (Included fix from : https://reviews.apache.org/r/34418/ ) Addressing Joel's comments Diffs - clients/src/main/java/org/apache/kafka/common/metrics/Quota.java d82bb0c055e631425bc1ebbc7d387baac76aeeaa clients/src/main/java/org/apache/kafka/common/metrics/QuotaViolationException.java a451e5385c9eca76b38b425e8ac856b2715fcffe clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java ca823fd4639523018311b814fde69b6177e73b97 clients/src/main/java/org/apache/kafka/common/metrics/stats/Rate.java 98429da34418f7f1deba1b5e44e2e6025212edb3 clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java 544e120594de78c43581a980b1e4087b4fb98ccb clients/src/test/java/org/apache/kafka/common/utils/MockTime.java core/src/main/scala/kafka/server/ClientQuotaMetrics.scala PRE-CREATION core/src/main/scala/kafka/server/KafkaApis.scala d63bc18d795a6f0e6994538ca55a9a46f7fb8ffd core/src/main/scala/kafka/server/KafkaConfig.scala 2d75186a110075e0c322db4b9f7a8c964a7a3e88 core/src/main/scala/kafka/server/KafkaServer.scala b320ce9f6a12c0ee392e91beb82e8804d167f9f4 
core/src/main/scala/kafka/server/ReplicaManager.scala 59c9bc3ac3a8afc07a6f8c88c5871304db588d17 core/src/main/scala/kafka/server/ThrottledRequest.scala PRE-CREATION core/src/main/scala/kafka/utils/ShutdownableThread.scala fc226c863095b7761290292cd8755cd7ad0f155c core/src/test/scala/integration/kafka/api/QuotasTest.scala PRE-CREATION core/src/test/scala/unit/kafka/server/ClientQuotaMetricsTest.scala PRE-CREATION core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala
Re: Review Request 33378: Patch for KAFKA-2136
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/33378/#review93979 --- This patch also needs a rebase. core/src/main/scala/kafka/server/ReplicaManager.scala (line 312) https://reviews.apache.org/r/33378/#comment148455 I'm a bit unclear on how you are planning to put in the right delay value in the response struct. i.e., in KAFKA-2084 you are computing the delay inside the callback. How will that value be accessed here? - Joel Koshy On July 13, 2015, 8:36 p.m., Aditya Auradkar wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/33378/ --- (Updated July 13, 2015, 8:36 p.m.) Review request for kafka, Joel Koshy and Jun Rao. Bugs: KAFKA-2136 https://issues.apache.org/jira/browse/KAFKA-2136 Repository: kafka Description --- Changes are - Addressing Joel's comments - protocol changes to the fetch request and response to return the throttle_time_ms to clients - New producer/consumer metrics to expose the avg and max delay time for a client - Test cases. 
- Addressed Joel's comments For now the patch will publish a zero delay and return a response Diffs - clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java 56281ee15cc33dfc96ff64d5b1e596047c7132a4 clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 07e65d4a54ba4eef5b787eba3e71cbe9f6a920bd clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java 3dc8b015afd2347a41c9a9dbc02b8e367da5f75f clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java 8686d83aa52e435c6adafbe9ff4bd1602281072a clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java eb8951fba48c335095cc43fc3672de1c733e07ff clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java fabeae3083a8ea55cdacbb9568f3847ccd85bab4 clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java 37ec0b79beafcf5735c386b066eb319fb697eff5 clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java 419541011d652becf0cda7a5e62ce813cddb1732 clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java 8b1805d3d2bcb9fe2bacb37d870c3236aa9532c4 clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java e3cc1967e407b64cc734548c19e30de700b64ba8 core/src/main/scala/kafka/api/FetchRequest.scala 5b38f8554898e54800abd65a7415dd0ac41fd958 core/src/main/scala/kafka/api/FetchResponse.scala 0b6b33ab6f7a732ff1322b6f48abd4c43e0d7215 core/src/main/scala/kafka/api/ProducerRequest.scala c866180d3680da03e48d374415f10220f6ca68c4 core/src/main/scala/kafka/api/ProducerResponse.scala 5d1fac4cb8943f5bfaa487f8e9d9d2856efbd330 core/src/main/scala/kafka/consumer/SimpleConsumer.scala c16f7edd322709060e54c77eb505c44cbd77a4ec core/src/main/scala/kafka/server/AbstractFetcherThread.scala 83fc47417dd7fe4edf030217fa7fd69d99b170b0 core/src/main/scala/kafka/server/DelayedFetch.scala de6cf5bdaa0e70394162febc63b50b55ca0a92db core/src/main/scala/kafka/server/DelayedProduce.scala 
05078b24ef28f2f4e099afa943e43f1d00359fda core/src/main/scala/kafka/server/KafkaApis.scala d63bc18d795a6f0e6994538ca55a9a46f7fb8ffd core/src/main/scala/kafka/server/OffsetManager.scala 5cca85cf727975f6d3acb2223fd186753ad761dc core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b31b432a226ba79546dd22ef1d2acbb439c2e9a3 core/src/main/scala/kafka/server/ReplicaManager.scala 59c9bc3ac3a8afc07a6f8c88c5871304db588d17 core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala 5717165f2344823fabe8f7cfafae4bb8af2d949a core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala 00d59337a99ac135e8689bd1ecd928f7b1423d79 Diff: https://reviews.apache.org/r/33378/diff/ Testing --- New tests added Thanks, Aditya Auradkar
[jira] [Commented] (KAFKA-2231) Deleting a topic fails
[ https://issues.apache.org/jira/browse/KAFKA-2231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14652909#comment-14652909 ] Laurent Raufaste commented on KAFKA-2231: - FYI I have the same problem. On a pristine 20-broker cluster on EC2, I created a topic with a RF of 3 and 100 partitions, did nothing on it, then deleted it. {quote} $ ./bin/kafka-topics.sh --zookeeper zookeeper/kafka --list lolo - marked for deletion {quote} It's been marked for deletion for 2h now. No errors anywhere. I have {{delete.topic.enable=true}} on all my brokers, and we run 0.8.2.1. I got some log activity about the deletion on one broker: {quote} Aug 03 20:00:21 broker-15.kafka kafka: [2015-08-04 00:00:19,303] INFO Deleting index /percolate/kafka/data/lolo-49/.index (kafka.log.OffsetIndex) Aug 03 20:00:21 broker-15.kafka kafka: [2015-08-04 00:00:19,304] INFO Deleted log for partition [lolo,49] in /percolate/kafka/data/lolo-49. (kafka.log.LogManager) Aug 03 20:00:21 broker-15.kafka kafka: [2015-08-04 00:00:19,338] INFO [ReplicaFetcherManager on broker 15] Removed fetcher for partitions [lolo,91] (kafka.server.ReplicaFetcherManager) Aug 03 20:00:21 broker-15.kafka kafka: [2015-08-04 00:00:19,338] INFO Deleting index /percolate/kafka/data/lolo-91/.index (kafka.log.OffsetIndex) Aug 03 20:00:21 broker-15.kafka kafka: [2015-08-04 00:00:19,338] INFO Deleted log for partition [lolo,91] in /percolate/kafka/data/lolo-91. 
(kafka.log.LogManager) {quote} And finally here's the description of the topic: {quote} $ ./bin/kafka-topics.sh --zookeeper zookeeper/kafka --describe --topic lolo
Topic:lolo PartitionCount:100 ReplicationFactor:3 Configs:
Topic: lolo Partition: 0  Leader: -1 Replicas: 6,19,20  Isr:
Topic: lolo Partition: 1  Leader: -1 Replicas: 7,20,1   Isr:
Topic: lolo Partition: 2  Leader: -1 Replicas: 8,1,2    Isr:
Topic: lolo Partition: 3  Leader: -1 Replicas: 9,2,3    Isr:
Topic: lolo Partition: 4  Leader: -1 Replicas: 10,3,4   Isr:
Topic: lolo Partition: 5  Leader: -1 Replicas: 11,4,5   Isr:
Topic: lolo Partition: 6  Leader: -1 Replicas: 12,5,6   Isr:
Topic: lolo Partition: 7  Leader: -1 Replicas: 13,6,7   Isr:
Topic: lolo Partition: 8  Leader: -1 Replicas: 14,7,8   Isr:
Topic: lolo Partition: 9  Leader: -1 Replicas: 15,8,9   Isr:
Topic: lolo Partition: 10 Leader: -1 Replicas: 16,9,10  Isr:
Topic: lolo Partition: 11 Leader: -1 Replicas: 17,10,11 Isr:
Topic: lolo Partition: 12 Leader: -1 Replicas: 18,11,12 Isr:
Topic: lolo Partition: 13 Leader: -1 Replicas: 19,12,13 Isr:
Topic: lolo Partition: 14 Leader: -1 Replicas: 20,13,14 Isr:
Topic: lolo Partition: 15 Leader: -1 Replicas: 1,14,15  Isr:
Topic: lolo Partition: 16 Leader: -1 Replicas: 2,15,16  Isr:
Topic: lolo Partition: 17 Leader: -1 Replicas: 3,16,17  Isr:
Topic: lolo Partition: 18 Leader: -1 Replicas: 4,17,18  Isr:
Topic: lolo Partition: 19 Leader: -1 Replicas: 5,18,19  Isr:
Topic: lolo Partition: 20 Leader: -1 Replicas: 6,20,1   Isr:
Topic: lolo Partition: 21 Leader: -1 Replicas: 7,1,2    Isr:
Topic: lolo Partition: 22 Leader: -1 Replicas: 8,2,3    Isr:
Topic: lolo Partition: 23 Leader: -1 Replicas: 9,3,4    Isr:
Topic: lolo Partition: 24 Leader: -1 Replicas: 10,4,5   Isr:
Topic: lolo Partition: 25 Leader: -1 Replicas: 11,5,6   Isr:
Topic: lolo Partition: 26 Leader: -1 Replicas: 12,6,7   Isr:
Topic: lolo Partition: 27 Leader: -1 Replicas: 13,7,8   Isr:
Topic: lolo Partition: 28 Leader: -1 Replicas: 14,8,9   Isr:
Topic: lolo Partition: 29 Leader: -1 Replicas: 15,9,10  Isr:
Topic: lolo Partition: 30 Leader: -1 Replicas: 16,10,11 Isr:
Topic: lolo Partition: 31 Leader: -1 Replicas: 17,11,12 Isr:
Topic: lolo Partition: 32 Leader: -1 Replicas: 18,12,13 Isr:
Topic: lolo Partition: 33 Leader: -1 Replicas: 19,13,14 Isr:
Topic: lolo Partition: 34 Leader: -1 Replicas: 20,14,15 Isr:
Topic: lolo Partition: 35 Leader: -1 Replicas: 1,15,16  Isr:
Topic: lolo Partition: 36 Leader: -1 Replicas: 2,16,17  Isr:
Topic: lolo
Re: Review Request 36722: Patch for KAFKA-2332
On Aug. 4, 2015, 12:36 a.m., Joel Koshy wrote: core/src/main/scala/kafka/consumer/FetchRequestAndResponseStats.scala, line 25 https://reviews.apache.org/r/36722/diff/1/?file=1019435#file1019435line25 unused import Thanks for the catch. - Dong --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/36722/#review93833 --- On Aug. 4, 2015, 1:23 a.m., Dong Lin wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/36722/ --- (Updated Aug. 4, 2015, 1:23 a.m.) Review request for kafka. Bugs: KAFKA-2332 https://issues.apache.org/jira/browse/KAFKA-2332 Repository: kafka Description --- KAFKA-2332; Add quota metrics to old producer and consumer Diffs - core/src/main/scala/kafka/consumer/FetchRequestAndResponseStats.scala 3df55e13001cef31ee662cc713b99baf8ab4ce0f core/src/main/scala/kafka/consumer/SimpleConsumer.scala 7ebc0405d1f309bed9943e7116051d1d8276f200 core/src/main/scala/kafka/producer/ProducerRequestStats.scala 026e93a2f1dcc0e6380fad509dc98b0cc6469a7a core/src/main/scala/kafka/producer/SyncProducer.scala dcee50113b1b1e062a56ab0f63ac6bb175be6b75 Diff: https://reviews.apache.org/r/36722/diff/ Testing --- Thanks, Dong Lin
Re: Review Request 36722: Patch for KAFKA-2332
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/36722/ --- (Updated Aug. 4, 2015, 1:23 a.m.) Review request for kafka. Bugs: KAFKA-2332 https://issues.apache.org/jira/browse/KAFKA-2332 Repository: kafka Description --- KAFKA-2332; Add quota metrics to old producer and consumer Diffs (updated) - core/src/main/scala/kafka/consumer/FetchRequestAndResponseStats.scala 3df55e13001cef31ee662cc713b99baf8ab4ce0f core/src/main/scala/kafka/consumer/SimpleConsumer.scala 7ebc0405d1f309bed9943e7116051d1d8276f200 core/src/main/scala/kafka/producer/ProducerRequestStats.scala 026e93a2f1dcc0e6380fad509dc98b0cc6469a7a core/src/main/scala/kafka/producer/SyncProducer.scala dcee50113b1b1e062a56ab0f63ac6bb175be6b75 Diff: https://reviews.apache.org/r/36722/diff/ Testing --- Thanks, Dong Lin
[jira] [Updated] (KAFKA-2332) Add quota metrics to old producer and consumer
[ https://issues.apache.org/jira/browse/KAFKA-2332?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dong Lin updated KAFKA-2332: Attachment: KAFKA-2332_2015-08-03_18:22:53.patch Add quota metrics to old producer and consumer -- Key: KAFKA-2332 URL: https://issues.apache.org/jira/browse/KAFKA-2332 Project: Kafka Issue Type: Sub-task Reporter: Aditya Auradkar Assignee: Dong Lin Labels: quotas Attachments: KAFKA-2332.patch, KAFKA-2332.patch, KAFKA-2332_2015-08-03_18:22:53.patch Quota metrics have only been added to the new producer and consumer. It may be beneficial to add these to the existing consumer and old producer also for clients using the older versions. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-2332) Add quota metrics to old producer and consumer
[ https://issues.apache.org/jira/browse/KAFKA-2332?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14652915#comment-14652915 ] Dong Lin commented on KAFKA-2332: - Updated reviewboard https://reviews.apache.org/r/36722/diff/ against branch origin/trunk Add quota metrics to old producer and consumer -- Key: KAFKA-2332 URL: https://issues.apache.org/jira/browse/KAFKA-2332 Project: Kafka Issue Type: Sub-task Reporter: Aditya Auradkar Assignee: Dong Lin Labels: quotas Attachments: KAFKA-2332.patch, KAFKA-2332.patch, KAFKA-2332_2015-08-03_18:22:53.patch Quota metrics have only been added to the new producer and consumer. It may be beneficial to add these to the existing consumer and old producer also for clients using the older versions. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: Review Request 33049: Patch for KAFKA-2084
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/33049/#review93991 --- clients/src/main/java/org/apache/kafka/common/metrics/Quota.java (line 65) https://reviews.apache.org/r/33049/#comment148477 It's considered a best practice in Java to use ``instanceof`` instead of ``getClass()``, as explained here: http://stackoverflow.com/a/596507/2698109 So, you could rewrite lines 65 to 74 as: ``if (!(obj instanceof Quota)) return false; Quota that = (Quota) obj; return (that.bound == this.bound) && (that.upper == this.upper);`` If you decide to keep the ``getClass()`` if-condition as is today, lines 70 to 74 can be simplified as above. clients/src/main/java/org/apache/kafka/common/metrics/QuotaViolationException.java (line 27) https://reviews.apache.org/r/33049/#comment148478 It's a very good practice to make any field ``final`` unless otherwise necessary. Make this field final. clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java (line 138) https://reviews.apache.org/r/33049/#comment148488 If time is, for example, 3.8 this will return 3. Wouldn't it be better to round it to 4 using something akin to ``(int) Math.round(time)``? core/src/main/scala/kafka/server/ClientQuotaMetrics.scala (line 99) https://reviews.apache.org/r/33049/#comment148484 separate ``if`` and ``(`` with a space. core/src/main/scala/kafka/server/ClientQuotaMetrics.scala (line 174) https://reviews.apache.org/r/33049/#comment148482 space between ``if`` and ``(`` core/src/main/scala/kafka/server/ClientQuotaMetrics.scala (line 184) https://reviews.apache.org/r/33049/#comment148483 space between ``if`` and ``(`` core/src/main/scala/kafka/server/KafkaServer.scala (line 362) https://reviews.apache.org/r/33049/#comment148489 insert a space between ``if`` and ``(``. core/src/test/scala/unit/kafka/server/ClientQuotaMetricsTest.scala (line 68) https://reviews.apache.org/r/33049/#comment148487 separate ``for`` and ``(`` with a space. 
core/src/test/scala/unit/kafka/server/ThrottledRequestExpirationTest.scala (line 58) https://reviews.apache.org/r/33049/#comment148486 separate ``for`` and ``(`` with a space. core/src/test/scala/unit/kafka/server/ThrottledRequestExpirationTest.scala (line 83) https://reviews.apache.org/r/33049/#comment148485 separate ``for`` and ``(`` with a space. - Edward Ribeiro On June 30, 2015, 12:54 a.m., Aditya Auradkar wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/33049/ --- (Updated June 30, 2015, 12:54 a.m.) Review request for kafka, Joel Koshy and Jun Rao. Bugs: KAFKA-2084 https://issues.apache.org/jira/browse/KAFKA-2084 Repository: kafka Description --- Updated patch for quotas. This patch does the following: 1. Add per-client metrics for both producer and consumers 2. Add configuration for quotas 3. Compute delay times in the metrics package and return the delay times in QuotaViolationException 4. Add a DelayQueue in KafkaApi's that can be used to throttle any type of request. Implemented request throttling for produce and fetch requests. 5. Added unit and integration test cases for both producer and consumer 6. This doesn't include a system test. There is a separate ticket for that 7. 
Fixed KAFKA-2191 - (Included fix from : https://reviews.apache.org/r/34418/ ) Addressing Joel's comments Diffs - clients/src/main/java/org/apache/kafka/common/metrics/Quota.java d82bb0c055e631425bc1ebbc7d387baac76aeeaa clients/src/main/java/org/apache/kafka/common/metrics/QuotaViolationException.java a451e5385c9eca76b38b425e8ac856b2715fcffe clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java ca823fd4639523018311b814fde69b6177e73b97 clients/src/main/java/org/apache/kafka/common/metrics/stats/Rate.java 98429da34418f7f1deba1b5e44e2e6025212edb3 clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java 544e120594de78c43581a980b1e4087b4fb98ccb clients/src/test/java/org/apache/kafka/common/utils/MockTime.java core/src/main/scala/kafka/server/ClientQuotaMetrics.scala PRE-CREATION core/src/main/scala/kafka/server/KafkaApis.scala d63bc18d795a6f0e6994538ca55a9a46f7fb8ffd core/src/main/scala/kafka/server/KafkaConfig.scala 2d75186a110075e0c322db4b9f7a8c964a7a3e88 core/src/main/scala/kafka/server/KafkaServer.scala b320ce9f6a12c0ee392e91beb82e8804d167f9f4 core/src/main/scala/kafka/server/ReplicaManager.scala 59c9bc3ac3a8afc07a6f8c88c5871304db588d17 core/src/main/scala/kafka/server/ThrottledRequest.scala PRE-CREATION
[jira] [Created] (KAFKA-2401) Fix transient failure of ProducerSendTest.testCloseWithZeroTimeoutFromSenderThread()
Jiangjie Qin created KAFKA-2401: --- Summary: Fix transient failure of ProducerSendTest.testCloseWithZeroTimeoutFromSenderThread() Key: KAFKA-2401 URL: https://issues.apache.org/jira/browse/KAFKA-2401 Project: Kafka Issue Type: Sub-task Reporter: Jiangjie Qin Assignee: Jiangjie Qin The transient failure can happen because of a race condition of the callback firing order for messages produced to broker 0 and broker 1. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] kafka pull request: KAFKA-2402: Create IsrChangeNotificationPath w...
GitHub user becketqin opened a pull request: https://github.com/apache/kafka/pull/108 KAFKA-2402: Create IsrChangeNotificationPath when server statrs. You can merge this pull request into a Git repository by running: $ git pull https://github.com/becketqin/kafka KAFKA-2402 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/kafka/pull/108.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #108 commit 37e423066c5ff8a695a8fcde4f0c2e51832aa6b5 Author: Jiangjie Qin becket@gmail.com Date: 2015-08-04T05:36:20Z KAFKA-2402: Create IsrChangeNotificationPath when start the server. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (KAFKA-2402) Broker should create zkpath /isr_change_notification if it does not exist when updating ISR.
[ https://issues.apache.org/jira/browse/KAFKA-2402?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14653095#comment-14653095 ] ASF GitHub Bot commented on KAFKA-2402: --- GitHub user becketqin opened a pull request: https://github.com/apache/kafka/pull/108 KAFKA-2402: Create IsrChangeNotificationPath when server starts. You can merge this pull request into a Git repository by running: $ git pull https://github.com/becketqin/kafka KAFKA-2402 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/kafka/pull/108.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #108 commit 37e423066c5ff8a695a8fcde4f0c2e51832aa6b5 Author: Jiangjie Qin becket@gmail.com Date: 2015-08-04T05:36:20Z KAFKA-2402: Create IsrChangeNotificationPath when starting the server. Broker should create zkpath /isr_change_notification if it does not exist when updating ISR. Key: KAFKA-2402 URL: https://issues.apache.org/jira/browse/KAFKA-2402 Project: Kafka Issue Type: Bug Reporter: Jiangjie Qin Assignee: Jiangjie Qin This is a follow-up patch for KAFKA-1367. When a broker updates the ISR of partitions, it should ensure the zkPath /isr_change_notification exists. This does not matter when users do a clean deploy of a Kafka cluster because the controller will always create the path. But it matters when users are doing a rolling upgrade, since the controller could still be running on an old-version broker. In that case, ZkNoNodeException will be thrown and replica fetching will fail. We can either document the upgrade process to ask users to create the zk path manually before upgrading, or preferably handle it in the code.
[jira] [Created] (KAFKA-2402) Broker should create zkpath /isr_change_notification if it does not exist when updating ISR.
Jiangjie Qin created KAFKA-2402: --- Summary: Broker should create zkpath /isr_change_notification if it does not exist when updating ISR. Key: KAFKA-2402 URL: https://issues.apache.org/jira/browse/KAFKA-2402 Project: Kafka Issue Type: Bug Reporter: Jiangjie Qin Assignee: Jiangjie Qin This is a follow-up patch for KAFKA-1367. When a broker updates the ISR of partitions, it should ensure the zkPath /isr_change_notification exists. This does not matter when users do a clean deploy of a Kafka cluster because the controller will always create the path. But it matters when users are doing a rolling upgrade, since the controller could still be running on an old-version broker. In that case, ZkNoNodeException will be thrown and replica fetching will fail. We can either document the upgrade process to ask users to create the zk path manually before upgrading, or preferably handle it in the code.
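The preferred fix described in the ticket (create the path in code rather than documenting a manual pre-upgrade step) can be sketched as follows. This is a hedged illustration: the in-memory set stands in for a ZooKeeper namespace, and the method name is made up for this example, not the real ZkUtils API.

```java
import java.util.Set;
import java.util.TreeSet;

public class EnsurePath {
    // Create a znode path and all of its ancestors if missing, mimicking a
    // recursive "create persistent path" call. The Set stands in for ZooKeeper.
    public static void ensurePathExists(Set<String> znodes, String path) {
        StringBuilder ancestor = new StringBuilder();
        for (String part : path.split("/")) {
            if (part.isEmpty()) continue;
            ancestor.append('/').append(part);
            znodes.add(ancestor.toString()); // no-op if the znode already exists
        }
    }

    public static void main(String[] args) {
        Set<String> znodes = new TreeSet<>();
        // a broker about to post an ISR change notification first ensures the parent exists,
        // so a controller on an old-version broker that never created it cannot break fetching
        ensurePathExists(znodes, "/isr_change_notification");
        znodes.add("/isr_change_notification/isr_change_0000000001");
        System.out.println(znodes);
    }
}
```

Calling this before every ISR-change write makes the operation safe regardless of which broker version hosts the controller.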
Re: [DISCUSS] KIP-27 - Conditional Publish
A couple of thoughts on the commit log use case. Suppose that we want to maintain multiple replicas of a K/V store backed by a shared Kafka topic/partition as a commit log. There are two possible ways to use Kafka as a commit log. 1. The first approach allows multiple producers to publish to Kafka. Each replica of the data store keeps reading from a Kafka topic/partition to refresh the replica's view. Every time a replica gets an update to a key from a client, it combines the update and the current value of the key in its view and generates a post-value. It then does a conditional publish to Kafka with the post-value. The update is successful if the conditional publish succeeds. Otherwise, the replica has to recompute the post-value (potentially after the replica's view is refreshed) and retry the conditional publish. A potential issue with this approach is when there is a transient failure during publishing to Kafka (e.g., the leader of the partition changes). When this happens, the conditional publish will get an error. The replica doesn't know whether the publish actually succeeded or not. If we just blindly retry, it may not give the correct behavior (e.g., we could be applying +1 twice). So, not sure if conditional publish itself is enough for this approach. 2. The second approach allows only a single producer to publish to Kafka. We somehow elect one of the replicas to be the master that handles all updates. Normally, we don't need conditional publish since there is a single producer. Conditional publish can potentially be used to deal with duplicates. If the master encounters the same transient failure as the above, it can get the latest offset from the Kafka topic/partition to see if the publish actually succeeded or not since it's the only producer. A potential issue here is to handle the zombie master problem: if the master has a soft failure and another master is elected, we need to prevent the old master from publishing new data to Kafka. 
So, for this approach to work properly, we need some kind of support of single writer in addition to conditional publish. Jiangjie, The issue with partial commit is the following. Say we have a batch of 10 uncompressed messages sent to the leader. The followers only fetched the first 5 messages and then the leader dies. In this case, we only committed 5 out of the 10 messages. Thanks, Jun On Tue, Jul 28, 2015 at 1:16 AM, Daniel Schierbeck daniel.schierb...@gmail.com wrote: Jiangjie: I think giving users the possibility of defining a custom policy for handling rejections is a good idea. For instance, this will allow Kafka to act as an event store in an Event Sourcing application. If the event(s) are rejected by the store, the original command may need to be re-validated against the new state. On Tue, Jul 28, 2015 at 1:27 AM Jiangjie Qin j...@linkedin.com.invalid wrote: @Ewen, good point about batching. Yes, it would be tricky if we want to do a per-key conditional produce. My understanding is that the prerequisite of this KIP is: 1. Single producer for each partition. 2. Acks=-1, max.in.flight.request.per.connection=1, retries=SOME_BIG_NUMBER The major problem it tries to solve is exact once produce, i.e. solve the duplicates from producer side. In that case, a batch will be considered as atomic. The only possibility of a batch got rejected should be it is already appended. So the producer should just move on. It looks to me even a transient multiple producer scenario will cause issue because user need to think about what should do if a request got rejected and the answer varies for different use cases. Thanks, Jiangjie (Becket) Qin On Sun, Jul 26, 2015 at 11:54 AM, Ben Kirwin b...@kirw.in wrote: So I had another look at the 'Idempotent Producer' proposal this afternoon, and made a few notes on how I think they compare; if I've made any mistakes, I'd be delighted if someone with more context on the idempotent producer design would correct me. 
As a first intuition, you can think of the 'conditional publish' proposal as the special case of the 'idempotent producer' idea, where there's just a single producer per-partition. The key observation here is: if there's only one producer, you can conflate the 'sequence number' and the expected offset. The conditional publish proposal uses existing Kafka offset APIs for roughly the same things as the idempotent producer proposal uses sequence numbers for -- eg. instead of having a lease PID API that returns the current sequence number, we can use the existing 'offset API' to retrieve the upcoming offset. Both proposals attempt to deal with the situation where there are transiently multiple publishers for the same partition (and PID). The idempotent producer setup tracks a generation id for each pid, and discards any writes with a generation id smaller than the latest value. Conditional publish is
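The conditional-publish semantics under discussion can be illustrated with a toy in-memory "log": a produce request carries the offset the producer expects to write at, and the append is rejected if the log end has already moved past it. This is a sketch of the semantics only, not the real Kafka protocol or API.

```java
import java.util.List;

public class ConditionalPublish {
    // Append only if the producer's expected offset equals the log end offset;
    // otherwise the producer's view is stale and it must refresh and retry.
    public static boolean publish(List<String> log, long expectedOffset, String message) {
        if (expectedOffset != log.size()) {
            return false; // conditional check failed: someone else wrote first
        }
        log.add(message);
        return true;
    }

    public static void main(String[] args) {
        List<String> log = new java.util.ArrayList<>();
        // two K/V store replicas race to publish a post-value at offset 0
        boolean a = publish(log, 0L, "put k=1");       // wins the race
        boolean b = publish(log, 0L, "put k=2");       // rejected: offset 0 is taken
        boolean bRetry = publish(log, log.size(), "put k=2"); // refresh view, retry
        System.out.println(a + " " + b + " " + bRetry);
    }
}
```

Note how this models the thread's open question: after a transient error the producer cannot tell from the rejection alone whether its own earlier attempt was the one that succeeded.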
[jira] [Created] (KAFKA-2400) Expose heartbeat frequency in new consumer configuration
Jason Gustafson created KAFKA-2400: -- Summary: Expose heartbeat frequency in new consumer configuration Key: KAFKA-2400 URL: https://issues.apache.org/jira/browse/KAFKA-2400 Project: Kafka Issue Type: Improvement Reporter: Jason Gustafson Assignee: Jason Gustafson Priority: Minor The consumer coordinator communicates the need to rebalance through responses to heartbeat requests sent from each member of the consumer group. The heartbeat frequency therefore controls how long normal rebalances will take. Currently, the frequency is hard-coded to 3 heartbeats per the configured session timeout, but it would be nice to expose this setting so that the user can control the impact from rebalancing. Since the consumer is currently single-threaded and heartbeats are sent in poll(), we cannot guarantee that the heartbeats will actually be sent at the configured frequency. In practice, the user may have to adjust their fetch size to ensure that poll() is called often enough to get the desired heartbeat frequency. For most users, the consumption rate is probably fast enough for this not to matter, but we should make the documentation clear on this point. In any case, we expect that most users will accept the default value.
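The hard-coded ratio the ticket describes can be made concrete with a little arithmetic; the values below are illustrative defaults, not settings taken from the ticket.

```java
public class HeartbeatTiming {
    // Per the ticket, the consumer currently hard-codes 3 heartbeats per
    // session timeout, so the heartbeat interval is derived, not configurable.
    static final int HEARTBEATS_PER_SESSION = 3;

    public static int heartbeatIntervalMs(int sessionTimeoutMs) {
        return sessionTimeoutMs / HEARTBEATS_PER_SESSION;
    }

    public static void main(String[] args) {
        int sessionTimeoutMs = 30_000; // illustrative value
        // members learn of a pending rebalance from a heartbeat response, so the
        // notification delay is bounded by one heartbeat interval; exposing the
        // interval lets users trade rebalance latency against request overhead
        System.out.println("heartbeat interval: " + heartbeatIntervalMs(sessionTimeoutMs) + " ms");
    }
}
```

Since heartbeats are only sent inside poll(), the effective interval is also bounded below by how long each poll/process cycle takes, which is why the ticket mentions tuning fetch size.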
[jira] [Commented] (KAFKA-873) Consider replacing zkclient with curator (with zkclient-bridge)
[ https://issues.apache.org/jira/browse/KAFKA-873?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14651721#comment-14651721 ] Flavio Junqueira commented on KAFKA-873: +1 to have this one in the next release. Consider replacing zkclient with curator (with zkclient-bridge) --- Key: KAFKA-873 URL: https://issues.apache.org/jira/browse/KAFKA-873 Project: Kafka Issue Type: Improvement Affects Versions: 0.8.0 Reporter: Scott Clasen Assignee: Grant Henke Fix For: 0.8.3 If zkclient was replaced with curator and curator-x-zkclient-bridge it would be initially a drop-in replacement https://github.com/Netflix/curator/wiki/ZKClient-Bridge With the addition of a few more props to ZkConfig, and a bit of code, this would open up the possibility of using ACLs in zookeeper (which aren't supported directly by zkclient), as well as integrating with netflix exhibitor for those of us using that. Looks like KafkaZookeeperClient needs some love anyhow...
[jira] [Updated] (KAFKA-873) Consider replacing zkclient with curator (with zkclient-bridge)
[ https://issues.apache.org/jira/browse/KAFKA-873?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Flavio Junqueira updated KAFKA-873: --- Fix Version/s: 0.8.3 Consider replacing zkclient with curator (with zkclient-bridge) --- Key: KAFKA-873 URL: https://issues.apache.org/jira/browse/KAFKA-873 Project: Kafka Issue Type: Improvement Affects Versions: 0.8.0 Reporter: Scott Clasen Assignee: Grant Henke Fix For: 0.8.3 If zkclient was replaced with curator and curator-x-zkclient-bridge it would be initially a drop-in replacement https://github.com/Netflix/curator/wiki/ZKClient-Bridge With the addition of a few more props to ZkConfig, and a bit of code, this would open up the possibility of using ACLs in zookeeper (which aren't supported directly by zkclient), as well as integrating with netflix exhibitor for those of us using that. Looks like KafkaZookeeperClient needs some love anyhow...
[jira] [Commented] (KAFKA-2384) Override commit message title in kafka-merge-pr.py
[ https://issues.apache.org/jira/browse/KAFKA-2384?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14651951#comment-14651951 ] ASF GitHub Bot commented on KAFKA-2384: --- GitHub user ijuma opened a pull request: https://github.com/apache/kafka/pull/105 KAFKA-2384; Override commit message title in kafka-merge-pr.py You can merge this pull request into a Git repository by running: $ git pull https://github.com/ijuma/kafka kafka-2384-override-commit-message-title Alternatively you can review and apply these changes as the patch at: https://github.com/apache/kafka/pull/105.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #105 commit e04224273420ce4286a7f36356f99ce0df1af890 Author: Ismael Juma ism...@juma.me.uk Date: 2015-08-03T14:23:05Z Support overriding of commit message title in kafka-merge-pr.py Override commit message title in kafka-merge-pr.py -- Key: KAFKA-2384 URL: https://issues.apache.org/jira/browse/KAFKA-2384 Project: Kafka Issue Type: Improvement Reporter: Guozhang Wang Assignee: Ismael Juma Fix For: 0.8.3 It would be more convenient to allow setting the commit message in the merging script; right now the script takes the PR title as is and the contributors have to change it according to the submission-review guidelines before doing the merge.
[GitHub] kafka pull request: KAFKA-2384; Override commit message title in k...
GitHub user ijuma opened a pull request: https://github.com/apache/kafka/pull/105 KAFKA-2384; Override commit message title in kafka-merge-pr.py You can merge this pull request into a Git repository by running: $ git pull https://github.com/ijuma/kafka kafka-2384-override-commit-message-title Alternatively you can review and apply these changes as the patch at: https://github.com/apache/kafka/pull/105.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #105 commit e04224273420ce4286a7f36356f99ce0df1af890 Author: Ismael Juma ism...@juma.me.uk Date: 2015-08-03T14:23:05Z Support overriding of commit message title in kafka-merge-pr.py
[jira] [Updated] (KAFKA-2384) Override commit message title in kafka-merge-pr.py
[ https://issues.apache.org/jira/browse/KAFKA-2384?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ismael Juma updated KAFKA-2384: --- Reviewer: Guozhang Wang Status: Patch Available (was: In Progress) [~guozhang], I submitted a PR with the requested change. Can you please take a look and see if this matches what you were expecting? A good way to test it is to checkout this branch and then use it to merge this PR (i.e. #105). Override commit message title in kafka-merge-pr.py -- Key: KAFKA-2384 URL: https://issues.apache.org/jira/browse/KAFKA-2384 Project: Kafka Issue Type: Improvement Reporter: Guozhang Wang Assignee: Ismael Juma Fix For: 0.8.3 It would be more convenient to allow setting the commit message in the merging script; right now the script takes the PR title as is and the contributors have to change it according to the submission-review guidelines before doing the merge.
[jira] [Commented] (KAFKA-1716) hang during shutdown of ZookeeperConsumerConnector
[ https://issues.apache.org/jira/browse/KAFKA-1716?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14652197#comment-14652197 ] Chris Barlock commented on KAFKA-1716: -- We just upgraded to Kafka 0.8.2.1 and are now hitting this issue every single time we shut down our environment. Is anyone looking at a fix? hang during shutdown of ZookeeperConsumerConnector -- Key: KAFKA-1716 URL: https://issues.apache.org/jira/browse/KAFKA-1716 Project: Kafka Issue Type: Bug Components: consumer Affects Versions: 0.8.1.1 Reporter: Sean Fay Assignee: Neha Narkhede Attachments: after-shutdown.log, before-shutdown.log, kafka-shutdown-stuck.log It appears to be possible for {{ZookeeperConsumerConnector.shutdown()}} to wedge in the case that some consumer fetcher threads receive messages during the shutdown process. Shutdown thread: {code}-- Parking to wait for: java/util/concurrent/CountDownLatch$Sync@0x2aaaf3ef06d0 at jrockit/vm/Locks.park0(J)V(Native Method) at jrockit/vm/Locks.park(Locks.java:2230) at sun/misc/Unsafe.park(ZJ)V(Native Method) at java/util/concurrent/locks/LockSupport.park(LockSupport.java:156) at java/util/concurrent/locks/AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:811) at java/util/concurrent/locks/AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:969) at java/util/concurrent/locks/AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1281) at java/util/concurrent/CountDownLatch.await(CountDownLatch.java:207) at kafka/utils/ShutdownableThread.shutdown(ShutdownableThread.scala:36) at kafka/server/AbstractFetcherThread.shutdown(AbstractFetcherThread.scala:71) at kafka/server/AbstractFetcherManager$$anonfun$closeAllFetchers$2.apply(AbstractFetcherManager.scala:121) at kafka/server/AbstractFetcherManager$$anonfun$closeAllFetchers$2.apply(AbstractFetcherManager.scala:120) at 
scala/collection/TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772) at scala/collection/mutable/HashMap$$anonfun$foreach$1.apply(HashMap.scala:98) at scala/collection/mutable/HashMap$$anonfun$foreach$1.apply(HashMap.scala:98) at scala/collection/mutable/HashTable$class.foreachEntry(HashTable.scala:226) at scala/collection/mutable/HashMap.foreachEntry(HashMap.scala:39) at scala/collection/mutable/HashMap.foreach(HashMap.scala:98) at scala/collection/TraversableLike$WithFilter.foreach(TraversableLike.scala:771) at kafka/server/AbstractFetcherManager.closeAllFetchers(AbstractFetcherManager.scala:120) ^-- Holding lock: java/lang/Object@0x2aaaebcc7318[thin lock] at kafka/consumer/ConsumerFetcherManager.stopConnections(ConsumerFetcherManager.scala:148) at kafka/consumer/ZookeeperConsumerConnector.liftedTree1$1(ZookeeperConsumerConnector.scala:171) at kafka/consumer/ZookeeperConsumerConnector.shutdown(ZookeeperConsumerConnector.scala:167){code} ConsumerFetcherThread: {code}-- Parking to wait for: java/util/concurrent/locks/AbstractQueuedSynchronizer$ConditionObject@0x2aaaebcc7568 at jrockit/vm/Locks.park0(J)V(Native Method) at jrockit/vm/Locks.park(Locks.java:2230) at sun/misc/Unsafe.park(ZJ)V(Native Method) at java/util/concurrent/locks/LockSupport.park(LockSupport.java:156) at java/util/concurrent/locks/AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987) at java/util/concurrent/LinkedBlockingQueue.put(LinkedBlockingQueue.java:306) at kafka/consumer/PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:60) at kafka/consumer/ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:49) at kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:130) at kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:111) at 
scala/collection/immutable/HashMap$HashMap1.foreach(HashMap.scala:224) at scala/collection/immutable/HashMap$HashTrieMap.foreach(HashMap.scala:403) at kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1.apply$mcV$sp(AbstractFetcherThread.scala:111) at kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:111) at kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:111) at kafka/utils/Utils$.inLock(Utils.scala:538) at kafka/server/AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:110)
[jira] [Commented] (KAFKA-2397) leave group request
[ https://issues.apache.org/jira/browse/KAFKA-2397?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14652231#comment-14652231 ] Jiangjie Qin commented on KAFKA-2397: - I would prefer extending heartbeat to indicate leaving the group. And there will always be a delay of up to 1/3 of the session timeout for the rebalance to be triggered on all the consumers in the group, given the broker always triggers rebalance on the heartbeat response. That is probably fine. leave group request --- Key: KAFKA-2397 URL: https://issues.apache.org/jira/browse/KAFKA-2397 Project: Kafka Issue Type: Sub-task Components: consumer Reporter: Onur Karaman Assignee: Onur Karaman Priority: Minor Fix For: 0.8.3 Let's say every consumer in a group has session timeout s. Currently, if a consumer leaves the group, the worst case time to stabilize the group is 2s (s to detect the consumer failure + s for the rebalance window). If a consumer instead can declare they are leaving the group, the worst case time to stabilize the group would just be the s associated with the rebalance window. This is a low priority optimization!
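The timing argument from the ticket can be made explicit: with session timeout s, a silent departure costs up to s for failure detection plus s for the rebalance window, whereas an announced departure skips detection entirely. A minimal sketch of that arithmetic, with illustrative numbers:

```java
public class LeaveGroupTiming {
    // Worst-case time for the group to stabilize after a member departs.
    public static long worstCaseStabilizeMs(long sessionTimeoutMs, boolean explicitLeave) {
        long rebalanceWindowMs = sessionTimeoutMs; // per the ticket, the window is also s
        if (explicitLeave) {
            return rebalanceWindowMs; // departure is announced: no detection delay
        }
        return sessionTimeoutMs + rebalanceWindowMs; // detect the failure, then rebalance
    }

    public static void main(String[] args) {
        long s = 30_000; // illustrative session timeout
        System.out.println("silent departure: " + worstCaseStabilizeMs(s, false) + " ms");
        System.out.println("explicit leave:   " + worstCaseStabilizeMs(s, true) + " ms");
    }
}
```

Whether the leave is conveyed as its own request or as a flag on a heartbeat (the "un-heartbeat" idea) does not change this bound, only the protocol surface.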
[jira] [Commented] (KAFKA-2399) Replace Stream.continually with Iterator.continually
[ https://issues.apache.org/jira/browse/KAFKA-2399?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14652297#comment-14652297 ] ASF GitHub Bot commented on KAFKA-2399: --- GitHub user ijuma opened a pull request: https://github.com/apache/kafka/pull/106 KAFKA-2399; Replace `Stream.continually` with `Iterator.continually` `Iterator.continually` is more efficient (it doesn't allocate a `Cons` instance per element) and we don't need the extra functionality provided by `Stream.continually`. You can merge this pull request into a Git repository by running: $ git pull https://github.com/ijuma/kafka kafka-2399-replace-stream-continually Alternatively you can review and apply these changes as the patch at: https://github.com/apache/kafka/pull/106.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #106 commit 68531ed09d4bf7f263e07c4d410dc1916d85666c Author: Ismael Juma ism...@juma.me.uk Date: 2015-08-03T18:54:04Z Replace `Stream.continually` with `Iterator.continually` Replace Stream.continually with Iterator.continually Key: KAFKA-2399 URL: https://issues.apache.org/jira/browse/KAFKA-2399 Project: Kafka Issue Type: Bug Reporter: Ismael Juma Priority: Minor There are two usages of `Stream.continually` and neither of them seems to need the extra functionality it provides over `Iterator.continually` (`Stream.continually` allocates `Cons` instances to save the computation instead of recomputing it if needed more than once).
[jira] [Updated] (KAFKA-2399) Replace Stream.continually with Iterator.continually
[ https://issues.apache.org/jira/browse/KAFKA-2399?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ismael Juma updated KAFKA-2399: --- Assignee: Ismael Juma Status: Patch Available (was: Open) Replace Stream.continually with Iterator.continually Key: KAFKA-2399 URL: https://issues.apache.org/jira/browse/KAFKA-2399 Project: Kafka Issue Type: Bug Reporter: Ismael Juma Assignee: Ismael Juma Priority: Minor There are two usages of `Stream.continually` and neither of them seems to need the extra functionality it provides over `Iterator.continually` (`Stream.continually` allocates `Cons` instances to save the computation instead of recomputing it if needed more than once).
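The distinction motivating this swap, illustrated in Java terms since the patch itself is Scala: a memoizing lazy sequence retains every element it produces (as Scala's `Stream.continually` does, allocating a `Cons` cell per element), while a plain generator just re-invokes the supplier on demand (as `Iterator.continually` does). When each element is consumed once, the cache is pure overhead. This is an analogy, not the Scala library code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

public class ContinuallyDemo {
    // Stream.continually-like: force n elements, caching every one of them.
    public static List<Integer> memoizedTake(Supplier<Integer> s, int n, List<Integer> cache) {
        while (cache.size() < n) {
            cache.add(s.get()); // each forced element is retained, like a Cons cell
        }
        return new ArrayList<>(cache.subList(0, n));
    }

    // Iterator.continually-like: evaluate on demand, retain nothing.
    public static List<Integer> iteratorTake(Supplier<Integer> s, int n) {
        List<Integer> out = new ArrayList<>(n);
        for (int i = 0; i < n; i++) {
            out.add(s.get());
        }
        return out;
    }

    public static void main(String[] args) {
        AtomicInteger counter = new AtomicInteger();
        Supplier<Integer> next = counter::incrementAndGet;
        List<Integer> cache = new ArrayList<>();
        System.out.println(memoizedTake(next, 3, cache)); // computes 1, 2, 3 and caches them
        System.out.println(memoizedTake(next, 3, cache)); // served from the cache, no recompute
        System.out.println(iteratorTake(next, 3));        // computes fresh elements 4, 5, 6
    }
}
```

The PR's point is that neither Kafka call site ever re-reads earlier elements, so the cheaper iterator form suffices.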
[jira] [Resolved] (KAFKA-1360) Configure continuous integration (CI) for Kafka branches under active development
[ https://issues.apache.org/jira/browse/KAFKA-1360?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ismael Juma resolved KAFKA-1360. Resolution: Fixed There is also a job for GitHub pull requests: https://builds.apache.org/job/kafka-trunk-git-pr/ I will close this, but please reopen if you think this is not satisfactory. Configure continuous integration (CI) for Kafka branches under active development - Key: KAFKA-1360 URL: https://issues.apache.org/jira/browse/KAFKA-1360 Project: Kafka Issue Type: Improvement Components: packaging Reporter: Clark Breyman Priority: Minor Labels: build, ci Original Estimate: 72h Remaining Estimate: 72h Having continuous integration configured for Kafka would help the project by - ensuring that developers are notified when the build or tests break - automates the publication of test coverage and performance information - simplifies the process of publishing nightlies to both maven central and the package download site (which also makes it easier to get feedback on pre-release builds) See http://ci.apache.org for Apache managed options. Jenkins seems the most appealing.
Re: Review Request 33620: Patch for KAFKA-1690
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/33620/#review93939 --- clients/src/main/java/org/apache/kafka/clients/ClientUtils.java (line 77) https://reviews.apache.org/r/33620/#comment148371 nit: move this line closer to the first line where it is used, that is, to line 79. clients/src/main/java/org/apache/kafka/clients/ClientUtils.java (lines 80 - 86) https://reviews.apache.org/r/33620/#comment148379 nit: I would switch this if-else-if to a switch case as below: ''' switch (securityProtocol) { case SecurityProtocol.SSL: channelBuilder = new SSLChannelBuilder(SSLFactory.Mode.CLIENT); break; case SecurityProtocol.PLAINTEXT: channelBuilder = new PlainTextChannelBuilder(); break; default: throw new ConfigException("Invalid SecurityProtocol " + CommonClientConfigs.SECURITY_PROTOCOL_CONFIG); } ''' clients/src/main/java/org/apache/kafka/clients/NetworkClient.java (line 474) https://reviews.apache.org/r/33620/#comment148372 As you are here, you could remove this extra blank line here, no? clients/src/main/java/org/apache/kafka/clients/NetworkClient.java (line 476) https://reviews.apache.org/r/33620/#comment148374 ''isSendable'' doesn't look like a good method name, imo. A better one would be ''canSendRequest'' or ''canSendMoreRequests''. clients/src/main/java/org/apache/kafka/common/config/SSLConfigs.java (line 19) https://reviews.apache.org/r/33620/#comment148376 Okay, this is a helper class that holds a lot of constants. In this case, it's a best practice to add a private constructor as below: ''' private SSLConfigs() {} ''' This private constructor makes the class both non-instantiable and non-inheritable. clients/src/main/java/org/apache/kafka/common/network/ChannelBuilder.java (line 28) https://reviews.apache.org/r/33620/#comment148378 The ''public'' accessor here is unnecessary as interface methods are public by default.
clients/src/main/java/org/apache/kafka/common/network/PlainTextTransportLayer.java (line 38) https://reviews.apache.org/r/33620/#comment148377 It's a good practice to make both ''key'' and ''socketChannel'' as final fields. - Edward Ribeiro On July 25, 2015, 7:11 p.m., Sriharsha Chintalapani wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/33620/ --- (Updated July 25, 2015, 7:11 p.m.) Review request for kafka. Bugs: KAFKA-1690 https://issues.apache.org/jira/browse/KAFKA-1690 Repository: kafka Description --- KAFKA-1690. new java producer needs ssl support as a client. KAFKA-1690. new java producer needs ssl support as a client. KAFKA-1690. new java producer needs ssl support as a client. KAFKA-1690. new java producer needs ssl support as a client. SSLFactory tests. KAFKA-1690. new java producer needs ssl support as a client. Added PrincipalBuilder. KAFKA-1690. new java producer needs ssl support as a client. Addressing reviews. KAFKA-1690. new java producer needs ssl support as a client. Addressing reviews. KAFKA-1690. new java producer needs ssl support as a client. Addressing reviews. KAFKA-1690. new java producer needs ssl support as a client. Fixed minor issues with the patch. KAFKA-1690. new java producer needs ssl support as a client. Fixed minor issues with the patch. KAFKA-1690. new java producer needs ssl support as a client. KAFKA-1690. new java producer needs ssl support as a client. Merge remote-tracking branch 'refs/remotes/origin/trunk' into KAFKA-1690-V1 KAFKA-1690. Broker side ssl changes. KAFKA-1684. SSL for socketServer. KAFKA-1690. Added SSLProducerSendTest and fixes to get right port for SSL. Merge branch 'trunk' into KAFKA-1690-V1 KAFKA-1690. Post merge fixes. KAFKA-1690. Added SSLProducerSendTest. KAFKA-1690. Minor fixes based on patch review comments. Merge commit KAFKA-1690. Added SSL Consumer Test. KAFKA-1690. SSL Support. KAFKA-1690. Addressing reviews. 
Merge branch 'trunk' into KAFKA-1690-V1 Merge branch 'trunk' into KAFKA-1690-V1 KAFKA-1690. Addressing reviews. Removed interestOps from SSLTransportLayer. KAFKA-1690. Addressing reviews. KAFKA-1690. added staged receives to selector. KAFKA-1690. Addressing reviews. Merge branch 'trunk' into KAFKA-1690-V1 Diffs - build.gradle 0abec26fb2d7be62c8a673f9ec838e926e64b2d1 checkstyle/import-control.xml 19e0659ef9385433d9f94dee43cd70a52b18c9e5 clients/src/main/java/org/apache/kafka/clients/ClientUtils.java
[jira] [Updated] (KAFKA-2388) subscribe(topic)/unsubscribe(topic) should either take a callback to allow user to handle exceptions or it should be synchronous.
[ https://issues.apache.org/jira/browse/KAFKA-2388?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dong Lin updated KAFKA-2388: Assignee: Onur Karaman (was: Jiangjie Qin) subscribe(topic)/unsubscribe(topic) should either take a callback to allow user to handle exceptions or it should be synchronous. - Key: KAFKA-2388 URL: https://issues.apache.org/jira/browse/KAFKA-2388 Project: Kafka Issue Type: Sub-task Reporter: Jiangjie Qin Assignee: Onur Karaman According to the mailing list discussion on the consumer interface, we'll replace: {code} public void subscribe(String... topics); public void subscribe(TopicPartition... partitions); public Set<TopicPartition> subscriptions(); {code} with: {code} void subscribe(List<String> topics, RebalanceCallback callback); void assign(List<TopicPartition> partitions); List<String> subscriptions(); List<TopicPartition> assignments(); {code} We don't need the unsubscribe APIs anymore. The RebalanceCallback would look like: {code} interface RebalanceCallback { void onAssignment(List<TopicPartition> partitions); void onRevocation(List<TopicPartition> partitions); // handle non-existing topics, etc. void onError(Exception e); } {code}
[jira] [Commented] (KAFKA-2388) subscribe(topic)/unsubscribe(topic) should either take a callback to allow user to handle exceptions or it should be synchronous.
[ https://issues.apache.org/jira/browse/KAFKA-2388?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14652126#comment-14652126 ] Dong Lin commented on KAFKA-2388: - [~hachikuji] Sorry, my bad.. [~onurkaraman] is working on it but I assigned it to [~becket_qin] by mistake. subscribe(topic)/unsubscribe(topic) should either take a callback to allow user to handle exceptions or it should be synchronous. - Key: KAFKA-2388 URL: https://issues.apache.org/jira/browse/KAFKA-2388 Project: Kafka Issue Type: Sub-task Reporter: Jiangjie Qin Assignee: Onur Karaman According to the mailing list discussion on the consumer interface, we'll replace: {code} public void subscribe(String... topics); public void subscribe(TopicPartition... partitions); public Set<TopicPartition> subscriptions(); {code} with: {code} void subscribe(List<String> topics, RebalanceCallback callback); void assign(List<TopicPartition> partitions); List<String> subscriptions(); List<TopicPartition> assignments(); {code} We don't need the unsubscribe APIs anymore. The RebalanceCallback would look like: {code} interface RebalanceCallback { void onAssignment(List<TopicPartition> partitions); void onRevocation(List<TopicPartition> partitions); // handle non-existing topics, etc. void onError(Exception e); } {code}
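A sketch of how user code might implement the callback proposed in the ticket. The interface shape follows the {code} block above, but this is a self-contained illustration: partitions are simplified to plain "topic-partition" strings rather than the real TopicPartition class.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class RebalanceCallbackDemo {
    // The callback shape proposed in the ticket (partitions simplified to strings).
    interface RebalanceCallback {
        void onAssignment(List<String> partitions);
        void onRevocation(List<String> partitions);
        void onError(Exception e); // e.g. subscribing to a non-existent topic
    }

    // Tracks which partitions this consumer currently owns across rebalances.
    static class OwnershipTracker implements RebalanceCallback {
        final Set<String> owned = new HashSet<>();
        public void onAssignment(List<String> partitions) { owned.addAll(partitions); }
        public void onRevocation(List<String> partitions) { owned.removeAll(partitions); }
        public void onError(Exception e) { System.err.println("rebalance error: " + e.getMessage()); }
    }

    public static void main(String[] args) {
        OwnershipTracker tracker = new OwnershipTracker();
        tracker.onAssignment(List.of("topicA-0", "topicA-1"));
        tracker.onRevocation(List.of("topicA-1")); // partition moved to another member
        System.out.println(tracker.owned);
    }
}
```

The onError hook is what makes the asynchronous subscribe acceptable: failures such as non-existent topics surface through the callback instead of requiring a synchronous call.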
[jira] [Commented] (KAFKA-2397) leave group request
[ https://issues.apache.org/jira/browse/KAFKA-2397?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14652198#comment-14652198 ] Jason Gustafson commented on KAFKA-2397: [~onurkaraman] I like this idea. Wouldn't the expected rebalance time actually be just the heartbeat interval since that's how long it would take the other group members to see the need to rebalance and send the new join group request? I think [~jkreps] was also suggesting to implement this as an un-heartbeat (i.e. with a flag on the heartbeat request), but I'm not sure if there was a strong reason to prefer that over the explicit request. leave group request --- Key: KAFKA-2397 URL: https://issues.apache.org/jira/browse/KAFKA-2397 Project: Kafka Issue Type: Sub-task Components: consumer Reporter: Onur Karaman Assignee: Onur Karaman Priority: Minor Fix For: 0.8.3 Let's say every consumer in a group has session timeout s. Currently, if a consumer leaves the group, the worst case time to stabilize the group is 2s (s to detect the consumer failure + s for the rebalance window). If a consumer instead can declare they are leaving the group, the worst case time to stabilize the group would just be the s associated with the rebalance window. This is a low priority optimization! -- This message was sent by Atlassian JIRA (v6.3.4#6332)
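The timing claim in the issue description can be checked with simple arithmetic. The sketch below plugs in an illustrative 30-second session timeout (an assumed value, not from the thread) and compares worst-case and expected stabilization times with and without an explicit leave-group request:

```java
public class RebalanceTiming {
    public static void main(String[] args) {
        // Illustrative session timeout; the 30s value is an assumption,
        // not something stated in the thread.
        double s = 30.0;
        // Trunk: up to s to detect the failure, then up to s for the
        // rebalance window to close.
        double worstWithoutLeave = s + s;
        // With an explicit LeaveGroupRequest, detection is (nearly)
        // immediate, leaving only the rebalance window.
        double worstWithLeave = s;
        // Expected cases: remaining consumers typically notice after ~s/3
        // (roughly one heartbeat interval), per the comment's
        // oversimplification; small request-delivery delays are ignored.
        double expectedWithoutLeave = s + s / 3;
        double expectedWithLeave = s / 3;
        System.out.println(worstWithoutLeave + " " + worstWithLeave + " "
                + expectedWithoutLeave + " " + expectedWithLeave);
    }
}
```

The output illustrates the summary in the thread: the worst case improves from 2s to s, and the expected case improves by roughly s as well.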
Re: Review Request 33620: Patch for KAFKA-1690
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/33620/#review93862 --- Thanks for the patch. A few more comments below. build.gradle (lines 247 - 249) https://reviews.apache.org/r/33620/#comment148282 As Ismael mentioned, we got rid of scala 2.9. So this is not needed. clients/src/main/java/org/apache/kafka/common/network/SSLTransportLayer.java (lines 151 - 152) https://reviews.apache.org/r/33620/#comment148279 We probably need to try/catch IOException as we do above? clients/src/main/java/org/apache/kafka/common/network/SSLTransportLayer.java (lines 234 - 239) https://reviews.apache.org/r/33620/#comment148265 If handshakeStatus is NEED_UNWRAP and write is true, we will fall through to the next case. However, there may still be unflushed data. flush() won't be called when write is true. Perhaps the check for write is unnecessary since (a) flush() always handles the case when write is false; (b) we may have done a flush in line 220 and the writable status could have changed after that, which makes the value in write stale. clients/src/main/java/org/apache/kafka/common/network/SSLTransportLayer.java (lines 262 - 269) https://reviews.apache.org/r/33620/#comment148268 Could we transition from NEED_WRAP to NOT_HANDSHAKING directly? Or can NOT_HANDSHAKING only be transitioned to from the FINISHED state? clients/src/main/java/org/apache/kafka/common/network/SSLTransportLayer.java (lines 306 - 320) https://reviews.apache.org/r/33620/#comment148281 It seems that the logic here can be simpler. In handshake(), we call flush at the beginning. So, it seems that when handshakeFinished() is called, it should always be the case that there are no remaining bytes in netWriteBuffer. So, in handshakeFinished(), it seems that we can just simply set handshakeComplete to true and turn off OP_WRITE. Also, not sure if we need to check handshakeResult.getHandshakeStatus().
clients/src/main/java/org/apache/kafka/common/network/SSLTransportLayer.java (lines 357 - 360) https://reviews.apache.org/r/33620/#comment148267 Is this correct? After netReadBuffer.compact in line 375, limit is set to capacity and position is set to first unread byte. The only case when they can be equal is that we get a full capacity worth of bytes and don't read any byte during unwrap. In this case, we shouldn't empty the buffer. clients/src/main/java/org/apache/kafka/common/network/SSLTransportLayer.java (lines 368 - 381) https://reviews.apache.org/r/33620/#comment148266 If handshake status is BUFFER_OVERFLOW, we will return to the caller and then to the selector. However, we may have read all incoming bytes into netReadBuffer. So, the key may never be selected again to complete the handshake. It seems that this case can never happen during handshake since we don't expect to use the appReadBuffer. Perhaps we can just assert that state is illegal when handling NEED_UNWRAP in handshake(). clients/src/main/java/org/apache/kafka/common/network/SSLTransportLayer.java (line 409) https://reviews.apache.org/r/33620/#comment148274 Agreed with Dong: Maybe change to if (netread <= 0) return netread? clients/src/main/java/org/apache/kafka/common/network/SSLTransportLayer.java (line 417) https://reviews.apache.org/r/33620/#comment148276 It's still not very clear to me how renegotiation can be supported in the middle of sends/receives. Suppose that the server initiates a handshake. This may involve the server sending some handshake bytes to the client. After this point, the server expects to read handshake bytes from the client. However, the client may still be sending some regular bytes over the socket. clients/src/main/java/org/apache/kafka/common/network/SSLTransportLayer.java (lines 430 - 433) https://reviews.apache.org/r/33620/#comment148275 Is this needed? If we need to expand appReadBuffer, netReadBuffer's position won't be 0 and we can just loop back.
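The reviewer's point about netReadBuffer.compact can be demonstrated in isolation: after ByteBuffer.compact(), position equals limit only when the buffer held a full capacity of unread bytes, which is exactly the case that must not be treated as an empty buffer. A minimal standalone demonstration:

```java
import java.nio.ByteBuffer;

public class CompactDemo {
    public static void main(String[] args) {
        ByteBuffer buf = ByteBuffer.allocate(4);
        buf.put((byte) 1).put((byte) 2).put((byte) 3).put((byte) 4); // buffer full
        buf.flip(); // prepare to read: position=0, limit=4
        // Consume nothing, then compact: all 4 bytes remain unread, so
        // after compact() position == limit == capacity -- the only case
        // where they can be equal, and one where the buffer is NOT empty.
        buf.compact();
        System.out.println(buf.position() + " " + buf.limit());
    }
}
```

If even one byte had been consumed before compact(), position would drop below limit, so position == limit reliably signals a full buffer of unread data rather than an empty one.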
clients/src/main/java/org/apache/kafka/common/network/Selector.java (line 75) https://reviews.apache.org/r/33620/#comment148269 Could we add a comment on why we need to maintain this map? clients/src/main/java/org/apache/kafka/common/network/Selector.java (lines 247 - 248) https://reviews.apache.org/r/33620/#comment148272 It seems that we will need to further check whether those channels in stagedReceives are muted or not. Timeout should only be 0 if there is at least one unmuted channel in stagedReceives. clients/src/main/java/org/apache/kafka/common/network/Selector.java (lines 281 - 287) https://reviews.apache.org/r/33620/#comment148273 To avoid having to buffer unbounded number of receives in staged receives, perhaps we can choose not to read from the channel if there exist staged receives for a channel. This will help protect
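The muted-channel check suggested for Selector.poll() could be sketched as a small helper; the names and structure here are hypothetical, not the actual Selector code:

```java
import java.util.ArrayDeque;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class PollTimeoutSketch {
    // Hypothetical helper mirroring the review comment: the poll timeout
    // should only drop to 0 when some channel with staged receives is
    // unmuted; if every such channel is muted, use the normal timeout.
    static long pollTimeout(Map<String, Deque<Object>> stagedReceives,
                            Set<String> muted, long defaultTimeout) {
        for (Map.Entry<String, Deque<Object>> e : stagedReceives.entrySet()) {
            if (!e.getValue().isEmpty() && !muted.contains(e.getKey()))
                return 0L;
        }
        return defaultTimeout;
    }

    public static void main(String[] args) {
        Map<String, Deque<Object>> staged = new HashMap<>();
        staged.put("conn-1", new ArrayDeque<>(Collections.singletonList(new Object())));
        Set<String> muted = new HashSet<>(Collections.singleton("conn-1"));
        // The only channel with staged receives is muted: keep the timeout.
        System.out.println(pollTimeout(staged, muted, 300L));
        // Unmute it: timeout becomes 0 so the staged receive is processed.
        muted.clear();
        System.out.println(pollTimeout(staged, muted, 300L));
    }
}
```

This also illustrates the second suggestion: skipping reads for channels that already have staged receives bounds the number of buffered receives per channel.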
Re: Gauging Interest in adding Encryption to Kafka
If I understand you correctly, you are saying that the kerberos keytab that the broker uses to authenticate with the KMS will be somewhere on the broker node and can be used by a malicious admin. I agree this is a valid concern. I am not opposed to client-side encryption, I am more concerned that the modifications this will require in Kafka broker implementation make the idea impractical. And obviously, as in any security discussion - there are lots of details regarding key exchange, management and protection that are critical. Perhaps given a design doc, we can better evaluate the proposed tradeoffs. Gwen On Sat, Aug 1, 2015 at 10:10 AM, Don Bosco Durai bo...@apache.org wrote: Any reason you think it's better to let the clients handle it? Gwen, I agree with Todd, depending on the goal, the requirements might vary. If the goal is that if someone steals the disk, they should not be able to access the data, then encrypting at the Broker is enough. However, if the requirement is that the admin/operator should not be able to access the data, then client side is the only option. Hadoop/HDFS transparent data encryption has a similar philosophy, where the actual encryption/decryption happens at the client side. 1. Key management Hadoop common has a KMS. And there are industry standards like KMIP. If the Broker does the encrypt/decrypt, then the solution is much easier. If the client does it, then sharing the key would be a challenge. It might even be necessary to use asymmetric encryption to limit sharing of the keys. Bosco On 7/31/15, 9:31 PM, Jiangjie Qin j...@linkedin.com.INVALID wrote: I agree with Todd, the major concern I have is still the complexity on the broker, which can kill the performance - which is a key advantage of Kafka. I think there are two separate issues here: 1. Key management 2. the actual encryption/decryption work. Personally I think it might be OK to have [1] supported in Kafka given we might need to be compatible with different key management systems anyway.
But we should just make Kafka compatible with other key management systems instead of letting Kafka itself manage the keys. For [2], I think we should keep it on the client side. Jiangjie (Becket) Qin On Fri, Jul 31, 2015 at 5:06 PM, Todd Palino tpal...@gmail.com wrote: 1 - Yes, authorization combined with encryption does get us most of the way there. However, depending on the auditor it might not be good enough. The problem is that if you are encrypting at the broker, then by definition anyone who has access to the broker (i.e. operations staff) has access to the data. Consider the case where you are passing salary and other information through the system, and those people do not need a view of it. I admit, the 90% solution might be better here than going for a perfect solution, but it is something to think about. 2 - My worry is people wanting to integrate with different key systems. For example, one person may be fine with providing it in a config file, while someone else may want to use the solution from vendor A, someone else wants vendor B, and yet another person wants this obscure hardware-based solution that exists elsewhere. The compaction concern is definitely a good one I hadn't thought of. I'm wondering if it's reasonable to just say that compaction will not work properly with encrypted keys if you do not have consistent encryption (that is, the same string encrypts to the same string every time). Ultimately I don't like the idea of the broker doing any encrypt/decrypt steps OR compression/decompression. This is all CPU overhead that you're concentrating in one place instead of distributing the load out to the clients. Now yes, I know that the broker decompresses to check the CRC and assign offsets and then compresses, and we can potentially avoid the compression step by assigning the batch an offset and a count instead, but we still need to consider the CRC.
Adding encrypt/decrypt steps adds even more overhead and it's going to get very difficult to handle even 2 Gbits worth of traffic at that rate. There are other situations that concern me, such as revocation of keys, and I'm not sure whether it is better with client-based or server-based encryption. For example, if I want to revoke a key with client-based encryption it becomes similar to how we handle Avro schemas (internally) now - you change keys, and depending on what your desire is you either expire out the data for some period of time with the older keys, or you just let it sit there and your consuming clients won't have an issue. With broker-based encryption, the broker has to work with the multiple keys per-topic. -Todd On Fri, Jul 31, 2015 at 2:38 PM, Gwen Shapira gshap...@cloudera.com wrote: Good points :) 1) Kafka already (pending commit) has an authorization layer, so theoretically we are good for SOX,
[jira] [Commented] (KAFKA-2388) subscribe(topic)/unsubscribe(topic) should either take a callback to allow user to handle exceptions or it should be synchronous.
[ https://issues.apache.org/jira/browse/KAFKA-2388?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14652146#comment-14652146 ] Jason Gustafson commented on KAFKA-2388: [~becket_qin] That works too! subscribe(topic)/unsubscribe(topic) should either take a callback to allow user to handle exceptions or it should be synchronous. - Key: KAFKA-2388 URL: https://issues.apache.org/jira/browse/KAFKA-2388 Project: Kafka Issue Type: Sub-task Reporter: Jiangjie Qin Assignee: Onur Karaman According to the mailing list discussion on the consumer interface, we'll replace:
{code}
public void subscribe(String... topics);
public void subscribe(TopicPartition... partitions);
public Set<TopicPartition> subscriptions();
{code}
with:
{code}
void subscribe(List<String> topics, RebalanceCallback callback);
void assign(List<TopicPartition> partitions);
List<String> subscriptions();
List<TopicPartition> assignments();
{code}
We don't need the unsubscribe APIs anymore. The RebalanceCallback would look like:
{code}
interface RebalanceCallback {
    void onAssignment(List<TopicPartition> partitions);
    void onRevocation(List<TopicPartition> partitions);
    // handle non-existing topics, etc.
    void onError(Exception e);
}
{code}
-- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1690) new java producer needs ssl support as a client
[ https://issues.apache.org/jira/browse/KAFKA-1690?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14652110#comment-14652110 ] Jun Rao commented on KAFKA-1690: [~sriharsha], I made another pass of the patch and left some comments on the RB. [~ijuma] and [~dong lin], thanks for the review comments. Those are valuable. The SSL patch is relatively big since there is no easy way to break it into smaller chunks. Harsha has been working very diligently on this and the patch is close to being committed. To help him out, I'd suggest we only focus on blocking issues now so that the patch can be committed soon. We can file follow-up jiras for non-critical issues (e.g., coding style). new java producer needs ssl support as a client --- Key: KAFKA-1690 URL: https://issues.apache.org/jira/browse/KAFKA-1690 Project: Kafka Issue Type: Sub-task Reporter: Joe Stein Assignee: Sriharsha Chintalapani Fix For: 0.8.3 Attachments: KAFKA-1690.patch, KAFKA-1690.patch, KAFKA-1690_2015-05-10_23:20:30.patch, KAFKA-1690_2015-05-10_23:31:42.patch, KAFKA-1690_2015-05-11_16:09:36.patch, KAFKA-1690_2015-05-12_16:20:08.patch, KAFKA-1690_2015-05-15_07:18:21.patch, KAFKA-1690_2015-05-20_14:54:35.patch, KAFKA-1690_2015-05-21_10:37:08.patch, KAFKA-1690_2015-06-03_18:52:29.patch, KAFKA-1690_2015-06-23_13:18:20.patch, KAFKA-1690_2015-07-20_06:10:42.patch, KAFKA-1690_2015-07-20_11:59:57.patch, KAFKA-1690_2015-07-25_12:10:55.patch -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1690) new java producer needs ssl support as a client
[ https://issues.apache.org/jira/browse/KAFKA-1690?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14652118#comment-14652118 ] Ismael Juma commented on KAFKA-1690: [~junrao], fair enough. It will definitely be good to have this merged soon. new java producer needs ssl support as a client --- Key: KAFKA-1690 URL: https://issues.apache.org/jira/browse/KAFKA-1690 Project: Kafka Issue Type: Sub-task Reporter: Joe Stein Assignee: Sriharsha Chintalapani Fix For: 0.8.3 Attachments: KAFKA-1690.patch, KAFKA-1690.patch, KAFKA-1690_2015-05-10_23:20:30.patch, KAFKA-1690_2015-05-10_23:31:42.patch, KAFKA-1690_2015-05-11_16:09:36.patch, KAFKA-1690_2015-05-12_16:20:08.patch, KAFKA-1690_2015-05-15_07:18:21.patch, KAFKA-1690_2015-05-20_14:54:35.patch, KAFKA-1690_2015-05-21_10:37:08.patch, KAFKA-1690_2015-06-03_18:52:29.patch, KAFKA-1690_2015-06-23_13:18:20.patch, KAFKA-1690_2015-07-20_06:10:42.patch, KAFKA-1690_2015-07-20_11:59:57.patch, KAFKA-1690_2015-07-25_12:10:55.patch -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-873) Consider replacing zkclient with curator (with zkclient-bridge)
[ https://issues.apache.org/jira/browse/KAFKA-873?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14651764#comment-14651764 ] Flavio Junqueira commented on KAFKA-873: Let me elaborate on my suggestion. Adopting Curator doesn't preclude the development of a Kafka wrapper (e.g., like we have done in apache bookkeeper), and as I understand it, Curator has done a better job at keeping track of the changes in the zookeeper releases. For example, I believe zkclient still depends on 3.4.3. I also think that the pluggable mechanism is an orthogonal effort that could eventually replace all this, but here we already have a patch at least. Consider replacing zkclient with curator (with zkclient-bridge) --- Key: KAFKA-873 URL: https://issues.apache.org/jira/browse/KAFKA-873 Project: Kafka Issue Type: Improvement Affects Versions: 0.8.0 Reporter: Scott Clasen Assignee: Grant Henke Fix For: 0.8.3 If zkclient was replaced with curator and curator-x-zkclient-bridge it would initially be a drop-in replacement https://github.com/Netflix/curator/wiki/ZKClient-Bridge With the addition of a few more props to ZkConfig, and a bit of code, this would open up the possibility of using ACLs in zookeeper (which aren't supported directly by zkclient), as well as integrating with netflix exhibitor for those of us using that. Looks like KafkaZookeeperClient needs some love anyhow... -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Issue Comment Deleted] (KAFKA-2120) Add a request timeout to NetworkClient
[ https://issues.apache.org/jira/browse/KAFKA-2120?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stevo Slavic updated KAFKA-2120: Comment: was deleted (was: If this is considered as must have for Kafka 0.8.3, please consider setting Fix Version to 0.8.3. Related issue, KAFKA-1788 is fixed only for 0.8.3.) Add a request timeout to NetworkClient -- Key: KAFKA-2120 URL: https://issues.apache.org/jira/browse/KAFKA-2120 Project: Kafka Issue Type: New Feature Reporter: Jiangjie Qin Assignee: Mayuresh Gharat Fix For: 0.8.3 Attachments: KAFKA-2120.patch, KAFKA-2120_2015-07-27_15:31:19.patch, KAFKA-2120_2015-07-29_15:57:02.patch Currently NetworkClient does not have a timeout setting for requests. So if no response is received for a request, due to reasons such as the broker being down, the request will never be completed. The request timeout will also be used as an implicit timeout for some methods such as KafkaProducer.flush() and KafkaProducer.close(). KIP-19 is created for this public interface change. https://cwiki.apache.org/confluence/display/KAFKA/KIP-19+-+Add+a+request+timeout+to+NetworkClient -- This message was sent by Atlassian JIRA (v6.3.4#6332)
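The bookkeeping KIP-19 calls for can be sketched independently of NetworkClient. The helper below (hypothetical names, illustrative timestamps, not the actual patch) expires any in-flight request older than the request timeout:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RequestTimeoutSketch {
    // Hypothetical in-flight request bookkeeping illustrating the idea:
    // complete a request exceptionally once it has been outstanding longer
    // than requestTimeoutMs, instead of waiting forever for a response.
    static List<Integer> expiredRequests(Map<Integer, Long> inFlightSendTimes,
                                         long now, long requestTimeoutMs) {
        List<Integer> expired = new ArrayList<>();
        for (Map.Entry<Integer, Long> e : inFlightSendTimes.entrySet()) {
            if (now - e.getValue() > requestTimeoutMs)
                expired.add(e.getKey());
        }
        Collections.sort(expired);
        return expired;
    }

    public static void main(String[] args) {
        Map<Integer, Long> inFlight = new HashMap<>();
        inFlight.put(1, 0L);      // correlation id 1, sent at t=0
        inFlight.put(2, 25_000L); // correlation id 2, sent at t=25s
        // At t=31s with a 30s request timeout, only request 1 has expired.
        System.out.println(expiredRequests(inFlight, 31_000L, 30_000L));
    }
}
```

In the real client such expired requests would be failed with a timeout exception so callers like flush() and close() can make progress even when a broker is unreachable.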
[jira] [Commented] (KAFKA-2384) Override commit message title in kafka-merge-pr.py
[ https://issues.apache.org/jira/browse/KAFKA-2384?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14652397#comment-14652397 ] ASF GitHub Bot commented on KAFKA-2384: --- Github user asfgit closed the pull request at: https://github.com/apache/kafka/pull/105 Override commit message title in kafka-merge-pr.py -- Key: KAFKA-2384 URL: https://issues.apache.org/jira/browse/KAFKA-2384 Project: Kafka Issue Type: Improvement Reporter: Guozhang Wang Assignee: Ismael Juma Fix For: 0.8.3 It would be more convenient to allow setting the commit message in the merging script; right now the script takes the PR title as is and the contributors have to change it according to the submission-review guidelines before doing the merge. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Comment Edited] (KAFKA-2397) leave group request
[ https://issues.apache.org/jira/browse/KAFKA-2397?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14652386#comment-14652386 ] Onur Karaman edited comment on KAFKA-2397 at 8/3/15 8:48 PM: - Hey everyone. There's a difference between the best, expected, and worst case rebalance time. Trunk - A consumer leaves at t = 0 and the coordinator detects the failure at t = s. The rebalance window can close as soon as all the existing consumers rejoin and as late as the maximum member session timeout. The time to stabilize since the consumer failure is something like: {code} t = s + rebalance_timeout {code} Best case: The coordinator receives all of the remaining consumers' heartbeats immediately after t = s. All of the remaining consumers rejoin immediately after receiving the heartbeat response. So everything is done by *t ~= s + coordinator_join_group_request_receival_delay*. Expected case: The coordinator receives all of the remaining consumers' heartbeats at t = 4s/3 because consumers will typically figure out the rebalance after s/3 (an oversimplification. Consumers of a group actually have staggered heartbeat intervals). All of the remaining consumers eventually rejoin (coordinator_join_group_request_receival_delay). So everything is done by *t ~= s + (s/3 + coordinator_join_group_request_receival_delay)*. Worst case: All of the consumers in the group somehow fail to get notified of the rebalance until very last possible moment and rejoin the group just before the rebalance window ends: *t = s + s*. LeaveGroupRequest - A consumer leaves at t = 0 and sends out the LeaveGroupRequest. The rebalance window can close as soon as all the existing consumers rejoin and as late as the maximum member session timeout. 
The LeaveGroupRequest would cut down the time to stabilize since the consumer failure to something like: {code} t = coordinator_leave_group_request_receival_delay + rebalance_timeout {code} Best case: The coordinator receives all of the remaining consumers' heartbeats immediately after t = coordinator_leave_group_request_receival_delay. All of the remaining consumers rejoin immediately after receiving the heartbeat response. So everything is done by *t ~= coordinator_leave_group_request_receival_delay + coordinator_join_group_request_receival_delay*. Expected case: The coordinator receives all of the remaining consumers' heartbeats at t = coordinator_leave_group_request_receival_delay + s/3 because consumers will typically figure out the rebalance after s/3 (an oversimplification. Consumers of a group actually have staggered heartbeat intervals). All of the remaining consumers eventually rejoin (coordinator_join_group_request_receival_delay). So everything is done by *t ~= coordinator_leave_group_request_receival_delay + (s/3 + coordinator_join_group_request_receival_delay)*. I'm assuming coordinator_leave_group_request_receival_delay < s. Worst case: The coordinator receives the LeaveGroupRequest at t = coordinator_leave_group_request_receival_delay. All of the consumers in the group somehow fail to get notified of the rebalance until very last possible moment and rejoin the group just before the rebalance window ends: *t = coordinator_leave_group_request_receival_delay + s*. I'm assuming coordinator_leave_group_request_receival_delay < s. Absolute worst case: The LeaveGroupRequest somehow got dropped before reaching the coordinator. The heartbeat would timeout on the coordinator anyway and hit the existing *t = s + s* behavior. Summary - So I guess the absolute worst case behavior hasn't changed if the LeaveGroupRequest was somehow dropped, but everything else should get better by about s.
P.S: To avoid confusion, it's probably best to state whether you're talking about the behavior in trunk or the proposed behavior with LeaveGroupRequest. I prefer having a separate LeaveGroupRequest, but that's less of the focus here. was (Author: onurkaraman): Hey everyone. There's a difference between the best, expected, and worst case rebalance time. Trunk - A consumer leaves at t = 0 and the coordinator detects the failure at t = s. The rebalance window can close as soon as all the existing consumers rejoin and as late as the maximum member session timeout. The time to stabilize since the consumer failure is something like: {code} t = s + rebalance_timeout {code} Best case: The coordinator receives all of the remaining consumers' heartbeats immediately after t = s. All of the remaining consumers rejoin immediately after receiving the heartbeat response. So everything is done by *t ~= s*. Expected case: The coordinator receives all of the remaining heartbeats at t = 4s/3 because consumers will typically figure out the rebalance after s/3 (an oversimplification. Consumers of a group actually have staggered heartbeat intervals). All of
[jira] [Commented] (KAFKA-2384) Override commit message title in kafka-merge-pr.py
[ https://issues.apache.org/jira/browse/KAFKA-2384?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14652498#comment-14652498 ] Guozhang Wang commented on KAFKA-2384: -- [~ijuma] There seems to be some unicode encoding error with Python 2.x with this patch. An example is: https://github.com/apache/kafka/pull/98 I tried to fix it on my local machine as follows:
{code}
-pr_title = pr["title"]
+pr_title = pr["title"].encode("utf-8")
{code}
But it does not seem to fix all the problems. Could you take a look? Override commit message title in kafka-merge-pr.py -- Key: KAFKA-2384 URL: https://issues.apache.org/jira/browse/KAFKA-2384 Project: Kafka Issue Type: Improvement Reporter: Guozhang Wang Assignee: Ismael Juma Fix For: 0.8.3 It would be more convenient to allow setting the commit message in the merging script; right now the script takes the PR title as is and the contributors have to change it according to the submission-review guidelines before doing the merge. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Reopened] (KAFKA-2384) Override commit message title in kafka-merge-pr.py
[ https://issues.apache.org/jira/browse/KAFKA-2384?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang reopened KAFKA-2384: -- Override commit message title in kafka-merge-pr.py -- Key: KAFKA-2384 URL: https://issues.apache.org/jira/browse/KAFKA-2384 Project: Kafka Issue Type: Improvement Reporter: Guozhang Wang Assignee: Ismael Juma Fix For: 0.8.3 It would be more convenient to allow setting the commit message in the merging script; right now the script takes the PR title as is and the contributors have to change it according to the submission-review guidelines before doing the merge. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] kafka pull request: MINOR: Added to .gitignore Kafka server logs d...
Github user asfgit closed the pull request at: https://github.com/apache/kafka/pull/94 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Updated] (KAFKA-1684) Implement TLS/SSL authentication
[ https://issues.apache.org/jira/browse/KAFKA-1684?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gwen Shapira updated KAFKA-1684: Affects Version/s: (was: 0.9.0) Implement TLS/SSL authentication Key: KAFKA-1684 URL: https://issues.apache.org/jira/browse/KAFKA-1684 Project: Kafka Issue Type: Sub-task Components: security Reporter: Jay Kreps Assignee: Sriharsha Chintalapani Fix For: 0.8.3 Attachments: KAFKA-1684.patch, KAFKA-1684.patch Add an SSL port to the configuration and advertise this as part of the metadata request. If the SSL port is configured the socket server will need to add a second Acceptor thread to listen on it. Connections accepted on this port will need to go through the SSL handshake prior to being registered with a Processor for request processing. SSL requests and responses may need to be wrapped or unwrapped using the SSLEngine that was initialized by the acceptor. This wrapping and unwrapping is very similar to what will need to be done for SASL-based authentication schemes. We should have a uniform interface that covers both of these and we will need to store the instance in the session with the request. The socket server will have to use this object when reading and writing requests. We will need to take care with the FetchRequests as the current FileChannel.transferTo mechanism will be incompatible with wrap/unwrap so we can only use this optimization for unencrypted sockets that don't require userspace translation (wrapping). -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: Gauging Interest in adding Encryption to Kafka
Doing encryption on the client has the following benefits (most of them already mentioned in the thread):
* brokers don't have additional CPU load
* brokers never see the data in unencrypted form (Kafka admins cannot snoop)
* secure multi-tenancy (keys are 100% on the client side)
* no need to secure the Kafka wire transport, client-broker and broker-broker (data is already encrypted)
It would be highly desirable, even if encryption is done on the client side, that encryption is 'transparent'. Similar to how HDFS encryption works, it is not the client writing/reading a topic that decides to encrypt/decrypt; the broker tells the client and provides the encrypted encryption keys for the task. The client-broker protocol would have to be augmented to carry the encrypted encryption key, plus logic to handle redistribution to existing clients due to key rotation. A nice thing about doing broker-side encryption though is that you can shut off clients at any time and they won't see unencrypted data anymore. But this means the brokers will have to deal with the client ACLs for encryption (I'd rather leave that outside of Kafka as a concern of the KMS system). You could achieve similar functionality with client-side encryption, by removing the client from the ACLs in the KMS and doing a key rotation; then the client will not be able to decrypt any messages using the new key (though all previous ones using the key that the client already has will be visible to the client). Thanks. On Mon, Aug 3, 2015 at 10:22 AM, Gwen Shapira g...@confluent.io wrote: If I understand you correctly, you are saying that the kerberos keytab that the broker uses to authenticate with the KMS will be somewhere on the broker node and can be used by a malicious admin. I agree this is a valid concern. I am not opposed to client-side encryption, I am more concerned that the modifications this will require in Kafka broker implementation make the idea impractical.
And obviously, as in any security discussion - there are lots of details regarding key exchange, management and protection that are critical. Perhaps given a design doc, we can better evaluate the proposed tradeoffs. Gwen On Sat, Aug 1, 2015 at 10:10 AM, Don Bosco Durai bo...@apache.org wrote: Any reason you think it's better to let the clients handle it? Gwen, I agree with Todd, depending on the goal, the requirements might vary. If the goal is that if someone steals the disk, they should not be able to access the data, then encrypting at the Broker is enough. However, if the requirement is that the admin/operator should not be able to access the data, then client side is the only option. Hadoop/HDFS transparent data encryption has a similar philosophy, where the actual encryption/decryption happens at the client side. 1. Key management Hadoop common has a KMS. And there are industry standards like KMIP. If the Broker does the encrypt/decrypt, then the solution is much easier. If the client does it, then sharing the key would be a challenge. It might even be necessary to use asymmetric encryption to limit sharing of the keys. Bosco On 7/31/15, 9:31 PM, Jiangjie Qin j...@linkedin.com.INVALID wrote: I agree with Todd, the major concern I have is still the complexity on the broker, which can kill the performance - which is a key advantage of Kafka. I think there are two separate issues here: 1. Key management 2. the actual encryption/decryption work. Personally I think it might be OK to have [1] supported in Kafka given we might need to be compatible with different key management systems anyway. But we should just make Kafka compatible with other key management systems instead of letting Kafka itself manage the keys. For [2], I think we should keep it on the client side. Jiangjie (Becket) Qin On Fri, Jul 31, 2015 at 5:06 PM, Todd Palino tpal...@gmail.com wrote: 1 - Yes, authorization combined with encryption does get us most of the way there.
However, depending on the auditor it might not be good enough. The problem is that if you are encrypting at the broker, then by definition anyone who has access to the broker (i.e. operations staff) has access to the data. Consider the case where you are passing salary and other information through the system, and those people do not need a view of it. I admit, the 90% solution might be better here than going for a perfect solution, but it is something to think about. 2 - My worry is people wanting to integrate with different key systems. For example, one person may be fine with providing it in a config file, while someone else may want to use the solution from vendor A, someone else wants vendor B, and yet another person wants this obscure hardware-based solution that exists elsewhere. The compaction concern is definitely a good one I hadn't thought of.
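The client-side encrypt/decrypt step favored in this thread can be sketched with the JDK's built-in AES-GCM support. Key management, which the thread agrees is the hard part, is stubbed with a locally generated key; the payload and all names are illustrative, not a proposed Kafka API:

```java
import java.nio.charset.StandardCharsets;
import java.security.SecureRandom;
import javax.crypto.Cipher;
import javax.crypto.KeyGenerator;
import javax.crypto.SecretKey;
import javax.crypto.spec.GCMParameterSpec;

public class ClientSideEncryptionSketch {
    public static void main(String[] args) throws Exception {
        // Key management is the hard problem per the thread (KMS, KMIP,
        // rotation); here we just generate a local key for illustration.
        KeyGenerator kg = KeyGenerator.getInstance("AES");
        kg.init(128);
        SecretKey key = kg.generateKey();

        byte[] iv = new byte[12]; // 96-bit nonce, standard for GCM
        new SecureRandom().nextBytes(iv);

        // Producer side: encrypt before handing bytes to the Kafka client,
        // so the broker (and its admins) only ever see ciphertext.
        Cipher enc = Cipher.getInstance("AES/GCM/NoPadding");
        enc.init(Cipher.ENCRYPT_MODE, key, new GCMParameterSpec(128, iv));
        byte[] ciphertext = enc.doFinal("salary:123".getBytes(StandardCharsets.UTF_8));

        // Consumer side: decrypt after receiving, with the same key and IV
        // (in practice the IV would travel alongside the message).
        Cipher dec = Cipher.getInstance("AES/GCM/NoPadding");
        dec.init(Cipher.DECRYPT_MODE, key, new GCMParameterSpec(128, iv));
        System.out.println(new String(dec.doFinal(ciphertext), StandardCharsets.UTF_8));
    }
}
```

Note this also illustrates Todd's compaction concern: GCM with a random IV is non-deterministic, so the same key string encrypts to different ciphertexts and log compaction on encrypted keys would not behave as expected.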
Re: Review Request 33125: Add comment to timing fix
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/33125/#review93953 --- clients/src/test/java/org/apache/kafka/clients/MetadataTest.java (line 58) https://reviews.apache.org/r/33125/#comment148391 Wow, 1 millisecond?! Does it make a difference? Just curious. :) - Edward Ribeiro On April 13, 2015, 7:15 p.m., Rajini Sivaram wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/33125/ --- (Updated April 13, 2015, 7:15 p.m.) Review request for kafka. Bugs: KAFKA-2089 https://issues.apache.org/jira/browse/KAFKA-2089 Repository: kafka Description --- Patch for KAFKA-2089: Fix timing issue in MetadataTest Diffs - clients/src/test/java/org/apache/kafka/clients/MetadataTest.java 928087d29deb80655ca83726c1ebc45d76468c1f Diff: https://reviews.apache.org/r/33125/diff/ Testing --- Thanks, Rajini Sivaram
[jira] [Commented] (KAFKA-2397) leave group request
[ https://issues.apache.org/jira/browse/KAFKA-2397?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14652386#comment-14652386 ] Onur Karaman commented on KAFKA-2397: - Hey everyone. There's a difference between the best, expected, and worst case rebalance times. Trunk - A consumer leaves at t = 0 and the coordinator detects the failure at t = s. The rebalance window can close as soon as all the existing consumers rejoin and as late as the maximum member session timeout. The time to stabilize since the consumer failure is something like: {code} t = s + rebalance_timeout {code} Best case: The coordinator receives all of the remaining consumers' heartbeats immediately after t = s. All of the remaining consumers rejoin immediately after receiving the heartbeat response. So everything is done by *t ~= s*. Expected case: The coordinator receives all of the remaining heartbeats at t = 4s/3 because consumers will typically figure out the rebalance after s/3 (an oversimplification; consumers of a group actually have staggered heartbeat intervals). All of the remaining consumers eventually rejoin (coordinator_join_group_request_receival_delay). So everything is done by *t ~= s + (s/3 + coordinator_join_group_request_receival_delay)*. Worst case: All of the consumers in the group somehow fail to get notified of the rebalance until the very last possible moment and rejoin the group just before the rebalance window ends: *t = s + s*. LeaveGroupRequest - A consumer leaves at t = 0 and sends out the LeaveGroupRequest. The rebalance window can close as soon as all the existing consumers rejoin and as late as the maximum member session timeout.
The LeaveGroupRequest would cut down the time to stabilize since the consumer failure to something like: {code} t = coordinator_leave_group_request_receival_delay + rebalance_timeout {code} Best case: A consumer leaves at t = 0, sends out the LeaveGroupRequest, and the coordinator immediately receives the LeaveGroupRequest. The coordinator receives all of the remaining consumers' heartbeats immediately after t = 0. All of the remaining consumers rejoin immediately after receiving the heartbeat response. So everything is done by *t ~= 0*. Expected case: A consumer leaves at t = 0, sends out the LeaveGroupRequest, and the coordinator receives the LeaveGroupRequest at t = coordinator_leave_group_request_receival_delay. All of the remaining consumers eventually rejoin (coordinator_join_group_request_receival_delay). So everything is done by *t ~= coordinator_leave_group_request_receival_delay + (s/3 + coordinator_join_group_request_receival_delay)*. I'm assuming coordinator_leave_group_request_receival_delay << s. Worst case: A consumer leaves at t = 0, sends out the LeaveGroupRequest, and the coordinator receives the LeaveGroupRequest at t = coordinator_leave_group_request_receival_delay. All of the consumers in the group somehow fail to get notified of the rebalance until the very last possible moment and rejoin the group just before the rebalance window ends: *t = coordinator_leave_group_request_receival_delay + s*. Again, I'm assuming coordinator_leave_group_request_receival_delay << s. Absolute worst case: The LeaveGroupRequest somehow gets dropped before reaching the coordinator. The heartbeat would time out on the coordinator anyway and hit the existing *t = s + s* behavior. Summary - So the absolute worst case behavior hasn't changed if the LeaveGroupRequest is somehow dropped, but everything else should get better by about s.
P.S: To avoid confusion, it's probably best to state whether you're talking about the behavior in trunk or the proposed behavior with LeaveGroupRequest. I prefer having a separate LeaveGroupRequest, but that's less of the focus here. leave group request --- Key: KAFKA-2397 URL: https://issues.apache.org/jira/browse/KAFKA-2397 Project: Kafka Issue Type: Sub-task Components: consumer Reporter: Onur Karaman Assignee: Onur Karaman Priority: Minor Fix For: 0.8.3 Let's say every consumer in a group has session timeout s. Currently, if a consumer leaves the group, the worst case time to stabilize the group is 2s (s to detect the consumer failure + s for the rebalance window). If a consumer instead can declare they are leaving the group, the worst case time to stabilize the group would just be the s associated with the rebalance window. This is a low priority optimization! -- This message was sent by Atlassian JIRA (v6.3.4#6332)
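Onur's cases can be made concrete by plugging in assumed numbers. The snippet below is illustrative arithmetic only - the session timeout and receival delay values are invented, and the formulas are the ones from the comment above with the receival delays collapsed into a single `d`.

```java
// Throwaway calculation of group stabilization times, trunk vs. LeaveGroupRequest.
// Assumed: session timeout s = 30s, request receival delay d = 0.1s (d << s).
public class RebalanceTiming {
    public static void main(String[] args) {
        double s = 30.0;   // session timeout, seconds (assumed)
        double d = 0.1;    // *_request_receival_delay, seconds (assumed)

        // Trunk: the failure is only detected by heartbeat timeout at t = s.
        double trunkExpected = s + (s / 3 + d);   // remaining consumers notice ~s/3 later
        double trunkWorst    = s + s;             // rebalance window fully consumed

        // LeaveGroupRequest: the departure is announced after only d.
        double leaveExpected = d + (s / 3 + d);
        double leaveWorst    = d + s;             // same window, no detection delay

        System.out.printf("trunk: expected=%.1fs worst=%.1fs%n", trunkExpected, trunkWorst);
        System.out.printf("leave: expected=%.1fs worst=%.1fs%n", leaveExpected, leaveWorst);
        System.out.printf("expected improvement=%.1fs (about s)%n",
                trunkExpected - leaveExpected);
    }
}
```

With these numbers the expected case drops from 40.1s to 10.2s, matching the summary that everything but the dropped-request case improves by roughly s.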
[GitHub] kafka pull request: KAFKA-2384; Override commit message title in k...
Github user asfgit closed the pull request at: https://github.com/apache/kafka/pull/105
[jira] [Updated] (KAFKA-2384) Override commit message title in kafka-merge-pr.py
[ https://issues.apache.org/jira/browse/KAFKA-2384?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang updated KAFKA-2384: - Resolution: Fixed Status: Resolved (was: Patch Available) Issue resolved by pull request 105 [https://github.com/apache/kafka/pull/105] Override commit message title in kafka-merge-pr.py -- Key: KAFKA-2384 URL: https://issues.apache.org/jira/browse/KAFKA-2384 Project: Kafka Issue Type: Improvement Reporter: Guozhang Wang Assignee: Ismael Juma Fix For: 0.8.3 It would be more convenient to allow setting the commit message title in the merge script; right now the script takes the PR title as is, and contributors have to change it according to the submission-review guidelines before doing the merge.
[jira] [Updated] (KAFKA-313) Add JSON/CSV output and looping options to ConsumerGroupCommand
[ https://issues.apache.org/jira/browse/KAFKA-313?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gwen Shapira updated KAFKA-313: --- Status: In Progress (was: Patch Available) Moving to in progress until a new patch is submitted fixing the issues highlighted in the RB. Add JSON/CSV output and looping options to ConsumerGroupCommand --- Key: KAFKA-313 URL: https://issues.apache.org/jira/browse/KAFKA-313 Project: Kafka Issue Type: Improvement Reporter: Dave DeMaagd Assignee: Ashish K Singh Priority: Minor Labels: newbie, patch Fix For: 0.8.3 Attachments: KAFKA-313-2012032200.diff, KAFKA-313.1.patch, KAFKA-313.patch, KAFKA-313_2015-02-23_18:11:32.patch, KAFKA-313_2015-06-24_11:14:24.patch Adds: * '--loop N' - causes the program to loop forever, sleeping for up to N seconds between loops (loop time minus collection time, unless that's less than 0, at which point it will just run again immediately) * '--asjson' - display as a JSON string instead of the more human readable output format. Neither of the above depends on the other (you can loop in the human readable output, or do a single shot execution with JSON output). Existing behavior/output is maintained if neither of the above is used. Diff attached. Impacted files: core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
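The '--loop N' sleep rule described in the patch (interval minus collection time, floored at zero) can be sketched as below. This is an illustration of the rule only, not the actual ConsumerGroupCommand code; the class and method names are invented.

```java
// Illustrative sketch of the '--loop N' sleep rule from KAFKA-313.
public class LoopSleep {
    // Interval minus the time collection took, clamped at zero: a slow
    // collection triggers the next run immediately instead of a negative sleep.
    static long sleepMillis(long intervalMs, long collectionMs) {
        return Math.max(0L, intervalMs - collectionMs);
    }

    public static void main(String[] args) {
        System.out.println(sleepMillis(5000, 1200)); // fast collection -> sleep 3800
        System.out.println(sleepMillis(5000, 7000)); // collection overran -> 0
    }
}
```

The clamp keeps the loop period at roughly N seconds regardless of how long each collection takes, degrading to back-to-back runs when collection is slower than the interval.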