Re: Review Request 29301: Patch for KAFKA-1694
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/29301/ ---

(Updated Jan. 12, 2015, 1:28 p.m.)

Review request for kafka.

Bugs: KAFKA-1694
    https://issues.apache.org/jira/browse/KAFKA-1694

Repository: kafka

Description (updated)
---
KAFKA-1694 - introduced new type for Wire protocol, ported ClusterMetadataResponse to it
KAFKA-1694 - Split Admin RQ/RP to separate messages
KAFKA-1694 - Admin commands can be handled only by controller; DeleteTopicCommand NPE fix
KAFKA-1776 - Ported ConsumerGroupOffsetChecker
KAFKA-1776 - Ported PreferredReplicaElectionTool and ReassignPartitionsTool to CLI

Diffs (updated)
---
  bin/kafka.sh PRE-CREATION
  bin/windows/kafka.bat PRE-CREATION
  build.gradle ba52288031e2abc70e35e9297a4423dd5025950b
  clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java 109fc965e09b2ed186a073351bd037ac8af20a4c
  clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java 7517b879866fc5dad5f8d8ad30636da8bbe7784a
  clients/src/main/java/org/apache/kafka/common/protocol/types/MaybeOf.java PRE-CREATION
  clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java 121e880a941fcd3e6392859edba11a94236494cc
  clients/src/main/java/org/apache/kafka/common/requests/ClusterMetadataRequest.java PRE-CREATION
  clients/src/main/java/org/apache/kafka/common/requests/ClusterMetadataResponse.java PRE-CREATION
  clients/src/main/java/org/apache/kafka/common/requests/admin/AlterTopicRequest.java PRE-CREATION
  clients/src/main/java/org/apache/kafka/common/requests/admin/AlterTopicResponse.java PRE-CREATION
  clients/src/main/java/org/apache/kafka/common/requests/admin/ConsumerGroupOffsetsRequest.java PRE-CREATION
  clients/src/main/java/org/apache/kafka/common/requests/admin/ConsumerGroupOffsetsResponse.java PRE-CREATION
  clients/src/main/java/org/apache/kafka/common/requests/admin/CreateTopicRequest.java PRE-CREATION
  clients/src/main/java/org/apache/kafka/common/requests/admin/CreateTopicResponse.java PRE-CREATION
  clients/src/main/java/org/apache/kafka/common/requests/admin/DeleteTopicRequest.java PRE-CREATION
  clients/src/main/java/org/apache/kafka/common/requests/admin/DeleteTopicResponse.java PRE-CREATION
  clients/src/main/java/org/apache/kafka/common/requests/admin/DescribeTopicOutput.java PRE-CREATION
  clients/src/main/java/org/apache/kafka/common/requests/admin/DescribeTopicRequest.java PRE-CREATION
  clients/src/main/java/org/apache/kafka/common/requests/admin/DescribeTopicResponse.java PRE-CREATION
  clients/src/main/java/org/apache/kafka/common/requests/admin/ListTopicsOutput.java PRE-CREATION
  clients/src/main/java/org/apache/kafka/common/requests/admin/ListTopicsRequest.java PRE-CREATION
  clients/src/main/java/org/apache/kafka/common/requests/admin/ListTopicsResponse.java PRE-CREATION
  clients/src/main/java/org/apache/kafka/common/requests/admin/PreferredReplicaLeaderElectionRequest.java PRE-CREATION
  clients/src/main/java/org/apache/kafka/common/requests/admin/PreferredReplicaLeaderElectionResponse.java PRE-CREATION
  clients/src/main/java/org/apache/kafka/common/requests/admin/ReassignPartitionsRequest.java PRE-CREATION
  clients/src/main/java/org/apache/kafka/common/requests/admin/ReassignPartitionsResponse.java PRE-CREATION
  clients/src/main/java/org/apache/kafka/common/requests/admin/TopicConfigDetails.java PRE-CREATION
  clients/src/main/java/org/apache/kafka/common/requests/admin/TopicPartitionsDetails.java PRE-CREATION
  clients/src/main/java/org/apache/kafka/common/requests/admin/VerifyPreferredReplicaLeaderElectionRequest.java PRE-CREATION
  clients/src/main/java/org/apache/kafka/common/requests/admin/VerifyPreferredReplicaLeaderElectionResponse.java PRE-CREATION
  clients/src/main/java/org/apache/kafka/common/requests/admin/VerifyReassignPartitionsRequest.java PRE-CREATION
  clients/src/main/java/org/apache/kafka/common/requests/admin/VerifyReassignPartitionsResponse.java PRE-CREATION
  clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java df37fc6d8f0db0b8192a948426af603be3444da4
  core/src/main/scala/kafka/api/ApiUtils.scala 1f80de1638978901500df808ca5133308c9d1fca
  core/src/main/scala/kafka/api/ClusterMetadataRequest.scala PRE-CREATION
  core/src/main/scala/kafka/api/ClusterMetadataResponse.scala PRE-CREATION
  core/src/main/scala/kafka/api/RequestKeys.scala c24c0345feedc7b9e2e9f40af11bfa1b8d328c43
  core/src/main/scala/kafka/api/admin/AlterTopicRequest.scala PRE-CREATION
  core/src/main/scala/kafka/api/admin/AlterTopicResponse.scala PRE-CREATION
  core/src/main/scala/kafka/api/admin/ConsumerGroupOffsetsRequest.scala PRE-CREATION
  core/src/main/scala/kafka/api/admin/ConsumerGroupOffsetsResponse.scala PRE-CREATION
[jira] [Updated] (KAFKA-1694) kafka command line and centralized operations
[ https://issues.apache.org/jira/browse/KAFKA-1694?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Andrii Biletskyi updated KAFKA-1694:
    Attachment: KAFKA-1694_2015-01-12_15:28:41.patch

kafka command line and centralized operations
-
    Key: KAFKA-1694
    URL: https://issues.apache.org/jira/browse/KAFKA-1694
    Project: Kafka
    Issue Type: Bug
    Reporter: Joe Stein
    Assignee: Andrii Biletskyi
    Priority: Critical
    Fix For: 0.8.3
    Attachments: KAFKA-1694.patch, KAFKA-1694_2014-12-24_21:21:51.patch, KAFKA-1694_2015-01-12_15:28:41.patch, KAFKA-1772_1802_1775_1774_v2.patch

https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Command+Line+and+Related+Improvements

--
This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1694) kafka command line and centralized operations
[ https://issues.apache.org/jira/browse/KAFKA-1694?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14273588#comment-14273588 ]

Andrii Biletskyi commented on KAFKA-1694:
-
Updated reviewboard https://reviews.apache.org/r/29301/diff/ against branch origin/trunk

kafka command line and centralized operations
-
    Key: KAFKA-1694
    URL: https://issues.apache.org/jira/browse/KAFKA-1694
    Project: Kafka
    Issue Type: Bug
    Reporter: Joe Stein
    Assignee: Andrii Biletskyi
    Priority: Critical
    Fix For: 0.8.3
    Attachments: KAFKA-1694.patch, KAFKA-1694_2014-12-24_21:21:51.patch, KAFKA-1694_2015-01-12_15:28:41.patch, KAFKA-1772_1802_1775_1774_v2.patch

https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Command+Line+and+Related+Improvements

--
This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: [DISCUSS] 0.8.2-beta2 release
Hi, Joe,

Yes, we can do that. Also, an alternative to doing an 0.8.2 beta2 is to just do the 0.8.2 final, but leave a bit more time for people to try the RC.

Thanks,

Jun

On Thu, Jan 8, 2015 at 8:46 PM, Joe Stein joe.st...@stealth.ly wrote:
Hi, I was thinking that once all the blockers are committed for 0.8.2 that we should release beta2. Thoughts?

Joe Stein
Founder, Principal Consultant
Big Data Open Source Security LLC
http://www.stealth.ly
Twitter: @allthingshadoop http://www.twitter.com/allthingshadoop
Re: Review Request 23702: Patch for KAFKA-1070
On Jan. 9, 2015, 11:39 p.m., Neha Narkhede wrote:
core/src/main/scala/kafka/server/BrokerMetadataFileHandler.scala, line 48
https://reviews.apache.org/r/23702/diff/8-9/?file=776402#file776402line48

Does Java serialization for the Properties object allow you to read just the version field? Basically, if we want to upgrade the version of this file, for some time the server would have to support 2 versions and read the file differently based on the different versions. Also, if you serialize the file this way, is it still human readable? I'd recommend you look at the other checkpoint files and follow the same strategy, or change those to use the new strategy you pick.

Sriharsha Chintalapani wrote:
brokerMetaProps.store(fileOutputStream,) stores the data in human-readable form. For example:

#Mon Jan 12 10:10:46 PST 2015
version=0
broker.id=1001

The BrokerMetadataCheckpoint.read() method checks the version and does a case match, similar to OffsetCheckpoint. OffsetCheckpoint differs in how it stores data: it just stores the values, like "0 topic=offset". I like the current format for broker metadata, as the file will be self-explanatory if users need to change or look at the current values. I don't think OffsetCheckpoint and BrokerMetadataCheckpoint need to follow the same format to store data. If you disagree I can change it to store data like OffsetCheckpoint does.

Okay. What do you think of the following - Does Java serialization for the Properties object allow you to read just the version field? Basically, if we want to upgrade the version of this file, for some time the server would have to support 2 versions and read the file differently based on the different versions.

- Neha

--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/23702/#review67551 ---

On Jan. 12, 2015, 6:46 p.m., Sriharsha Chintalapani wrote:
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/23702/ ---

(Updated Jan. 12, 2015, 6:46 p.m.)

Review request for kafka.

Bugs: KAFKA-1070
    https://issues.apache.org/jira/browse/KAFKA-1070

Repository: kafka

Description
---
KAFKA-1070. Auto-assign node id.

Diffs
-
  core/src/main/scala/kafka/common/GenerateBrokerIdException.scala PRE-CREATION
  core/src/main/scala/kafka/common/InconsistentBrokerIdException.scala PRE-CREATION
  core/src/main/scala/kafka/server/BrokerMetadataCheckpoint.scala PRE-CREATION
  core/src/main/scala/kafka/server/KafkaConfig.scala 6e26c5436feb4629d17f199011f3ebb674aa767f
  core/src/main/scala/kafka/server/KafkaServer.scala 1691ad7fc80ca0b112f68e3ea0cbab265c75b26b
  core/src/main/scala/kafka/utils/ZkUtils.scala 56e3e88e0cc6d917b0ffd1254e173295c1c4aabd
  core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala PRE-CREATION
  core/src/test/scala/unit/kafka/utils/TestUtils.scala c9e8ba257b77f46c5c9b62b451470348b6e58889

Diff: https://reviews.apache.org/r/23702/diff/

Testing
---

Thanks,
Sriharsha Chintalapani
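The version-dispatch approach being discussed can be sketched as follows: a minimal Java illustration using java.util.Properties, where the class and method names are made up for the example and are not Kafka's actual BrokerMetadataCheckpoint code.

```java
import java.io.*;
import java.util.Properties;

// Hypothetical sketch: read a broker-metadata checkpoint stored as a
// java.util.Properties text file and dispatch on its "version" field.
// Because Properties files are plain key=value text, the file stays
// human-readable, and adding a v1 later only means adding a case.
public class BrokerMetadataFile {
    public static int readBrokerId(File file) throws IOException {
        Properties props = new Properties();
        try (InputStream in = new FileInputStream(file)) {
            props.load(in); // loads the whole file; the version is just one key
        }
        int version = Integer.parseInt(props.getProperty("version").trim());
        switch (version) {
            case 0:
                // v0 layout: version=0, broker.id=<n>
                return Integer.parseInt(props.getProperty("broker.id").trim());
            default:
                throw new IOException("Unrecognized broker metadata version " + version);
        }
    }

    public static void main(String[] args) throws IOException {
        File f = File.createTempFile("meta", ".properties");
        try (Writer w = new FileWriter(f)) {
            w.write("version=0\nbroker.id=1001\n");
        }
        System.out.println(readBrokerId(f)); // prints 1001
        f.delete();
    }
}
```

This matches Neha's eventual conclusion later in the thread: the whole file is loaded, the version is picked out, and the remaining fields are interpreted according to that version.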
[jira] [Updated] (KAFKA-1819) Cleaner gets confused about deleted and re-created topics
[ https://issues.apache.org/jira/browse/KAFKA-1819?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Gwen Shapira updated KAFKA-1819:
    Attachment: KAFKA-1819_2015-01-12_11:17:53.patch

Cleaner gets confused about deleted and re-created topics
-
    Key: KAFKA-1819
    URL: https://issues.apache.org/jira/browse/KAFKA-1819
    Project: Kafka
    Issue Type: Bug
    Reporter: Gian Merlino
    Assignee: Gwen Shapira
    Priority: Blocker
    Fix For: 0.8.2
    Attachments: KAFKA-1819.patch, KAFKA-1819_2014-12-26_13:58:44.patch, KAFKA-1819_2014-12-30_16:01:19.patch, KAFKA-1819_2015-01-12_10:34:07.patch, KAFKA-1819_2015-01-12_11:17:53.patch

I get an error like this after deleting a compacted topic and re-creating it. I think it's because the brokers don't remove cleaning checkpoints from the cleaner-offset-checkpoint file. This is from a build based off commit bd212b7.

java.lang.IllegalArgumentException: requirement failed: Last clean offset is 587607 but segment base offset is 0 for log foo-6.
    at scala.Predef$.require(Predef.scala:233)
    at kafka.log.Cleaner.buildOffsetMap(LogCleaner.scala:502)
    at kafka.log.Cleaner.clean(LogCleaner.scala:300)
    at kafka.log.LogCleaner$CleanerThread.cleanOrSleep(LogCleaner.scala:214)
    at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:192)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)

--
This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: Review Request 29210: Patch for KAFKA-1819
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/29210/ ---

(Updated Jan. 12, 2015, 7:17 p.m.)

Review request for kafka.

Bugs: KAFKA-1819
    https://issues.apache.org/jira/browse/KAFKA-1819

Repository: kafka

Description (updated)
---
added locking
improved tests per Joel and Neha's suggestions
added cleaner test to DeleteTopicTest
Fixes to DeleteTopicTest: clean up servers after cleaner test and move cleaner verification to the validation function

Diffs (updated)
-
  core/src/main/scala/kafka/log/LogCleaner.scala f8fcb843c80eec3cf3c931df6bb472c019305253
  core/src/main/scala/kafka/log/LogCleanerManager.scala bcfef77ed53f94017c06a884e4db14531774a0a2
  core/src/main/scala/kafka/log/LogManager.scala 4ebaae00ca4b80bf15c7930bae2011d98bbec053
  core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala 29cc01bcef9cacd8dec1f5d662644fc6fe4994bc
  core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala 5bfa764638e92f217d0ff7108ec8f53193c22978

Diff: https://reviews.apache.org/r/29210/diff/

Testing
---

Thanks,
Gwen Shapira
[jira] [Commented] (KAFKA-1819) Cleaner gets confused about deleted and re-created topics
[ https://issues.apache.org/jira/browse/KAFKA-1819?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14273999#comment-14273999 ]

Gwen Shapira commented on KAFKA-1819:
-
Updated reviewboard https://reviews.apache.org/r/29210/diff/ against branch trunk

Cleaner gets confused about deleted and re-created topics
-
    Key: KAFKA-1819
    URL: https://issues.apache.org/jira/browse/KAFKA-1819
    Project: Kafka
    Issue Type: Bug
    Reporter: Gian Merlino
    Assignee: Gwen Shapira
    Priority: Blocker
    Fix For: 0.8.2
    Attachments: KAFKA-1819.patch, KAFKA-1819_2014-12-26_13:58:44.patch, KAFKA-1819_2014-12-30_16:01:19.patch, KAFKA-1819_2015-01-12_10:34:07.patch, KAFKA-1819_2015-01-12_11:17:53.patch

I get an error like this after deleting a compacted topic and re-creating it. I think it's because the brokers don't remove cleaning checkpoints from the cleaner-offset-checkpoint file. This is from a build based off commit bd212b7.

java.lang.IllegalArgumentException: requirement failed: Last clean offset is 587607 but segment base offset is 0 for log foo-6.
    at scala.Predef$.require(Predef.scala:233)
    at kafka.log.Cleaner.buildOffsetMap(LogCleaner.scala:502)
    at kafka.log.Cleaner.clean(LogCleaner.scala:300)
    at kafka.log.LogCleaner$CleanerThread.cleanOrSleep(LogCleaner.scala:214)
    at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:192)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)

--
This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: Review Request 29692: Patch for kafka-1841
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/29692/#review67667 ---

Thanks for patching this. Looks good overall.

clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
https://reviews.apache.org/r/29692/#comment111737

core/src/main/scala/kafka/common/OffsetMetadataAndError.scala
https://reviews.apache.org/r/29692/#comment111740
Rather than use a var, can we just use a case class copy to modify? i.e., `modifiedInstance = originalInstance.copy(fieldToModify=modifiedValue)`

core/src/main/scala/kafka/server/KafkaApis.scala
https://reviews.apache.org/r/29692/#comment111748
We should probably also change OffsetCommitRequest.responseFor. The issue is that if you get an UnknownTopicOrPartition error, right now we convert that to a ConsumerCoordinatorNotAvailableCode, which does not apply for v0. (BTW, if this patch were for trunk you would not need to do this, since latest trunk sets the code correctly in the OffsetManager class.) Alternatively, we could just remove the check here, but that would be a change in behavior.

core/src/main/scala/kafka/server/KafkaApis.scala
https://reviews.apache.org/r/29692/#comment111747
Should we explicitly version the scala side of OffsetCommitResponse as well, especially given that the Java version has a v0/v1? E.g., if a client proactively checks for the response version... This seems to always send back version = 0 in the response.

core/src/main/scala/kafka/server/KafkaApis.scala
https://reviews.apache.org/r/29692/#comment111757
Similar comment as above on the response version.

- Joel Koshy

On Jan. 9, 2015, 10:36 p.m., Jun Rao wrote:
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/29692/ ---

(Updated Jan. 9, 2015, 10:36 p.m.)

Review request for kafka.

Bugs: kafka-1841
    https://issues.apache.org/jira/browse/kafka-1841

Repository: kafka

Description
---
rebased

Diffs
-
  clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java 7517b879866fc5dad5f8d8ad30636da8bbe7784a
  clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java 3ee5cbad55ce836fd04bb954dcf6ef2f9bc3288f
  core/src/main/scala/kafka/api/OffsetCommitRequest.scala 861a6cf11dc6b6431fcbbe9de00c74a122f204bd
  core/src/main/scala/kafka/api/OffsetFetchRequest.scala c7604b9cdeb8f46507f04ed7d35fcb3d5f150713
  core/src/main/scala/kafka/common/OffsetMetadataAndError.scala 4cabffeacea09a49913505db19a96a55d58c0909
  core/src/main/scala/kafka/server/KafkaApis.scala 9a61fcba3b5eeb295676b3ef720c719ef5244642
  core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala cd16ced5465d098be7a60498326b2a98c248f343

Diff: https://reviews.apache.org/r/29692/diff/

Testing
---

Thanks,
Jun Rao
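The v0-versus-v1 error-code concern in the KafkaApis comments boils down to a version-dependent mapping like the following. This is a simplified sketch, not the actual responseFor code; the class name is made up, but the numeric codes (3 = UnknownTopicOrPartition, 15 = ConsumerCoordinatorNotAvailable) are the standard Kafka protocol values.

```java
// Sketch of the version-dependent error mapping Joel describes: for
// OffsetCommit v1 (offsets stored in Kafka), an unknown topic-partition
// is surfaced as "coordinator not available", but v0 (offsets stored in
// ZooKeeper) predates the coordinator, so the raw error should pass
// through unchanged.
public class OffsetErrorMapping {
    public static final short UNKNOWN_TOPIC_OR_PARTITION = 3;
    public static final short CONSUMER_COORDINATOR_NOT_AVAILABLE = 15;

    public static short responseError(short requestVersion, short error) {
        if (requestVersion > 0 && error == UNKNOWN_TOPIC_OR_PARTITION)
            return CONSUMER_COORDINATOR_NOT_AVAILABLE;
        return error; // v0 keeps the original code
    }
}
```

The point of the review comment is exactly this branch on the request version: converting unconditionally, as the patched code does, produces an error code that v0 clients were never written to handle.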
Re: Review Request 23702: Patch for KAFKA-1070
On Jan. 9, 2015, 11:39 p.m., Neha Narkhede wrote:
core/src/main/scala/kafka/server/BrokerMetadataFileHandler.scala, line 48
https://reviews.apache.org/r/23702/diff/8-9/?file=776402#file776402line48

Does Java serialization for the Properties object allow you to read just the version field? Basically, if we want to upgrade the version of this file, for some time the server would have to support 2 versions and read the file differently based on the different versions. Also, if you serialize the file this way, is it still human readable? I'd recommend you look at the other checkpoint files and follow the same strategy, or change those to use the new strategy you pick.

Sriharsha Chintalapani wrote:
brokerMetaProps.store(fileOutputStream,) stores the data in human-readable form. For example:

#Mon Jan 12 10:10:46 PST 2015
version=0
broker.id=1001

The BrokerMetadataCheckpoint.read() method checks the version and does a case match, similar to OffsetCheckpoint. OffsetCheckpoint differs in how it stores data: it just stores the values, like "0 topic=offset". I like the current format for broker metadata, as the file will be self-explanatory if users need to change or look at the current values. I don't think OffsetCheckpoint and BrokerMetadataCheckpoint need to follow the same format to store data. If you disagree I can change it to store data like OffsetCheckpoint does.

Neha Narkhede wrote:
Okay. What do you think of the following - Does Java serialization for the Properties object allow you to read just the version field? Basically, if we want to upgrade the version of this file, for some time the server would have to support 2 versions and read the file differently based on the different versions.

I am not sure if I understand correctly. BrokerMetadataCheckpoint.read() does this:

val brokerMetaProps = new VerifiableProperties(Utils.loadProps(file.getAbsolutePath()))
val version = brokerMetaProps.getIntInRange("version", (0, Int.MaxValue))
version match {
  case 0 => ...

It does load the entire contents into brokerMetaProps and we can pick the version from it. If the intention is to read just a single line to get the version, that's not possible with the current approach. Is there any specific reason we need just the version? In either case we will be able to support multiple file versions at the same time, since the file contents are key=value pairs.

- Sriharsha

--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/23702/#review67551 ---

On Jan. 12, 2015, 6:46 p.m., Sriharsha Chintalapani wrote:
(Updated Jan. 12, 2015, 6:46 p.m.) Review request for kafka. Bugs: KAFKA-1070 https://issues.apache.org/jira/browse/KAFKA-1070 Repository: kafka. Description: KAFKA-1070. Auto-assign node id. Diff: https://reviews.apache.org/r/23702/diff/ Thanks, Sriharsha Chintalapani
Compatibility + Unknown APIs
Hi all -- continuing on the compatibility discussion:

I've found that it is very difficult to identify when a server does not recognize an api (I'm using kafka-python to submit wire-protocol requests). For example, when I send a ConsumerMetadataRequest to an 0.8.1.1 server, I get a closed socket [stacktrace below]. The server raises an error internally, but does not send any meaningful response. I'm not sure whether this is the intended behavior, but for maintaining clients in an ecosystem of multiple server versions with different API support, it would be great to have a way to determine what the server supports and what it does not. Some suggestions:

(1) An UnknownAPIResponse that is returned for any API or API version request that is unsupported.
(2) A server metadata API to get the list of supported APIs and/or API versions supported.
(3) A server metadata API to get the published version of the server (0.8.2 vs. 0.8.1.1, etc).

Thoughts?

Dana Powers
Rdio, Inc.
dana.pow...@rd.io
rdio.com/people/dpkp/

stacktrace:
```
[2015-01-12 13:03:55,719] ERROR Closing socket for /127.0.0.1 because of error (kafka.network.Processor)
kafka.common.KafkaException: Wrong request type 10
    at kafka.api.RequestKeys$.deserializerForKey(RequestKeys.scala:57)
    at kafka.network.RequestChannel$Request.init(RequestChannel.scala:53)
    at kafka.network.Processor.read(SocketServer.scala:353)
    at kafka.network.Processor.run(SocketServer.scala:245)
    at java.lang.Thread.run(Thread.java:722)
```
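For context, every Kafka request starts with a common fixed header, and the failure described above happens while the broker dispatches on the api_key in that header. A minimal sketch of the framing (the class is illustrative, not part of any client library):

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Builds the common Kafka request header that precedes every API request:
// 4-byte size, then api_key (int16), api_version (int16),
// correlation_id (int32), client_id (int16-length-prefixed string).
// Api key 10 is ConsumerMetadataRequest, matching "Wrong request type 10"
// in the stacktrace above: the 0.8.1.1 broker has no deserializer for
// key 10, throws while parsing, and closes the socket before writing
// any response, so the client only ever sees EOF.
public class RequestHeader {
    public static ByteBuffer encode(short apiKey, short apiVersion,
                                    int correlationId, String clientId) {
        byte[] id = clientId.getBytes(StandardCharsets.UTF_8);
        int payload = 2 + 2 + 4 + 2 + id.length;
        ByteBuffer buf = ByteBuffer.allocate(4 + payload);
        buf.putInt(payload);            // size of everything after this field
        buf.putShort(apiKey);           // e.g. 10 = ConsumerMetadataRequest
        buf.putShort(apiVersion);
        buf.putInt(correlationId);
        buf.putShort((short) id.length);
        buf.put(id);
        buf.flip();
        return buf;
    }
}
```

Because the api_key sits in this shared header, an old broker can always read far enough to know which API was requested; the problem is only that it has no way to answer with "unknown API" in the existing protocol.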
Kafka Cluster Monitoring and Documentation of Internals (JMX Metrics) of Rejected Events
Hi Kafka Team,

I am trying to understand Kafka internals and how a message can be corrupted or lost on the broker side. I have referred to the following documentation for monitoring:

https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Internals
http://kafka.apache.org/documentation.html#monitoring

I am looking at the following beans:

kafka.server:type=BrokerTopicMetrics,name=test-FailedProduceRequestsPerSec
kafka.server:type=BrokerTopicMetrics,name=test-BytesRejectedPerSec

I see the following exception on the broker side, rejecting a request because it is too large. This is great, but it does not show the source IP of the producer that caused the issue. Is there any way to log and capture this?

[2014-10-14 22:09:53,262] ERROR [KafkaApi-2] Error processing ProducerRequest with correlation id 28795280 from client X on partition [XXX,17] (kafka.server.KafkaApis)
kafka.common.MessageSizeTooLargeException: Message size is 2924038 bytes which exceeds the maximum configured message size of 2097152.

Can this be reported as a separate MessageSizeTooLargeException metric per topic?

Also, what is the best way to find CRC check errors from the consumer side? How do you debug this? e.g. log line:

11 Dec 2014 07:22:33,387 ERROR [pool-15-thread-4]
kafka.message.InvalidMessageException: Message is corrupt (stored crc = 1834644195, computed crc = 2374999037)

Also, is there any JIRA open to document the full list of the latest metrics, their format, and what they mean? http://kafka.apache.org/documentation.html#monitoring. Please see the attached image for the list of all metrics. The broker version is 0.8.1.1.

Thanks,
Bhavesh
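For anyone reading the beans above programmatically, this is roughly how they are addressed over JMX. A sketch only: the attribute names mentioned in the comments (Count, OneMinuteRate) are an assumption based on the Yammer metrics library Kafka 0.8 uses, not something stated in this thread.

```java
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;

// Sketch of how the per-topic broker metrics are addressed over JMX.
// The bean name encodes the topic ("test" in the examples above);
// meters such as BytesRejectedPerSec typically expose attributes like
// Count and OneMinuteRate (assumed from the metrics library).
public class RejectedBytesBean {
    public static ObjectName forTopic(String topic) throws MalformedObjectNameException {
        return new ObjectName(
            "kafka.server:type=BrokerTopicMetrics,name=" + topic + "-BytesRejectedPerSec");
    }
    // Against a live broker, one would read it via a JMX connection, e.g.:
    //   MBeanServerConnection conn = JMXConnectorFactory
    //       .connect(new JMXServiceURL(url)).getMBeanServerConnection();
    //   Object count = conn.getAttribute(forTopic("test"), "Count");
}
```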
Re: Review Request 23702: Patch for KAFKA-1070
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/23702/#review67715 ---

Ship it!

core/src/main/scala/kafka/server/BrokerMetadataCheckpoint.scala
https://reviews.apache.org/r/23702/#comment111797
Minor nit which I can address during checkin: according to our coding convention, there should be a space after the "," here.

- Neha Narkhede

On Jan. 12, 2015, 6:46 p.m., Sriharsha Chintalapani wrote:
(Updated Jan. 12, 2015, 6:46 p.m.) Review request for kafka. Bugs: KAFKA-1070 https://issues.apache.org/jira/browse/KAFKA-1070 Repository: kafka. Description: KAFKA-1070. Auto-assign node id. Diff: https://reviews.apache.org/r/23702/diff/ Thanks, Sriharsha Chintalapani
Re: Compatibility + Unknown APIs
We ran into similar difficulties, both when trying to get Kafka to use new APIs when available and when testing the wire protocol.

+1 for all three suggestions. #1 sounds like the bare minimum, but I'm not sure how much it will complicate the clients (now we expect either a response or an Unknown message and need to be able to distinguish between them from the byte array). #2 and #3 both make lots of sense.

Gwen

On Mon, Jan 12, 2015 at 1:15 PM, Dana Powers dana.pow...@rd.io wrote:
Hi all -- continuing on the compatibility discussion: I've found that it is very difficult to identify when a server does not recognize an api (I'm using kafka-python to submit wire-protocol requests). For example, when I send a ConsumerMetadataRequest to an 0.8.1.1 server, I get a closed socket [stacktrace below].
Re: Review Request 23702: Patch for KAFKA-1070
On Jan. 9, 2015, 11:39 p.m., Neha Narkhede wrote:
core/src/main/scala/kafka/server/BrokerMetadataFileHandler.scala, line 48
https://reviews.apache.org/r/23702/diff/8-9/?file=776402#file776402line48

Does Java serialization for the Properties object allow you to read just the version field? Basically, if we want to upgrade the version of this file, for some time the server would have to support 2 versions and read the file differently based on the different versions. Also, if you serialize the file this way, is it still human readable? I'd recommend you look at the other checkpoint files and follow the same strategy, or change those to use the new strategy you pick.

Sriharsha Chintalapani wrote:
brokerMetaProps.store(fileOutputStream,) stores the data in human-readable form. For example:

#Mon Jan 12 10:10:46 PST 2015
version=0
broker.id=1001

The BrokerMetadataCheckpoint.read() method checks the version and does a case match, similar to OffsetCheckpoint. OffsetCheckpoint differs in how it stores data: it just stores the values, like "0 topic=offset". I like the current format for broker metadata, as the file will be self-explanatory if users need to change or look at the current values. I don't think OffsetCheckpoint and BrokerMetadataCheckpoint need to follow the same format to store data. If you disagree I can change it to store data like OffsetCheckpoint does.

Neha Narkhede wrote:
Okay. What do you think of the following - Does Java serialization for the Properties object allow you to read just the version field? Basically, if we want to upgrade the version of this file, for some time the server would have to support 2 versions and read the file differently based on the different versions.

Sriharsha Chintalapani wrote:
I am not sure if I understand correctly. BrokerMetadataCheckpoint.read() does this:

val brokerMetaProps = new VerifiableProperties(Utils.loadProps(file.getAbsolutePath()))
val version = brokerMetaProps.getIntInRange("version", (0, Int.MaxValue))
version match {
  case 0 => ...

It does load the entire contents into brokerMetaProps and we can pick the version from it. If the intention is to read just a single line to get the version, that's not possible with the current approach. Is there any specific reason we need just the version? In either case we will be able to support multiple file versions at the same time, since the file contents are key=value pairs.

"It does load the entire contents into brokerMetaProps and we can pick the version from it."
You are right. This should be sufficient. The broker can be smart about interpreting the rest of the fields based on the read version.

- Neha

--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/23702/#review67551 ---

On Jan. 12, 2015, 6:46 p.m., Sriharsha Chintalapani wrote:
(Updated Jan. 12, 2015, 6:46 p.m.) Review request for kafka. Bugs: KAFKA-1070 https://issues.apache.org/jira/browse/KAFKA-1070 Repository: kafka. Description: KAFKA-1070. Auto-assign node id. Diff: https://reviews.apache.org/r/23702/diff/ Thanks, Sriharsha Chintalapani
Re: Compatibility + Unknown APIs
Yeah I agree there isn't a good way to auto-detect the presence of a given API. I think #1 may be tricky in practice. The response format is always dictated by the request format so how do I know if the bytes I got back are a valid response to the given request or are the UnknownRequestResponse? #2 would be a good fix for the problem I think. This might be a good replacement for the echo api and would probably serve the same purpose (checking if the server is alive). #3 is a little dangerous because we actually want clients to only pay attention to the protocol versions which are per-api, not the server version. I.e. we actually don't want the client to do something like check serverVersion.equals(0.8.2) because we want to be able to release the server at will and have it keep answering protocols in a backwards compatible way. I.e. a client that uses just metadata request and produce request should only care about the version of these protocols it implements being supported not about the version of the server or the version of any other protocol it doesn't use. This is the rationale behind versioning the apis independently rather than having a single protocol version that we would have to bump every time an internal broker-broker protocol changed. -Jay On Mon, Jan 12, 2015 at 1:32 PM, Gwen Shapira gshap...@cloudera.com wrote: We ran into similar difficulties, both when trying to get Kafka to use new APIs when available and when testing the wire protocol. +1 for all three suggestions. #1 sounds like the bare minimum, but I'm not sure how much it will complicate the clients (now we expect either a response or an Unknown message and need to be able to distinguish between them from the byte array). #2 and #3 both makes lots of sense. 
Gwen

On Mon, Jan 12, 2015 at 1:15 PM, Dana Powers dana.pow...@rd.io wrote:

Hi all -- continuing on the compatibility discussion: I've found that it is very difficult to identify when a server does not recognize an api (I'm using kafka-python to submit wire-protocol requests). For example, when I send a ConsumerMetadataRequest to an 0.8.1.1 server, I get a closed socket *[stacktrace below]. The server raises an error internally, but does not send any meaningful response. I'm not sure whether this is the intended behavior, but for those maintaining clients in an ecosystem of multiple server versions with different API support, it would be great to have a way to determine what the server supports and what it does not. Some suggestions:

(1) An UnknownAPIResponse that is returned for any API or API Version request that is unsupported.
(2) A server metadata API to get the list of supported APIs and/or API versions supported.
(3) A server metadata API to get the published version of the server (0.8.2 v. 0.8.1.1, etc).

Thoughts?

Dana Powers
Rdio, Inc.
dana.pow...@rd.io
rdio.com/people/dpkp/

*stacktrace:
```
[2015-01-12 13:03:55,719] ERROR Closing socket for /127.0.0.1 because of error (kafka.network.Processor)
kafka.common.KafkaException: Wrong request type 10
        at kafka.api.RequestKeys$.deserializerForKey(RequestKeys.scala:57)
        at kafka.network.RequestChannel$Request.<init>(RequestChannel.scala:53)
        at kafka.network.Processor.read(SocketServer.scala:353)
        at kafka.network.Processor.run(SocketServer.scala:245)
        at java.lang.Thread.run(Thread.java:722)
```
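For context on what hits the broker in the scenario above, here is a hedged sketch of the 0.8-era request framing a client like kafka-python puts on the wire (size-prefixed payload; int16 api key and version; int32 correlation id; int16-length-prefixed client id). The helper names are illustrative, not kafka-python's API:

```python
import struct

def encode_request_header(api_key, api_version, correlation_id, client_id):
    # Common request header: int16 api_key, int16 api_version,
    # int32 correlation_id, int16-length-prefixed client_id string.
    cid = client_id.encode("utf-8")
    return struct.pack(">hhih", api_key, api_version, correlation_id, len(cid)) + cid

def frame(payload):
    # Every request is sent as an int32 size prefix followed by the payload.
    return struct.pack(">i", len(payload)) + payload

# ConsumerMetadataRequest is api key 10; its body is a single
# int16-length-prefixed consumer group string (empty here). An 0.8.1.1
# broker does not know key 10, raises "Wrong request type 10" internally,
# and closes the socket without writing any response, which is exactly
# the failure mode described in the message above.
probe = frame(encode_request_header(10, 0, 999, "probe") + struct.pack(">h", 0))
```

Nothing in this frame tells the sender whether the broker will answer it or silently drop the connection, which is the crux of the thread.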
Re: Compatibility + Unknown APIs
I think #1 may be tricky in practice. The response format is always dictated by the request format, so how do I know if the bytes I got back are a valid response to the given request or are the UnknownRequestResponse?

On the other hand, from the client developer perspective, having to figure out that you are looking at a closed socket because you tried to use an API that wasn't implemented in a specific version can be pretty annoying.

Another way to do it is to move the error_code field (currently implemented in pretty much every single Response schema) to the Response Header, and then we could use it for meta errors such as UnknownAPI. It's a much bigger change than adding a new Request type, but possibly worth it?
[jira] [Updated] (KAFKA-1841) OffsetCommitRequest API - timestamp field is not versioned
[ https://issues.apache.org/jira/browse/KAFKA-1841?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jun Rao updated KAFKA-1841: --- Attachment: kafka-1841_2015-01-12_14:30:24.patch OffsetCommitRequest API - timestamp field is not versioned -- Key: KAFKA-1841 URL: https://issues.apache.org/jira/browse/KAFKA-1841 Project: Kafka Issue Type: Bug Components: core Affects Versions: 0.8.2 Environment: wire-protocol Reporter: Dana Powers Assignee: Jun Rao Priority: Blocker Fix For: 0.8.2 Attachments: kafka-1841.patch, kafka-1841_2015-01-08_15:07:57.patch, kafka-1841_2015-01-09_14:36:50.patch, kafka-1841_2015-01-12_14:30:24.patch Timestamp field was added to the OffsetCommitRequest wire protocol api for 0.8.2 by KAFKA-1012 . The 0.8.1.1 server does not support the timestamp field, so I think the api version of OffsetCommitRequest should be incremented and checked by the 0.8.2 kafka server before attempting to read a timestamp from the network buffer in OffsetCommitRequest.readFrom (core/src/main/scala/kafka/api/OffsetCommitRequest.scala) It looks like a subsequent patch (KAFKA-1462) added another api change to support a new constructor w/ params generationId and consumerId, calling that version 1, and a pending patch (KAFKA-1634) adds retentionMs as another field, while possibly removing timestamp altogether, calling this version 2. So the fix here is not straightforward enough for me to submit a patch. This could possibly be merged into KAFKA-1634, but opening as a separate Issue because I believe the lack of versioning in the current trunk should block 0.8.2 release. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
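To make the versioning gap in this issue concrete, here is a hedged Python sketch of a version-gated read of one per-partition commit entry, reading the timestamp only for v1 as the report proposes. The layout is simplified for illustration and is not the full OffsetCommitRequest schema:

```python
import struct

def read_partition_commit(buf, pos, version):
    # v0 (0.8.1): partition(int32) offset(int64) metadata(string)
    # v1 (0.8.2, per KAFKA-1012/KAFKA-1462): a timestamp(int64) follows
    # the offset. Gating the read on the request's api version avoids
    # consuming 8 bytes that a v0 client never sent, which is the bug
    # the issue describes in OffsetCommitRequest.readFrom.
    partition, offset = struct.unpack_from(">iq", buf, pos)
    pos += 12
    timestamp = None
    if version >= 1:
        (timestamp,) = struct.unpack_from(">q", buf, pos)
        pos += 8
    (mlen,) = struct.unpack_from(">h", buf, pos)
    pos += 2
    metadata = buf[pos:pos + mlen].decode("utf-8")
    return {"partition": partition, "offset": offset,
            "timestamp": timestamp, "metadata": metadata}, pos + mlen
```

Without the version check, a v0 request body parsed as v1 would read the metadata length prefix as part of a timestamp and corrupt everything after it.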
[jira] [Commented] (KAFKA-1841) OffsetCommitRequest API - timestamp field is not versioned
[ https://issues.apache.org/jira/browse/KAFKA-1841?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14274301#comment-14274301 ] Jun Rao commented on KAFKA-1841: Updated reviewboard https://reviews.apache.org/r/29692/diff/ against branch origin/0.8.2
Re: Review Request 29692: Patch for kafka-1841
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/29692/ --- (Updated Jan. 12, 2015, 10:30 p.m.) Review request for kafka. Bugs: kafka-1841 https://issues.apache.org/jira/browse/kafka-1841 Repository: kafka Description (updated) --- addressing Joel's comments Diffs (updated) - clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java 7517b879866fc5dad5f8d8ad30636da8bbe7784a clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java 3ee5cbad55ce836fd04bb954dcf6ef2f9bc3288f core/src/main/scala/kafka/api/OffsetCommitRequest.scala 861a6cf11dc6b6431fcbbe9de00c74a122f204bd core/src/main/scala/kafka/api/OffsetCommitResponse.scala 624a1c1cc540688ae2b1fb96665696a6084158e5 core/src/main/scala/kafka/api/OffsetFetchRequest.scala c7604b9cdeb8f46507f04ed7d35fcb3d5f150713 core/src/main/scala/kafka/server/KafkaApis.scala 9a61fcba3b5eeb295676b3ef720c719ef5244642 core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala cd16ced5465d098be7a60498326b2a98c248f343 Diff: https://reviews.apache.org/r/29692/diff/ Testing --- Thanks, Jun Rao
[jira] [Updated] (KAFKA-1835) Kafka new producer needs options to make blocking behavior explicit
[ https://issues.apache.org/jira/browse/KAFKA-1835?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Paul Pearcy updated KAFKA-1835: --- Affects Version/s: 0.9.0 0.8.3 Status: Patch Available (was: Open) Here is a first pass at adding options to ensure the kafka producer never blocks. Kafka new producer needs options to make blocking behavior explicit --- Key: KAFKA-1835 URL: https://issues.apache.org/jira/browse/KAFKA-1835 Project: Kafka Issue Type: Improvement Components: clients Affects Versions: 0.8.2, 0.8.3, 0.9.0 Reporter: Paul Pearcy Original Estimate: 504h Remaining Estimate: 504h The new (0.8.2 standalone) producer will block the first time it attempts to retrieve metadata for a topic. This is not the desired behavior in some use cases where async non-blocking guarantees are required and message loss is acceptable in known cases. Also, most developers will assume an API that returns a future is safe to call in a critical request path. Discussing on the mailing list, the most viable option is to have the following settings: pre.initialize.topics=x,y,z pre.initialize.timeout=x This moves potential blocking to the init of the producer and outside of some random request. The potential will still exist for blocking in a corner case where connectivity with Kafka is lost and a topic not included in pre-init has a message sent for the first time. There is the question of what to do when initialization fails. There are a couple of options that I'd like available: - Fail creation of the client - Fail all sends until the meta is available Open to input on how the above option should be expressed. It is also worth noting more nuanced solutions exist that could work without the extra settings, they just end up having extra complications and at the end of the day not adding much value. 
For instance, the producer could accept and queue messages (note: more complicated than I am making it sound, due to storing all accepted messages in pre-partitioned compact binary form), but you're still going to be forced to choose either to start blocking or to drop messages at some point. I have some test cases I am going to port over to the Kafka producer integration ones and start from there. My current impl is in Scala, but porting to Java shouldn't be a big deal (was using a promise to track init status, but will likely need to make that an atomic bool). -- This message was sent by Atlassian JIRA (v6.3.4#6332)
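A rough sketch of the proposed behavior: metadata is fetched eagerly at construction under a timeout, instead of lazily inside send(). All names here (NonBlockingProducer, fetch_metadata, fail_on_timeout) are illustrative stand-ins, not the actual producer API; the two failure options from the description map onto the fail_on_timeout branches:

```python
import concurrent.futures

class NonBlockingProducer:
    # Illustrative stand-in for the proposed pre.initialize.topics /
    # pre.initialize.timeout behavior: block (bounded) at init, never in send().
    def __init__(self, topics, timeout_secs, fetch_metadata, fail_on_timeout=True):
        self._metadata = {}
        pool = concurrent.futures.ThreadPoolExecutor(max_workers=1)
        future = pool.submit(lambda: {t: fetch_metadata(t) for t in topics})
        try:
            self._metadata = future.result(timeout=timeout_secs)
        except concurrent.futures.TimeoutError:
            if fail_on_timeout:
                raise  # option 1: fail creation of the client
            # option 2: keep the client, fail sends until metadata arrives
        finally:
            pool.shutdown(wait=False)

    def send(self, topic, value):
        # Never blocks: a topic missing from the pre-initialized metadata
        # fails fast instead of triggering a blocking metadata fetch.
        if topic not in self._metadata:
            raise RuntimeError("metadata not initialized for topic %r" % topic)
        return (self._metadata[topic], value)
```

The corner case from the description survives in this sketch too: a send to a topic not in the pre-init list fails rather than blocks, which is the explicit trade-off being proposed.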
[jira] [Updated] (KAFKA-1835) Kafka new producer needs options to make blocking behavior explicit
[ https://issues.apache.org/jira/browse/KAFKA-1835?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Paul Pearcy updated KAFKA-1835: --- Attachment: KAFKA-1835-New-producer--blocking_v0.patch
Re: Review Request 29692: Patch for kafka-1841
On Jan. 12, 2015, 7:47 p.m., Joel Koshy wrote:

core/src/main/scala/kafka/server/KafkaApis.scala, line 88
https://reviews.apache.org/r/29692/diff/3/?file=815275#file815275line88

We should probably also change OffsetCommitRequest.responseFor. The issue is that if you get an UnknownTopicOrPartition error, right now we convert that to a ConsumerCoordinatorNotAvailableCode, which does not apply for v0. (BTW, if this patch were for trunk you would not need to do this, since latest trunk sets the code correctly in the OffsetManager class.) Alternatively, we could just remove the check here, but that would be a change in behavior.

This doesn't seem to be necessary, since in v0 we handle UnknownTopicOrPartition explicitly in handleOffsetCommitRequest() by converting the exception to the right error code.

On Jan. 12, 2015, 7:47 p.m., Joel Koshy wrote:

core/src/main/scala/kafka/server/KafkaApis.scala, line 101
https://reviews.apache.org/r/29692/diff/3/?file=815275#file815275line101

Should we explicitly version the scala side of OffsetCommitResponse as well, especially given that the Java version has a v0/v1? E.g., if a client proactively checks for the response version... This seems to always send back version = 0 in the response.

Added a comment to indicate that this constructor is for both v0 and v1.

- Jun

--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/29692/#review67667 ---
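The version-dependent error mapping discussed in the review comment above can be sketched as follows. The error-code numbers match the wire protocol's ErrorMapping (UnknownTopicOrPartitionCode = 3, ConsumerCoordinatorNotAvailableCode = 15), but the function itself is an illustration of the discussion, not the committed KafkaApis code:

```python
# Kafka wire-protocol error codes (per ErrorMapping).
UNKNOWN_TOPIC_OR_PARTITION = 3
CONSUMER_COORDINATOR_NOT_AVAILABLE = 15

def offset_commit_error_for(version, error_code):
    # v0 commits go to ZooKeeper, where UnknownTopicOrPartition is
    # meaningful and must be returned as-is. v1 commits go through the
    # offsets topic, where that error instead signals that the consumer
    # coordinator (or its backing topic) is not yet available.
    if version == 0:
        return error_code
    if error_code == UNKNOWN_TOPIC_OR_PARTITION:
        return CONSUMER_COORDINATOR_NOT_AVAILABLE
    return error_code
```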
[jira] [Updated] (KAFKA-1835) Kafka new producer needs options to make blocking behavior explicit
[ https://issues.apache.org/jira/browse/KAFKA-1835?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Joe Stein updated KAFKA-1835: - Fix Version/s: 0.8.2
Re: Review Request 29210: Patch for KAFKA-1819
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/29210/#review67717 ---

Minor comments:

core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
https://reviews.apache.org/r/29210/#comment111813
unused

core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
https://reviews.apache.org/r/29210/#comment111807
Rather than an arbitrary sleep, can we just use waitUntilTrue? The condition would be that the cleaner checkpoint file contains the topic.

core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
https://reviews.apache.org/r/29210/#comment111809
Actually, now that you have the check above in DeleteTopicTest, do you think it is necessary to have this?

- Joel Koshy

On Jan. 12, 2015, 7:17 p.m., Gwen Shapira wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/29210/ --- (Updated Jan. 12, 2015, 7:17 p.m.) Review request for kafka. Bugs: KAFKA-1819 https://issues.apache.org/jira/browse/KAFKA-1819 Repository: kafka Description --- added locking improved tests per Joel and Neha's suggestions added cleaner test to DeleteTopicTest Fixes to DeleteTopicTest: clean up servers after cleaner test and move cleaner verification to the validation function Diffs - core/src/main/scala/kafka/log/LogCleaner.scala f8fcb843c80eec3cf3c931df6bb472c019305253 core/src/main/scala/kafka/log/LogCleanerManager.scala bcfef77ed53f94017c06a884e4db14531774a0a2 core/src/main/scala/kafka/log/LogManager.scala 4ebaae00ca4b80bf15c7930bae2011d98bbec053 core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala 29cc01bcef9cacd8dec1f5d662644fc6fe4994bc core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala 5bfa764638e92f217d0ff7108ec8f53193c22978 Diff: https://reviews.apache.org/r/29210/diff/ Testing --- Thanks, Gwen Shapira
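The waitUntilTrue pattern suggested in place of an arbitrary sleep looks roughly like this (a generic Python rendition in the spirit of Kafka's Scala TestUtils.waitUntilTrue; parameter names are mine):

```python
import time

def wait_until_true(condition, msg, timeout_secs=5.0, pause_secs=0.1):
    # Poll the condition until it holds or the deadline passes, instead of
    # sleeping a fixed amount and hoping the cleaner has checkpointed.
    # Deterministic on slow machines, fast on fast ones.
    deadline = time.monotonic() + timeout_secs
    while time.monotonic() < deadline:
        if condition():
            return True
        time.sleep(pause_secs)
    raise AssertionError(msg)
```

In the test under review, the condition would read the cleaner checkpoint file and check that it contains the topic.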
Re: Compatibility + Unknown APIs
Yeah, I totally agree. Using the closed socket to indicate "not supported" doesn't really work, since any network error could lead to that. Arguably we should have a request-level error.

We discussed this at the time we were defining the protocols for 0.8, and the conclusion was not to do that. The reasoning was that since almost all the requests end up having errors at either a per-topic or per-partition level, this makes correctly setting/interpreting the global error a bit confusing. I.e., if you are implementing a client and a given partition gets an error but there is no global error, what do you do? Likewise, in most cases it is a bit ambiguous how to set the global error on the server side (i.e. if some partitions are unavailable but some are available). The result was that error reporting is defined per-request.

We could change this now, but it would mean bumping compatibility on all the apis to add the new field, which would be annoying to people, right? I actually agree it might have been better to do it this way, both for this and also to make generic error handling easier, but I'm not sure if it is worth such a big break now.

The other proposal, introducing a get_protocol_versions() method, seems almost as good for probing for support and is much less invasive. That seems better to me because I think generally clients shouldn't need to do this; they should just build against a minimum Kafka version and trust it will keep working into the future.

-Jay

On Mon, Jan 12, 2015 at 2:24 PM, Gwen Shapira gshap...@cloudera.com wrote:

I think #1 may be tricky in practice. The response format is always dictated by the request format so how do I know if the bytes I got back are a valid response to the given request or are the UnknownRequestResponse? On the other hand, from the client developer perspective, having to figure out that you are looking at a closed socket because you tried to use an API that wasn't implemented in a specific version can be pretty annoying.
Re: Compatibility + Unknown APIs
There are benefits of moving the error code to the response header.

1) I think it is the right thing to do from an implementation perspective. It makes the most sense: you send a request and you get back a response, and the response tells you something is wrong in the header.

2) With such a large change we can make sure we have our solution to these issues (see other thread on Compatibility and KIP) set up and in place moving forward. If we can make such a large change, then smaller ones should work well too. We could even use this one change as a way to best flush out the way we want to implement it, preserving functionality AND adding the new response format.

When we release 0.8.3 (assuming this was in there) developers can read KIP-1 (or whatever) and decide if they want to support the version bump required; if not, then fine, keep working with 0.8.2 and you are good to go.

/***
Joe Stein
Founder, Principal Consultant
Big Data Open Source Security LLC
http://www.stealth.ly
Twitter: @allthingshadoop http://www.twitter.com/allthingshadoop
***/

On Mon, Jan 12, 2015 at 8:37 PM, Jay Kreps jay.kr...@gmail.com wrote:

Yeah, adding it to the metadata request probably makes sense. What you describe of making it a per-broker field is technically correct, since each broker could be on a different software version. But I wonder if it might not be more usable to just give back a single list of api versions. This will be more compact and also easier to interpret as a client. An easy implementation of this would be for the broker that answers the metadata request to just give whatever versions it supports. A slightly better implementation would be for each broker to register what it supports in ZK and have the responding broker give back the intersection (i.e. apis supported by all brokers). Since the broker actually supports multiple versions at the same time, this will need to be in the form [ApiId [ApiVersion]].
-Jay

On Mon, Jan 12, 2015 at 5:19 PM, Dana Powers dana.pow...@rd.io wrote:

Perhaps a bit hacky, but you could also reserve a specific correlationId (maybe -1) to represent errors and send back to the client an UnknownAPIResponse like:

Response => -1 UnknownAPIResponse
UnknownAPIResponse => originalCorrelationId errorCode

The benefit here would be that it does not break the current API, and current clients should be able to continue operating as usual as long as they ignore unknown correlationIds and don't use the reserved Id. For clients that want to catch UnknownAPI errors, they can handle -1 correlationIds and dispatch as needed.

Otherwise, perhaps bump the Metadata Request / Response API and include the supported apis / versions in the Broker metadata:

Broker => NodeId Host Port [ApiKey ApiVersion] (any number of brokers may be returned)
  NodeId => int32
  Host => string
  Port => int32
  ApiKey => int16
  ApiVersion => int16

So that Metadata includes the list of all supported API/Versions for each broker in the cluster.

And to echo the problem with continuing with the current behavior pointed out by Jay: clients cannot know the difference between a network error and an unknown API error. And because network errors require a lot of state resets, that can be a big performance hit. Generally on a network error a client needs to assume the worst and reload cluster metadata at least. And it is difficult to prevent this happening every time, because the client doesn't know whether to avoid the API in the future because it is not supported, or keep retrying because the network is flaking.

Dana Powers
Rdio, Inc.
dana.pow...@rd.io
rdio.com/people/dpkp/

On Mon, Jan 12, 2015 at 3:51 PM, Jay Kreps jay.kr...@gmail.com wrote:

Yeah, I totally agree that using the closed socket to indicate "not supported" is problematic, since any network error could lead to that. Arguably we should have a request level error.
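Dana's reserved-correlation-id proposal can be sketched on the wire like this (an int16 error code is assumed here to match other protocol error fields; the encode/decode helpers are illustrative, not an existing API):

```python
import struct

ERROR_CORRELATION_ID = -1  # reserved id proposed for meta errors

def encode_unknown_api_response(original_correlation_id, error_code):
    # Response => -1 UnknownAPIResponse
    # UnknownAPIResponse => originalCorrelationId errorCode
    return struct.pack(">iih", ERROR_CORRELATION_ID,
                       original_correlation_id, error_code)

def decode_response(buf):
    # Old clients that ignore unknown correlation ids keep working;
    # new clients can dispatch on the reserved id and recover the
    # correlation id of the request that failed.
    (corr,) = struct.unpack_from(">i", buf, 0)
    if corr == ERROR_CORRELATION_ID:
        orig, err = struct.unpack_from(">ih", buf, 4)
        return {"unknown_api": True, "correlation_id": orig, "error_code": err}
    return {"unknown_api": False, "correlation_id": corr, "body": buf[4:]}
```

The backwards-compatibility claim rests on clients never sending a request with correlation id -1, which is why the id must be formally reserved.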
Re: Compatibility + Unknown APIs
I totally agree, but I still think we shouldn't do it. :-)

That change would cause the reimplementation of ALL existing Kafka clients. (You can't choose not to implement a new protocol version, or else we are committing to keeping the old version supported both ways on the server forever.) The problem it fixes is fairly minor: clients that want to adaptively detect apis. In general I agree this isn't easy to do, but I also don't think it is really recommended. I think it is probably better for clients to just implement against reasonably conservative versions and trust us not to break them going forward. That is simpler and less likely to break.

We also haven't actually addressed the issue originally brought up that led to not doing it--how to interpret and set the top-level error in the presence of nested errors (which exception does the client throw, and when). This is kind of icky too, though probably preferable if we were starting over.

I see either of these alternatives as imperfect, but changing now has a high cost and doesn't really address a top-50 pain point. But I do agree that KIPs would really help draw attention to these kinds of decisions as we make them, and help us get serious about sticking with them without having that kind of "it sucks but..." feeling.

-Jay

On Mon, Jan 12, 2015 at 5:57 PM, Joe Stein joe.st...@stealth.ly wrote:

There are benefits of moving the error code to the response header. 1) I think it is the right thing to do from an implementation perspective. It makes the most sense. You send a request and you get back a response. The response tells you something is wrong in the header. 2) With such a large change we can make sure we have our solution to solve these issues (see other thread on Compatibility and KIP) setup and in place moving forward. If we can make such a large change then smaller ones should work well too.
We could even use this one change as a way to best flush out the way we want to implement it, preserving functionality AND adding the new response format. When we release 0.8.3 (assuming this was in there) developers can read KIP-1 (or whatever) and decide if they want to support the version bump required; if not, then fine, keep working with 0.8.2 and you are good to go.

Joe Stein
Founder, Principal Consultant
Big Data Open Source Security LLC
http://www.stealth.ly
Twitter: @allthingshadoop http://www.twitter.com/allthingshadoop

On Mon, Jan 12, 2015 at 8:37 PM, Jay Kreps jay.kr...@gmail.com wrote: Yeah, adding it to the metadata request probably makes sense. What you describe of making it a per-broker field is technically correct, since each broker could be on a different software version. But I wonder if it might not be more usable to just give back a single list of api versions. This will be more compact and also easier to interpret as a client. An easy implementation of this would be for the broker that answers the metadata request to just give back whatever versions it supports. A slightly better implementation would be for each broker to register what it supports in ZK and have the responding broker give back the intersection (i.e. apis supported by all brokers). Since the broker actually supports multiple versions at the same time, this will need to be in the form [ApiId [ApiVersion]]. -Jay

On Mon, Jan 12, 2015 at 5:19 PM, Dana Powers dana.pow...@rd.io wrote: Perhaps a bit hacky, but you could also reserve a specific correlationId (maybe -1) to represent errors and send back to the client an UnknownAPIResponse like:

Response => -1 UnknownAPIResponse
UnknownAPIResponse => originalCorrelationId errorCode

The benefit here would be that it does not break the current API, and current clients should be able to continue operating as usual as long as they ignore unknown correlationIds and don't use the reserved Id.
For clients that want to catch unknownAPI errors, they can handle -1 correlationIds and dispatch as needed.

Otherwise perhaps bump the Metadata Request / Response API and include the supported api / versions in the Broker metadata:

Broker => NodeId Host Port [ApiKey ApiVersion]  (any number of brokers may be returned)
  NodeId => int32
  Host => string
  Port => int32
  ApiKey => int16
  ApiVersion => int16

So that Metadata includes the list of all supported API/Versions for each broker in the cluster.

And to echo the problem with continuing with the current behavior pointed out by Jay: clients cannot know the difference between a network error and an unknown API error. And because network errors require a lot of state resets, that can be a big performance hit. Generally on a network error a client needs to assume the worst and reload cluster metadata at least.
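The proposed broker entry could be laid out on the wire roughly as below. This is only an illustration of the suggested format, assuming Kafka's usual primitive encodings (int16-length-prefixed strings, int32 array counts); it is not code from any actual client.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Sketch of the proposed broker metadata entry:
//   Broker => NodeId Host Port [ApiKey ApiVersion]
// Field widths follow the BNF above; the encoding conventions are assumptions.
public class BrokerEntryCodec {
    static ByteBuffer encode(int nodeId, String host, int port, short[][] apiVersions) {
        byte[] hostBytes = host.getBytes(StandardCharsets.UTF_8);
        ByteBuffer buf = ByteBuffer.allocate(
                4 + 2 + hostBytes.length + 4 + 4 + apiVersions.length * 4);
        buf.putInt(nodeId);
        buf.putShort((short) hostBytes.length); // int16 length-prefixed string
        buf.put(hostBytes);
        buf.putInt(port);
        buf.putInt(apiVersions.length);         // int32 array count
        for (short[] kv : apiVersions) {
            buf.putShort(kv[0]);                // ApiKey     = int16
            buf.putShort(kv[1]);                // ApiVersion = int16
        }
        buf.flip();
        return buf;
    }

    public static void main(String[] args) {
        ByteBuffer b = encode(1, "broker1", 9092, new short[][] {{0, 1}, {3, 0}});
        System.out.println(b.remaining()); // prints 29 (total encoded bytes)
    }
}
```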
[jira] [Updated] (KAFKA-1476) Get a list of consumer groups
[ https://issues.apache.org/jira/browse/KAFKA-1476?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Onur Karaman updated KAFKA-1476: Attachment: KAFKA-1476.patch Get a list of consumer groups - Key: KAFKA-1476 URL: https://issues.apache.org/jira/browse/KAFKA-1476 Project: Kafka Issue Type: Wish Components: tools Affects Versions: 0.8.1.1 Reporter: Ryan Williams Assignee: Balaji Seshadri Labels: newbie Fix For: 0.9.0 Attachments: ConsumerCommand.scala, KAFKA-1476-LIST-GROUPS.patch, KAFKA-1476-RENAME.patch, KAFKA-1476-REVIEW-COMMENTS.patch, KAFKA-1476.patch, KAFKA-1476.patch, KAFKA-1476.patch, KAFKA-1476.patch, KAFKA-1476_2014-11-10_11:58:26.patch, KAFKA-1476_2014-11-10_12:04:01.patch, KAFKA-1476_2014-11-10_12:06:35.patch, KAFKA-1476_2014-12-05_12:00:12.patch It would be useful to have a way to get a list of consumer groups currently active via some tool/script that ships with kafka. This would be helpful so that the system tools can be explored more easily. For example, when running the ConsumerOffsetChecker, it requires a group option bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --topic test --group ? But, when just getting started with kafka, using the console producer and consumer, it is not clear what value to use for the group option. If a list of consumer groups could be listed, then it would be clear what value to use. Background: http://mail-archives.apache.org/mod_mbox/kafka-users/201405.mbox/%3cCAOq_b1w=slze5jrnakxvak0gu9ctdkpazak1g4dygvqzbsg...@mail.gmail.com%3e -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Review Request 29831: Patch for KAFKA-1476
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/29831/ --- Review request for kafka. Bugs: KAFKA-1476 https://issues.apache.org/jira/browse/KAFKA-1476 Repository: kafka Description --- Merged in work for KAFKA-1476 and sub-task KAFKA-1826 Diffs - bin/kafka-consumer-groups.sh PRE-CREATION core/src/main/scala/kafka/admin/AdminUtils.scala 28b12c7b89a56c113b665fbde1b95f873f8624a3 core/src/main/scala/kafka/tools/ConsumerGroupCommand.scala PRE-CREATION core/src/main/scala/kafka/utils/ZkUtils.scala c14bd455b6642f5e6eb254670bef9f57ae41d6cb core/src/test/scala/unit/kafka/admin/DeleteAllConsumerGroupInfoForTopicInZKTest.scala PRE-CREATION core/src/test/scala/unit/kafka/utils/TestUtils.scala ac15d34425795d5be20c51b01fa1108bdcd66583 Diff: https://reviews.apache.org/r/29831/diff/ Testing --- Thanks, Onur Karaman
[jira] [Commented] (KAFKA-1476) Get a list of consumer groups
[ https://issues.apache.org/jira/browse/KAFKA-1476?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14274460#comment-14274460 ] Onur Karaman commented on KAFKA-1476: - Created reviewboard https://reviews.apache.org/r/29831/diff/ against branch origin/trunk Get a list of consumer groups - Key: KAFKA-1476 URL: https://issues.apache.org/jira/browse/KAFKA-1476 Project: Kafka Issue Type: Wish Components: tools Affects Versions: 0.8.1.1 Reporter: Ryan Williams Assignee: Balaji Seshadri Labels: newbie Fix For: 0.9.0
Re: [DISCUSS] Compatability and KIPs
+1 on version evolving with any protocol / data format / functionality changes, and I am wondering if we have a standard process for deprecating old versions? Today, with just a couple of versions for the protocol (e.g. offset commit), the code on the server side is already pretty nested and complicated in order to support the different versions.

On Mon, Jan 12, 2015 at 9:21 AM, Jay Kreps j...@confluent.io wrote: Hey Jun, Good points. I totally agree that the versioning needs to cover both format and behavior if the behavior change is incompatible. I kind of agree about the stable/unstable stuff. What I think this means is not that we would ever evolve the protocol without changing the version, but rather that we would drop support for older versions quicker. On one hand that makes sense, and it is often a high bar to get things right the first time. On the other hand I think in practice the set of people who interact with the protocol is often different from the end user. So the end-user experience may still be "hey my code just broke because some client they use relied on an unstable protocol unbeknownst to them." But I think all that means is that we should be thoughtful about removing support for old protocol versions even if they were marked unstable. Does anyone else have feedback or thoughts on the KIP stuff? Objections? Thoughts on structure? -Jay

On Mon, Jan 12, 2015 at 8:20 AM, Jun Rao j...@confluent.io wrote: Jay, Thanks for bringing this up. Yes, we should increase the level of awareness of compatibility. For 1 and 2, they probably should include any functional change. For example, even if there is no change in the binary data format but the interpretation is changed, we should consider this a binary format change and bump up the version number. 3. Having a wider discussion on api/protocol/data changes in the mailing list seems like a good idea. 7. It might be good to also document api/protocol/data formats that are considered stable (or unstable).
For example, in the 0.8.2 release we will have a few new protocols (e.g. HeartBeat) for the development of the new consumer. Those new protocols probably shouldn't be considered stable until the new consumer is more fully developed. Thanks, Jun

On Fri, Jan 9, 2015 at 4:29 PM, Jay Kreps j...@confluent.io wrote: Hey guys, We had a bit of a compatibility slip-up in 0.8.2 with the offset commit stuff. We caught this one before the final release so it's not too bad. But I do think it kind of points to an area we could do better. One piece of feedback we have gotten from going out and talking to users is that compatibility is really, really important to them. Kafka is getting deployed in big environments where the clients are embedded in lots of applications, and any kind of incompatibility is a huge pain for people using it and generally makes upgrade difficult or impossible. In practice what I think this means for development is a lot more pressure to really think about the public interfaces we are making and try our best to get them right. This can be hard sometimes as changes come in patches and it is hard to follow every single rb with enough diligence to know.

Compatibility really means a couple of things:
1. Protocol changes
2. Binary data format changes
3. Changes in public apis in the clients
4. Configs
5. Metric names
6. Command line tools

I think 1-2 are critical. 3 is very important. And 4, 5 and 6 are pretty important but not critical. One thing this implies is that we are really going to have to do a good job of thinking about apis and use cases. You can definitely see a number of places in the old clients and in a couple of the protocols where enough care was not given to thinking things through. Some of those were from long long ago, but we should really try to avoid adding to that set because increasingly we will have to carry around these mistakes for a long time. Here are a few things I thought we could do that might help us get better in this area:

1. Technically we are just in a really bad place with the protocol because it is defined twice--once in the old scala request objects, and once in the new protocol format for the clients. This makes changes massively painful. The good news is that the new request definition DSL was intended to make adding new protocol versions a lot easier and clearer. It will also make it a lot more obvious when the protocol is changed, since you will be checking in or reviewing a change to Protocol.java. Getting the server moved over to the new request objects and protocol definition will be a bit of a slog, but it will really help here I think.
2. We need to get some testing in place on cross-version compatibility. This is work and no tests here
Re: [DISCUSS] Compatability and KIPs
Yeah I think this makes sense. Some of the crazy nesting will get better when we move to the new protocol definition I think, but we will always need some kind of if statement that branches for the different behavior and this makes testing difficult. Probably the best thing to do would be to announce a version deprecated which will have no function but will serve as a warning that it is going away and then remove it some time later. This would mean including something that notes this in the protocol docs and maybe the release notes. We should probably just always do this for all but the latest version of all apis. I think probably a year of deprecation should be sufficient prior to removal? I also think we can maybe use some common sense in deciding this. Removing older versions will always be bad for users and client developers and always be good for Kafka committers. I think we can be more aggressive on things that are not heavily used (and hence less bad for users) or for which supporting multiple versions is particularly onerous. -Jay On Mon, Jan 12, 2015 at 5:02 PM, Guozhang Wang wangg...@gmail.com wrote: +1 on version evolving with any protocol / data format / functionality changes, and I am wondering if we have a standard process of deprecating old versions? Today with just a couple of versions for the protocol (e.g. offset commit) the code on the server side is already pretty nested and complicated in order to support different version supports. On Mon, Jan 12, 2015 at 9:21 AM, Jay Kreps j...@confluent.io wrote: Hey Jun, Good points. I totally agree that the versioning needs to cover both format and behavior if the behavior change is incompatible. I kind of agree about the stable/unstable stuff. What I think this means is not that we would ever evolve the protocol without changing the version, but rather that we would drop support for older versions quicker. On one hand that makes sense and it is often a high bar to get things right the first time. 
On the other hand I think in practice the set of people who interact with the protocol is often different from the end user. So the end-user experience may still be hey my code just broke because some client they use relied on an unstable protocol unbeknownst to them. But I think all that means is that we should be thoughtful about removing support for old protocol versions even if they were marked unstable. Does anyone else have feedback or thoughts on the KIP stuff? Objections? Thoughts on structure? -Jay On Mon, Jan 12, 2015 at 8:20 AM, Jun Rao j...@confluent.io wrote: Jay, Thanks for bringing this up. Yes, we should increase the level of awareness of compatibility. For 1 and 2, they probably should include any functional change. For example, even if there is no change in the binary data format, but the interpretation is changed, we should consider this as a binary format change and bump up the version number. 3. Having a wider discussion on api/protocol/data changes in the mailing list seems like a good idea. 7. It might be good to also document api/protocol/data format that are considered stable (or unstable). For example, in 0.8.2 release, we will have a few new protocols (e.g. HeartBeat) for the development of the new consumer. Those new protocols probably shouldn't be considered stable until the new consumer is more fully developed. Thanks, Jun On Fri, Jan 9, 2015 at 4:29 PM, Jay Kreps j...@confluent.io wrote: Hey guys, We had a bit of a compatibility slip-up in 0.8.2 with the offset commit stuff. We caught this one before the final release so it's not too bad. But I do think it kind of points to an area we could do better. One piece of feedback we have gotten from going out and talking to users is that compatibility is really, really important to them. 
Kafka is getting deployed in big environments where the clients are embedded in lots of applications and any kind of incompatibility is a huge pain for people using it and generally makes upgrade difficult or impossible. In practice what I think this means for development is a lot more pressure to really think about the public interfaces we are making and try our best to get them right. This can be hard sometimes as changes come in patches and it is hard to follow every single rb with enough diligence to know. Compatibility really means a couple things: 1. Protocol changes 2. Binary data format changes 3. Changes in public apis in the clients 4. Configs 5. Metric names 6. Command line tools I think 1-2 are critical. 3 is very important. And 4, 5 and 6 are pretty important but not critical. One thing this implies is that we are really going to have to do a good job of thinking about apis and use
[jira] [Commented] (KAFKA-1857) Kafka Broker ids are removed ( with zookeeper , Storm )
[ https://issues.apache.org/jira/browse/KAFKA-1857?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14274595#comment-14274595 ] Yoonhyeok Kim commented on KAFKA-1857: -- [~nehanarkhede] Okay, thank you. But I don't know how to send it to the mailing list. What is the address of it, and is there a special way or process to do that?

Kafka Broker ids are removed (with zookeeper, Storm) --- Key: KAFKA-1857 URL: https://issues.apache.org/jira/browse/KAFKA-1857 Project: Kafka Issue Type: Bug Components: consumer, controller Affects Versions: 0.8.1 Environment: Ubuntu, with storm-kafka and zookeeper 3.4.6 Reporter: Yoonhyeok Kim

Hi, I am using a kind of real-time analytics system with zookeeper, Storm and Kafka.

Versions:
Storm: storm-core 0.9.2
Kafka: 0.8.1 (3 brokers)
storm-kafka: 0.9.2
zookeeper: 3.4.6 (standalone)

But this problem occurs when I use previous versions as well.

Exceptions: EndOfStreamException, java.nio.channels.CancelledKeyException, org.apache.zookeeper.KeeperException$BadVersionException

When I use the kafka spout with storm, sometimes there were zookeeper logs like (zookeeper.out):
{code}
2015-01-10 19:19:00,836 [myid:] - WARN [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@357] - caught end of stream exception
EndOfStreamException: Unable to read additional data from client sessionid 0x14ab82c142b0658, likely client has closed socket
    at org.apache.zookeeper.server.NIOServerCnxn.doIO(NIOServerCnxn.java:228)
    at org.apache.zookeeper.server.NIOServerCnxnFactory.run(NIOServerCnxnFactory.java:208)
    at java.lang.Thread.run(Thread.java:619)
{code}
Still, zookeeper is working well, and storm-kafka looks fine and transfers data correctly. But as time goes by, those kinds of errors keep occurring, and then I saw different logs like...
{code} 2015-01-10 23:22:11,022 [myid:] - INFO [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@1007] - Closed socket connection for client /70.7.12.38:48504 which had sessionid 0x14ab82c142b0644 2015-01-10 23:22:11,023 [myid:] - WARN [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@357] - caught end of stream exception EndOfStreamException: Unable to read additional data from client sessionid 0x14ab82c142b001d, likely client has closed socket at org.apache.zookeeper.server.NIOServerCnxn.doIO(NIOServerCnxn.java:228) at org.apache.zookeeper.server.NIOServerCnxnFactory.run(NIOServerCnxnFactory.java:208) at java.lang.Thread.run(Thread.java:619) 2015-01-10 23:22:11,023 [myid:] - INFO [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@1007] - Closed socket connection for client /70.7.12.38:55885 which had sessionid 0x14ab82c142b001d 2015-01-10 23:22:11,023 [myid:] - WARN [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@357] - caught end of stream exception EndOfStreamException: Unable to read additional data from client sessionid 0x14ab82c142b063e, likely client has closed socket at org.apache.zookeeper.server.NIOServerCnxn.doIO(NIOServerCnxn.java:228) at org.apache.zookeeper.server.NIOServerCnxnFactory.run(NIOServerCnxnFactory.java:208) at java.lang.Thread.run(Thread.java:619) 2015-01-10 23:22:11,026 [myid:] - INFO [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@1007] - Closed socket connection for client /70.7.12.38:48444 which had sessionid 0x14ab82c142b063e 2015-01-10 23:22:11,026 [myid:] - WARN [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@357] - caught end of stream exception EndOfStreamException: Unable to read additional data from client sessionid 0x14ab82c142b0639, likely client has closed socket at org.apache.zookeeper.server.NIOServerCnxn.doIO(NIOServerCnxn.java:228) at org.apache.zookeeper.server.NIOServerCnxnFactory.run(NIOServerCnxnFactory.java:208) at java.lang.Thread.run(Thread.java:619) 2015-01-10 
23:22:11,027 [myid:] - INFO [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@1007] - Closed socket connection for client /70.7.12.38:56724 which had sessionid 0x14ab82c142b0658 2015-01-10 23:22:11,431 [myid:] - ERROR [SyncThread:0:NIOServerCnxn@178] - Unexpected Exception: java.nio.channels.CancelledKeyException at sun.nio.ch.SelectionKeyImpl.ensureValid(SelectionKeyImpl.java:55) at sun.nio.ch.SelectionKeyImpl.interestOps(SelectionKeyImpl.java:59) at org.apache.zookeeper.server.NIOServerCnxn.sendBuffer(NIOServerCnxn.java:151) at org.apache.zookeeper.server.NIOServerCnxn.sendResponse(NIOServerCnxn.java:1081) at org.apache.zookeeper.server.FinalRequestProcessor.processRequest(FinalRequestProcessor.java:170) at
Review Request 29840: Patch for KAFKA-1818
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/29840/ --- Review request for kafka. Bugs: KAFKA-1818 https://issues.apache.org/jira/browse/KAFKA-1818 Repository: kafka Description --- KAFKA-1818 clean up code to more idiomatic scala usage Diffs - core/src/main/scala/kafka/utils/ReplicationUtils.scala 715767380f7c284148689fd34d4bfba51abd96a0 core/src/test/scala/unit/kafka/utils/ReplicationUtilsTest.scala 84e08557de5acdcf0a98b192feac72836ea359b8 Diff: https://reviews.apache.org/r/29840/diff/ Testing --- Thanks, Eric Olander
Build failed in Jenkins: Kafka-trunk #364
See https://builds.apache.org/job/Kafka-trunk/364/changes Changes: [neha.narkhede] KAFKA-1070 Auto assign broker id; reviewed by Neha Narkhede [neha.narkhede] KAFKA-1836 metadata.fetch.timeout.ms set to zero blocks forever; reviewed by Neha Narkhede and Ewen Cheslack-Postava -- [...truncated 1444 lines...] kafka.utils.UtilsTest testSwallow PASSED kafka.utils.UtilsTest testCircularIterator PASSED kafka.utils.UtilsTest testReadBytes PASSED kafka.utils.UtilsTest testAbs PASSED kafka.utils.UtilsTest testReplaceSuffix PASSED kafka.utils.UtilsTest testReadInt PASSED kafka.utils.UtilsTest testCsvList PASSED kafka.utils.UtilsTest testCsvMap PASSED kafka.utils.UtilsTest testInLock PASSED kafka.utils.UtilsTest testDoublyLinkedList PASSED kafka.utils.SchedulerTest testMockSchedulerNonPeriodicTask PASSED kafka.utils.SchedulerTest testMockSchedulerPeriodicTask PASSED kafka.utils.SchedulerTest testReentrantTaskInMockScheduler PASSED kafka.utils.SchedulerTest testNonPeriodicTask PASSED kafka.utils.SchedulerTest testPeriodicTask PASSED kafka.utils.JsonTest testJsonEncoding PASSED kafka.utils.ReplicationUtilsTest testUpdateLeaderAndIsr FAILED java.net.BindException: Address already in use at sun.nio.ch.Net.bind(Native Method) at sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:124) at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:59) at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:52) at org.apache.zookeeper.server.NIOServerCnxnFactory.configure(NIOServerCnxnFactory.java:95) at kafka.zk.EmbeddedZookeeper.init(EmbeddedZookeeper.scala:33) at kafka.zk.ZooKeeperTestHarness$class.setUp(ZooKeeperTestHarness.scala:33) at kafka.utils.ReplicationUtilsTest.setUp(ReplicationUtilsTest.scala:47) kafka.metrics.KafkaTimerTest testKafkaTimer PASSED kafka.log.FileMessageSetTest testWrittenEqualsRead PASSED kafka.log.FileMessageSetTest testIteratorIsConsistent PASSED kafka.log.FileMessageSetTest testSizeInBytes PASSED kafka.log.FileMessageSetTest 
testWriteTo PASSED kafka.log.FileMessageSetTest testFileSize PASSED kafka.log.FileMessageSetTest testIterationOverPartialAndTruncation PASSED kafka.log.FileMessageSetTest testIterationDoesntChangePosition PASSED kafka.log.FileMessageSetTest testRead PASSED kafka.log.FileMessageSetTest testSearch PASSED kafka.log.FileMessageSetTest testIteratorWithLimits PASSED kafka.log.FileMessageSetTest testTruncate PASSED kafka.log.LogConfigTest testFromPropsDefaults PASSED kafka.log.LogConfigTest testFromPropsEmpty PASSED kafka.log.LogConfigTest testFromPropsToProps PASSED kafka.log.LogConfigTest testFromPropsInvalid PASSED kafka.log.LogCleanerIntegrationTest cleanerTest PASSED kafka.log.LogManagerTest testCreateLog PASSED kafka.log.LogManagerTest testGetNonExistentLog PASSED kafka.log.LogManagerTest testCleanupExpiredSegments PASSED kafka.log.LogManagerTest testCleanupSegmentsToMaintainSize PASSED kafka.log.LogManagerTest testTimeBasedFlush PASSED kafka.log.LogManagerTest testLeastLoadedAssignment PASSED kafka.log.LogManagerTest testTwoLogManagersUsingSameDirFails PASSED kafka.log.LogManagerTest testCheckpointRecoveryPoints PASSED kafka.log.LogManagerTest testRecoveryDirectoryMappingWithTrailingSlash PASSED kafka.log.LogManagerTest testRecoveryDirectoryMappingWithRelativeDirectory PASSED kafka.log.LogTest testTimeBasedLogRoll PASSED kafka.log.LogTest testTimeBasedLogRollJitter PASSED kafka.log.LogTest testSizeBasedLogRoll PASSED kafka.log.LogTest testLoadEmptyLog PASSED kafka.log.LogTest testAppendAndReadWithSequentialOffsets PASSED kafka.log.LogTest testAppendAndReadWithNonSequentialOffsets PASSED kafka.log.LogTest testReadAtLogGap PASSED kafka.log.LogTest testReadOutOfRange PASSED kafka.log.LogTest testLogRolls PASSED kafka.log.LogTest testCompressedMessages PASSED kafka.log.LogTest testThatGarbageCollectingSegmentsDoesntChangeOffset PASSED kafka.log.LogTest testMessageSetSizeCheck PASSED kafka.log.LogTest testMessageSizeCheck PASSED kafka.log.LogTest 
testLogRecoversToCorrectOffset PASSED kafka.log.LogTest testIndexRebuild PASSED kafka.log.LogTest testTruncateTo PASSED kafka.log.LogTest testIndexResizingAtTruncation PASSED kafka.log.LogTest testBogusIndexSegmentsAreRemoved PASSED kafka.log.LogTest testReopenThenTruncate PASSED kafka.log.LogTest testAsyncDelete PASSED kafka.log.LogTest testOpenDeletesObsoleteFiles PASSED kafka.log.LogTest testAppendMessageWithNullPayload PASSED kafka.log.LogTest testCorruptLog PASSED kafka.log.LogTest testCleanShutdownFile PASSED kafka.log.LogTest testParseTopicPartitionName PASSED kafka.log.LogTest testParseTopicPartitionNameForEmptyName PASSED kafka.log.LogTest testParseTopicPartitionNameForNull PASSED kafka.log.LogTest testParseTopicPartitionNameForMissingSeparator
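The ReplicationUtilsTest failure in the build log above ("Address already in use" while starting EmbeddedZookeeper) is the classic fixed-port collision in test harnesses. A common remedy, sketched here as a general pattern rather than the actual Kafka test-harness code, is to bind port 0 so the OS assigns a free ephemeral port, then hand that port to the embedded server:

```java
import java.io.IOException;
import java.net.ServerSocket;

// General pattern for avoiding BindException in test setups: ask the OS for
// a free ephemeral port instead of hard-coding one (e.g. 2181).
public class EphemeralPort {
    static int choosePort() throws IOException {
        try (ServerSocket socket = new ServerSocket(0)) {
            // The port is released when the socket closes; a race with another
            // process grabbing it is still possible but rare in practice.
            return socket.getLocalPort();
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println("free port: " + choosePort());
    }
}
```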
Re: Compatibility + Unknown APIs
Perhaps a bit hacky, but you could also reserve a specific correlationId (maybe -1) to represent errors and send back to the client an UnknownAPIResponse like:

Response => -1 UnknownAPIResponse
UnknownAPIResponse => originalCorrelationId errorCode

The benefit here would be that it does not break the current API, and current clients should be able to continue operating as usual as long as they ignore unknown correlationIds and don't use the reserved Id. For clients that want to catch unknownAPI errors, they can handle -1 correlationIds and dispatch as needed.

Otherwise perhaps bump the Metadata Request / Response API and include the supported api / versions in the Broker metadata:

Broker => NodeId Host Port [ApiKey ApiVersion]  (any number of brokers may be returned)
  NodeId => int32
  Host => string
  Port => int32
  ApiKey => int16
  ApiVersion => int16

So that Metadata includes the list of all supported API/Versions for each broker in the cluster.

And to echo the problem with continuing with the current behavior pointed out by Jay: clients cannot know the difference between a network error and an unknown API error. And because network errors require a lot of state resets, that can be a big performance hit. Generally on a network error a client needs to assume the worst and reload cluster metadata at least. And it is difficult to prevent this happening every time, because the client doesn't know whether to avoid the API in the future because it is not supported, or keep retrying because the network is flaking.

Dana Powers
Rdio, Inc.
dana.pow...@rd.io
rdio.com/people/dpkp/

On Mon, Jan 12, 2015 at 3:51 PM, Jay Kreps jay.kr...@gmail.com wrote: Yeah I totally agree--using the closed socket to indicate "not supported" does work, since any network error could lead to that. Arguably we should have a request level error. We discussed this at the time we were defining the protocols for 0.8 and the conclusion was not to do that.
The reasoning was that since almost all the requests end up having errors at either a per-topic or per-partition level, this makes correctly setting/interpreting the global error a bit confusing. I.e. if you are implementing a client and a given partition gets an error but there is no global error, what do you do? Likewise in most cases it is a bit ambiguous how to set the global error on the server side (i.e. if some partitions are unavailable but some are available). The result was that error reporting is defined per-request. We could change this now, but it would mean bumping compatibility on all the apis to add the new field, which would be annoying to people, right? I actually agree it might have been better to do it this way, both for this and also to make generic error handling easier, but I'm not sure if it is worth such a big break now.

The other proposal, introducing a get_protocol_versions() method, seems almost as good for probing for support and is much less invasive. That seems better to me because I think generally clients shouldn't need to do this; they should just build against a minimum Kafka version and trust it will keep working into the future. -Jay

On Mon, Jan 12, 2015 at 2:24 PM, Gwen Shapira gshap...@cloudera.com wrote: I think #1 may be tricky in practice. The response format is always dictated by the request format, so how do I know if the bytes I got back are a valid response to the given request or are the UnknownRequestResponse? On the other hand, from the client developer perspective, having to figure out that you are looking at a closed socket because you tried to use an API that wasn't implemented in a specific version can be pretty annoying.

Another way to do it is to move the error_code field (currently implemented in pretty much every single Response schema) to the Response Header, and then we could use it for meta errors such as UnknownAPI. It's a much bigger change than adding a new Request type, but possibly worth it?
#2 would be a good fix for the problem I think. This might be a good replacement for the echo api and would probably serve the same purpose (checking if the server is alive).

#3 is a little dangerous because we actually want clients to only pay attention to the protocol versions, which are per-api, not the server version. I.e. we actually don't want the client to do something like check serverVersion.equals(0.8.2), because we want to be able to release the server at will and have it keep answering protocols in a backwards compatible way. I.e. a client that uses just the metadata request and produce request should only care about the version of these protocols it implements being supported, not about the version of the server or the version of any other protocol it doesn't use. This is the rationale behind versioning the apis independently rather than having a single protocol version that we would have to bump every time an
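The reserved-correlationId idea (option #1 above) implies a client-side dispatch like the sketch below: if the response header carries correlationId -1, the body is an UnknownAPIResponse (originalCorrelationId, errorCode) rather than a normal response. All names here are hypothetical, illustrating the proposal rather than any shipped protocol.

```java
import java.nio.ByteBuffer;

// Hypothetical client dispatch for the reserved-correlationId proposal.
public class ResponseDispatcher {
    static final int UNKNOWN_API_MARKER = -1; // reserved correlationId

    static String dispatch(ByteBuffer response) {
        int correlationId = response.getInt();
        if (correlationId == UNKNOWN_API_MARKER) {
            // Body: originalCorrelationId (int32), errorCode (int16)
            int originalCorrelationId = response.getInt();
            short errorCode = response.getShort();
            return "unknown API for request " + originalCorrelationId
                    + " (error " + errorCode + ")";
        }
        return "normal response for request " + correlationId;
    }

    public static void main(String[] args) {
        ByteBuffer unknown = ByteBuffer.allocate(10)
                .putInt(UNKNOWN_API_MARKER).putInt(42).putShort((short) 35);
        unknown.flip();
        System.out.println(dispatch(unknown)); // unknown API for request 42 (error 35)
    }
}
```

Clients that predate the convention keep working as long as they ignore unrecognized correlationIds, which is exactly the compatibility property the proposal is after.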
Re: Kafka Cluster Monitoring and Documentation of Internals (JMX Metrics) of Rejected Events
Hi, I think you could just email user@? There was no attached image. I think Jun committed something about this: https://issues.apache.org/jira/browse/KAFKA-1481?focusedCommentId=14272057page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14272057 Otis -- Monitoring * Alerting * Anomaly Detection * Centralized Log Management Solr Elasticsearch Support * http://sematext.com/

On Mon, Jan 12, 2015 at 5:04 PM, Bhavesh Mistry mistry.p.bhav...@gmail.com wrote: Hi Kafka Team, I am trying to understand Kafka internals and how a message can be corrupted or lost on the broker side. I have referred to the following documentation for monitoring:
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Internals
http://kafka.apache.org/documentation.html#monitoring

I am looking at the following beans:
kafka.server:type=BrokerTopicMetrics,name=test-FailedProduceRequestsPerSec
kafka.server:type=BrokerTopicMetrics,name=test-BytesRejectedPerSec

I see the following exception on the broker side, rejecting due to a large request. This is great, but it does not show the source ip of the producer that caused this issue. Is there any way to log and capture this?

[2014-10-14 22:09:53,262] ERROR [KafkaApi-2] Error processing ProducerRequest with correlation id 28795280 from client X on partition [XXX,17] (kafka.server.KafkaApis)
kafka.common.MessageSizeTooLargeException: Message size is 2924038 bytes which exceeds the maximum configured message size of 2097152.

Can this be reported as a separate metric MessageSizeTooLargeException per topic? Also, what is the best way to find CRC check errors from the consumer side? How do you debug this? e.g. log line:

11 Dec 2014 07:22:33,387 ERROR [pool-15-thread-4]
kafka.message.InvalidMessageException: Message is corrupt (stored crc = 1834644195, computed crc = 2374999037)

Also, is there any jira open to update and list all the latest metrics, their format, and what they mean? http://kafka.apache.org/documentation.html#monitoring.
Please see the attached image for a list of all metrics. The broker version is 0.8.1.1. Thanks, Bhavesh
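The InvalidMessageException above comes from the consumer recomputing the message checksum and comparing it with the CRC stored in the message header. The check can be sketched in isolation with java.util.zip.CRC32, which is what the Kafka message format uses; the class and method names below are illustrative, not Kafka's actual API:

```java
import java.util.zip.CRC32;

public class CrcCheckSketch {
    // Compute CRC32 over the message bytes that follow the CRC field,
    // as a consumer does when validating a fetched message.
    static long computeCrc(byte[] payload) {
        CRC32 crc = new CRC32();
        crc.update(payload, 0, payload.length);
        return crc.getValue();
    }

    // A message is valid when the stored CRC matches the recomputed one.
    static boolean isValid(long storedCrc, byte[] payload) {
        return storedCrc == computeCrc(payload);
    }

    public static void main(String[] args) {
        byte[] payload = "hello".getBytes();
        long stored = computeCrc(payload);
        System.out.println(isValid(stored, payload)); // true
        payload[0] ^= 0x01; // simulate a corrupted bit on the wire or on disk
        System.out.println(isValid(stored, payload)); // false
    }
}
```

A mismatch like "stored crc = 1834644195, computed crc = 2374999037" therefore means the bytes were altered somewhere between the producer computing the CRC and the consumer rechecking it.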
[jira] [Updated] (KAFKA-1070) Auto-assign node id
[ https://issues.apache.org/jira/browse/KAFKA-1070?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Neha Narkhede updated KAFKA-1070: - Resolution: Fixed Status: Resolved (was: Patch Available) [~sriharsha] Thanks for your efforts in getting this patch in shape. Pushed to trunk Auto-assign node id --- Key: KAFKA-1070 URL: https://issues.apache.org/jira/browse/KAFKA-1070 Project: Kafka Issue Type: Bug Reporter: Jay Kreps Assignee: Sriharsha Chintalapani Labels: usability Fix For: 0.8.3 Attachments: KAFKA-1070.patch, KAFKA-1070_2014-07-19_16:06:13.patch, KAFKA-1070_2014-07-22_11:34:18.patch, KAFKA-1070_2014-07-24_20:58:17.patch, KAFKA-1070_2014-07-24_21:05:33.patch, KAFKA-1070_2014-08-21_10:26:20.patch, KAFKA-1070_2014-11-20_10:50:04.patch, KAFKA-1070_2014-11-25_20:29:37.patch, KAFKA-1070_2015-01-01_17:39:30.patch, KAFKA-1070_2015-01-12_10:46:54.patch It would be nice to have Kafka brokers auto-assign node ids rather than having that be a configuration. Having a configuration is irritating because (1) you have to generate a custom config for each broker and (2) even though it is in configuration, changing the node id can cause all kinds of bad things to happen. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-1476) Get a list of consumer groups
[ https://issues.apache.org/jira/browse/KAFKA-1476?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Onur Karaman updated KAFKA-1476: Attachment: KAFKA-1476_2015-01-12_16:31:20.patch Get a list of consumer groups - Key: KAFKA-1476 URL: https://issues.apache.org/jira/browse/KAFKA-1476 Project: Kafka Issue Type: Wish Components: tools Affects Versions: 0.8.1.1 Reporter: Ryan Williams Assignee: Balaji Seshadri Labels: newbie Fix For: 0.9.0 Attachments: ConsumerCommand.scala, KAFKA-1476-LIST-GROUPS.patch, KAFKA-1476-RENAME.patch, KAFKA-1476-REVIEW-COMMENTS.patch, KAFKA-1476.patch, KAFKA-1476.patch, KAFKA-1476.patch, KAFKA-1476.patch, KAFKA-1476_2014-11-10_11:58:26.patch, KAFKA-1476_2014-11-10_12:04:01.patch, KAFKA-1476_2014-11-10_12:06:35.patch, KAFKA-1476_2014-12-05_12:00:12.patch, KAFKA-1476_2015-01-12_16:22:26.patch, KAFKA-1476_2015-01-12_16:31:20.patch It would be useful to have a way to get a list of consumer groups currently active via some tool/script that ships with kafka. This would be helpful so that the system tools can be explored more easily. For example, when running the ConsumerOffsetChecker, it requires a group option bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --topic test --group ? But, when just getting started with kafka, using the console producer and consumer, it is not clear what value to use for the group option. If a list of consumer groups could be listed, then it would be clear what value to use. Background: http://mail-archives.apache.org/mod_mbox/kafka-users/201405.mbox/%3cCAOq_b1w=slze5jrnakxvak0gu9ctdkpazak1g4dygvqzbsg...@mail.gmail.com%3e -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1476) Get a list of consumer groups
[ https://issues.apache.org/jira/browse/KAFKA-1476?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14274481#comment-14274481 ] Onur Karaman commented on KAFKA-1476: - Updated reviewboard https://reviews.apache.org/r/29831/diff/ against branch origin/trunk Get a list of consumer groups - Key: KAFKA-1476 URL: https://issues.apache.org/jira/browse/KAFKA-1476 Project: Kafka Issue Type: Wish Components: tools Affects Versions: 0.8.1.1 Reporter: Ryan Williams Assignee: Balaji Seshadri Labels: newbie Fix For: 0.9.0 Attachments: ConsumerCommand.scala, KAFKA-1476-LIST-GROUPS.patch, KAFKA-1476-RENAME.patch, KAFKA-1476-REVIEW-COMMENTS.patch, KAFKA-1476.patch, KAFKA-1476.patch, KAFKA-1476.patch, KAFKA-1476.patch, KAFKA-1476_2014-11-10_11:58:26.patch, KAFKA-1476_2014-11-10_12:04:01.patch, KAFKA-1476_2014-11-10_12:06:35.patch, KAFKA-1476_2014-12-05_12:00:12.patch, KAFKA-1476_2015-01-12_16:22:26.patch, KAFKA-1476_2015-01-12_16:31:20.patch It would be useful to have a way to get a list of consumer groups currently active via some tool/script that ships with kafka. This would be helpful so that the system tools can be explored more easily. For example, when running the ConsumerOffsetChecker, it requires a group option bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --topic test --group ? But, when just getting started with kafka, using the console producer and consumer, it is not clear what value to use for the group option. If a list of consumer groups could be listed, then it would be clear what value to use. Background: http://mail-archives.apache.org/mod_mbox/kafka-users/201405.mbox/%3cCAOq_b1w=slze5jrnakxvak0gu9ctdkpazak1g4dygvqzbsg...@mail.gmail.com%3e -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1819) Cleaner gets confused about deleted and re-created topics
[ https://issues.apache.org/jira/browse/KAFKA-1819?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14274530#comment-14274530 ] Gwen Shapira commented on KAFKA-1819: - Updated reviewboard https://reviews.apache.org/r/29210/diff/ against branch trunk Cleaner gets confused about deleted and re-created topics - Key: KAFKA-1819 URL: https://issues.apache.org/jira/browse/KAFKA-1819 Project: Kafka Issue Type: Bug Reporter: Gian Merlino Assignee: Gwen Shapira Priority: Blocker Fix For: 0.8.2 Attachments: KAFKA-1819.patch, KAFKA-1819_2014-12-26_13:58:44.patch, KAFKA-1819_2014-12-30_16:01:19.patch, KAFKA-1819_2015-01-12_10:34:07.patch, KAFKA-1819_2015-01-12_11:17:53.patch, KAFKA-1819_2015-01-12_17:01:53.patch I get an error like this after deleting a compacted topic and re-creating it. I think it's because the brokers don't remove cleaning checkpoints from the cleaner-offset-checkpoint file. This is from a build based off commit bd212b7. java.lang.IllegalArgumentException: requirement failed: Last clean offset is 587607 but segment base offset is 0 for log foo-6. at scala.Predef$.require(Predef.scala:233) at kafka.log.Cleaner.buildOffsetMap(LogCleaner.scala:502) at kafka.log.Cleaner.clean(LogCleaner.scala:300) at kafka.log.LogCleaner$CleanerThread.cleanOrSleep(LogCleaner.scala:214) at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:192) at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: Review Request 29210: Patch for KAFKA-1819
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/29210/ --- (Updated Jan. 13, 2015, 1:01 a.m.) Review request for kafka. Bugs: KAFKA-1819 https://issues.apache.org/jira/browse/KAFKA-1819 Repository: kafka Description (updated) --- added locking improved tests per Joel and Neha's suggestions added cleaner test to DeleteTopicTest Fixes to DeleteTopicTest: clean up servers after cleaner test and move cleaner verification to the validation function minor fixes suggested by Joel Diffs (updated) - core/src/main/scala/kafka/log/LogCleaner.scala f8fcb843c80eec3cf3c931df6bb472c019305253 core/src/main/scala/kafka/log/LogCleanerManager.scala bcfef77ed53f94017c06a884e4db14531774a0a2 core/src/main/scala/kafka/log/LogManager.scala 4ebaae00ca4b80bf15c7930bae2011d98bbec053 core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala 29cc01bcef9cacd8dec1f5d662644fc6fe4994bc core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala 5bfa764638e92f217d0ff7108ec8f53193c22978 Diff: https://reviews.apache.org/r/29210/diff/ Testing --- Thanks, Gwen Shapira
Re: [DISCUSS] Compatibility and KIPs
Hey Joe, Yeah I think a lot of those items are limitations in that document that we should definitely fix. The first issue you point out is a serious one: we give the total list of errors but don't list which errors can result from which APIs. This is a big issue because actually no one knows, and even if you know the code base, determining that from the code is not trivial (since errors can percolate up from lower layers). If you are writing a client, in practice you just try stuff, handle the errors you've seen, and add a generic catch-all for any new errors (which is actually a good forward-compatibility practice). But it would be a lot easier if this kind of trial and error wasn't required. Having just done the Java producer and consumer I definitely felt that pain. The second issue I think we tried to address by giving basic usage info for things like metadata requests etc., but I think what you are pointing out is that this just isn't nearly detailed enough. Ideally we should give a lot of guidance on implementation options, optimizations, best practices, etc. I agree with this. Especially as we start to get the new consumer protocols in shape, having this is really important for helping people make use of them, as there are several APIs that work together. I think we could expand this section of the docs a lot. I think it also might be a good idea to move this document out of the wiki and into the main docs. This way we can version it with releases. Currently there is no way to tell which API versions are supported in which Kafka version, as the document is always the current state of the protocol minus stuff on trunk that isn't released yet. This mostly works since in practice if you are developing a client you should probably target the latest release, but it would be better to be able to tell what was in each release.
-Jay On Mon, Jan 12, 2015 at 5:50 PM, Joe Stein joe.st...@stealth.ly wrote: Having an index for every protocol/API change (like https://www.python.org/dev/peps/pep-0257/) will be much better than the flat wire protocol doc we have now. It is impossible (without jumping into code) right now to know if an error is supported in one version of Kafka vs another, or even which messages differ. Having something iterative for each change that is explicit, clear, and concise for client developers would be wonderful. Some folks just try to keep pace with the wire protocol doc regardless and often develop the wrong expected functionality, because functionality is not always part of the protocol but an expectation/extension of the producer and/or consumer layer in the project code. The expected behavior, I think, is a huge gap between the project and client implementations. When you are a Kafka user you have certain expectations when working with producers and consumers, e.g. if a produced message fails, the expectation is to retry X times with a Y backoff between each try. The wire protocol doc doesn't always expose these expected behaviors, and they often get missed. Assumptions get made, and in the clients that get developed, very large gaps take a while (often surfacing as production issues) to get found out. I think this problem (which is a big one IMHO) will also be better resolved with the KIP process. Client application developers can look at new features, understand the goals and expectations, develop those goals in the language/system required, and support the byte structure(s) for a complete use case. I think child pages from https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol might be a way to go. I only suggest that because people already use that page now, and we can keep it as a high-level "here is what you do" and then sub-link to the child pages when appropriate.
I hate completely abandoning something that is not entirely bad but just missing some updates in different ways. So, maybe something like that, or having a committed specific part under git or svn, might also make sense. I am not really opinionated on how we implement, as long as we do implement something for these issues. Feature and/or byte changes should bump the version number, +1 /*** Joe Stein Founder, Principal Consultant Big Data Open Source Security LLC http://www.stealth.ly Twitter: @allthingshadoop http://www.twitter.com/allthingshadoop / On Mon, Jan 12, 2015 at 8:27 PM, Jay Kreps jay.kr...@gmail.com wrote: Yeah I think this makes sense. Some of the crazy nesting will get better when we move to the new protocol definition, I think, but we will always need some kind of if statement that branches for the different behavior, and this makes testing difficult. Probably the best thing to do would be to announce a version deprecated, which will have no function but will serve as a warning that it is
Re: Review Request 29210: Patch for KAFKA-1819
On Jan. 12, 2015, 10:53 p.m., Joel Koshy wrote: core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala, line 68 https://reviews.apache.org/r/29210/diff/5/?file=816890#file816890line68 Actually, now that you have the check above in DeleteTopicTest, do you think it is necessary to have this? Both checks exercise the same code path, so I don't think we need both. Less code to maintain is always better, IMO. Since Neha asked to add the test in DeleteTopicTest, I want to make sure that she's ok with removing it from LogCleanerIntegrationTest. Maybe I'm missing a good reason to have both. - Gwen --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/29210/#review67717 --- On Jan. 12, 2015, 7:17 p.m., Gwen Shapira wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/29210/ --- (Updated Jan. 12, 2015, 7:17 p.m.) Review request for kafka. Bugs: KAFKA-1819 https://issues.apache.org/jira/browse/KAFKA-1819 Repository: kafka Description --- added locking improved tests per Joel and Neha's suggestions added cleaner test to DeleteTopicTest Fixes to DeleteTopicTest: clean up servers after cleaner test and move cleaner verification to the validation function Diffs - core/src/main/scala/kafka/log/LogCleaner.scala f8fcb843c80eec3cf3c931df6bb472c019305253 core/src/main/scala/kafka/log/LogCleanerManager.scala bcfef77ed53f94017c06a884e4db14531774a0a2 core/src/main/scala/kafka/log/LogManager.scala 4ebaae00ca4b80bf15c7930bae2011d98bbec053 core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala 29cc01bcef9cacd8dec1f5d662644fc6fe4994bc core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala 5bfa764638e92f217d0ff7108ec8f53193c22978 Diff: https://reviews.apache.org/r/29210/diff/ Testing --- Thanks, Gwen Shapira
Re: Review Request 29210: Patch for KAFKA-1819
On Jan. 12, 2015, 10:53 p.m., Joel Koshy wrote: core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala, line 251 https://reviews.apache.org/r/29210/diff/5/?file=816889#file816889line251 Rather than an arbitrary sleep can we just use waitUntilTrue and the condition would be that the cleaner checkpoint file contains the topic. Instead of waitUntilTrue, I'm calling LogCleaner.awaitCleaned, to avoid replicating the logic. This required exposing LogCleaner in LogManager. - Gwen --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/29210/#review67717 --- On Jan. 12, 2015, 7:17 p.m., Gwen Shapira wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/29210/ --- (Updated Jan. 12, 2015, 7:17 p.m.) Review request for kafka. Bugs: KAFKA-1819 https://issues.apache.org/jira/browse/KAFKA-1819 Repository: kafka Description --- added locking improved tests per Joel and Neha's suggestions added cleaner test to DeleteTopicTest Fixes to DeleteTopicTest: clean up servers after cleaner test and move cleaner verification to the validation function Diffs - core/src/main/scala/kafka/log/LogCleaner.scala f8fcb843c80eec3cf3c931df6bb472c019305253 core/src/main/scala/kafka/log/LogCleanerManager.scala bcfef77ed53f94017c06a884e4db14531774a0a2 core/src/main/scala/kafka/log/LogManager.scala 4ebaae00ca4b80bf15c7930bae2011d98bbec053 core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala 29cc01bcef9cacd8dec1f5d662644fc6fe4994bc core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala 5bfa764638e92f217d0ff7108ec8f53193c22978 Diff: https://reviews.apache.org/r/29210/diff/ Testing --- Thanks, Gwen Shapira
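The waitUntilTrue pattern Joel suggests in place of an arbitrary sleep is a simple poll-with-timeout loop. A minimal sketch of the idea follows; the name mirrors Kafka's TestUtils.waitUntilTrue, but this signature is illustrative, not the actual Scala helper:

```java
import java.util.function.BooleanSupplier;

public class WaitUntil {
    // Polls `condition` every `pollMs` milliseconds until it returns true
    // or `timeoutMs` elapses; returns whether the condition was ever met.
    static boolean waitUntilTrue(BooleanSupplier condition, long timeoutMs, long pollMs)
            throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (!condition.getAsBoolean()) {
            if (System.currentTimeMillis() >= deadline) return false;
            Thread.sleep(pollMs);
        }
        return true;
    }

    public static void main(String[] args) throws InterruptedException {
        // A condition that becomes true on the third poll, e.g. "the cleaner
        // checkpoint file contains the topic".
        int[] calls = {0};
        boolean met = waitUntilTrue(() -> ++calls[0] >= 3, 1000, 10);
        System.out.println(met); // true
    }
}
```

Compared with a fixed sleep, this avoids flaky failures when the cleaner is slow and wasted wall-clock time when it is fast, which is why the review converges on awaitCleaned (the same pattern, already encapsulated in LogCleaner) rather than replicating the logic.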
Re: Compatibility + Unknown APIs
Yeah, adding it to the metadata request probably makes sense. What you describe of making it a per-broker field is technically correct, since each broker could be on a different software version. But I wonder if it might not be more usable to just give back a single list of api versions. This would be more compact and also easier to interpret as a client. An easy implementation of this would be for the broker that answers the metadata request to just give back whatever versions it supports. A slightly better implementation would be for each broker to register what it supports in ZK and have the responding broker give back the intersection (i.e. the apis supported by all brokers). Since a broker actually supports multiple versions at the same time, this will need to be in the form [ApiId [ApiVersion]]. -Jay On Mon, Jan 12, 2015 at 5:19 PM, Dana Powers dana.pow...@rd.io wrote: Perhaps a bit hacky, but you could also reserve a specific correlationId (maybe -1) to represent errors and send back to the client an UnknownAPIResponse like: Response = -1 UnknownAPIResponse UnknownAPIResponse = originalCorrelationId errorCode The benefit here would be that it does not break the current API, and current clients should be able to continue operating as usual as long as they ignore unknown correlationIds and don't use the reserved id. Clients that want to catch unknown-API errors can handle -1 correlationIds and dispatch as needed. Otherwise, perhaps bump the Metadata Request/Response API and include the supported apis/versions in the Broker metadata: Broker = NodeId Host Port [ApiKey ApiVersion] (any number of brokers may be returned) NodeId = int32 Host = string Port = int32 ApiKey = int16 ApiVersion = int16 So that Metadata includes the list of all supported API/versions for each broker in the cluster. And to echo the problem with continuing the current behavior pointed out by Jay: clients cannot tell the difference between a network error and an unknown API error.
And because network errors require a lot of state resets, that can be a big performance hit. Generally on a network error a client needs to assume the worst and reload cluster metadata at least. And it is difficult to prevent this happening every time, because the client doesn't know whether to avoid the API in the future because it is not supported, or keep retrying because the network is flaking. Dana Powers Rdio, Inc. dana.pow...@rd.io rdio.com/people/dpkp/ On Mon, Jan 12, 2015 at 3:51 PM, Jay Kreps jay.kr...@gmail.com wrote: Yeah I totally agree--using the closed socket to indicate "not supported" doesn't really work, since any network error could lead to that. Arguably we should have a request-level error. We discussed this at the time we were defining the protocols for 0.8, and the conclusion was not to do that. The reasoning was that since almost all the requests end up having errors at either a per-topic or per-partition level, this makes correctly setting/interpreting the global error a bit confusing. I.e., if you are implementing a client and a given partition gets an error but there is no global error, what do you do? Likewise, in most cases it is a bit ambiguous how to set the global error on the server side (i.e. if some partitions are unavailable but some are available). The result was that error reporting is defined per-request. We could change this now, but it would mean bumping compatibility on all the apis to add the new field, which would be annoying to people, right? I actually agree it might have been better to do it this way, both for this and also to make generic error handling easier, but I'm not sure if it is worth such a big break now. The other proposal, introducing a get_protocol_versions() method, seems almost as good for probing for support and is much less invasive. That seems better to me because I think generally clients shouldn't need to do this; they should just build against a minimum Kafka version and trust it will keep working into the future.
-Jay On Mon, Jan 12, 2015 at 2:24 PM, Gwen Shapira gshap...@cloudera.com wrote: I think #1 may be tricky in practice. The response format is always dictated by the request format, so how do I know if the bytes I got back are a valid response to the given request or are the UnknownRequestResponse? On the other hand, from the client developer perspective, having to figure out that you are looking at a closed socket because you tried to use an API that wasn't implemented in a specific version can be pretty annoying. Another way to do it is to move the error_code field (currently implemented in pretty much every single Response schema) to the Response Header, and then we could use it for meta errors such as UnknownAPI. It's a much bigger change than adding a new Request type, but possibly worth it?
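Dana's proposed broker entry can be sketched concretely. The Kafka wire format length-prefixes strings with an int16 and arrays with an int32 element count, so encoding and decoding the proposed Broker = NodeId Host Port [ApiKey ApiVersion] entry would look roughly like the following. This is a sketch of the proposal only, not an implemented Kafka API, and the class name is made up:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class BrokerEntrySketch {
    // Encode: NodeId(int32) Host(int16-prefixed string) Port(int32)
    //         [ApiKey(int16) ApiVersion(int16)] as an int32-prefixed array.
    static ByteBuffer encode(int nodeId, String host, int port, short[][] apis) {
        byte[] hostBytes = host.getBytes(StandardCharsets.UTF_8);
        ByteBuffer buf = ByteBuffer.allocate(4 + 2 + hostBytes.length + 4 + 4 + apis.length * 4);
        buf.putInt(nodeId);
        buf.putShort((short) hostBytes.length);
        buf.put(hostBytes);
        buf.putInt(port);
        buf.putInt(apis.length);
        for (short[] api : apis) {  // api[0] = ApiKey, api[1] = ApiVersion
            buf.putShort(api[0]);
            buf.putShort(api[1]);
        }
        buf.flip();
        return buf;
    }

    // Decode back into a readable "nodeId@host:port apis=key:ver,..." string.
    static String decode(ByteBuffer buf) {
        int nodeId = buf.getInt();
        byte[] hostBytes = new byte[buf.getShort()];
        buf.get(hostBytes);
        int port = buf.getInt();
        StringBuilder sb = new StringBuilder(
            nodeId + "@" + new String(hostBytes, StandardCharsets.UTF_8) + ":" + port + " apis=");
        int n = buf.getInt();
        for (int i = 0; i < n; i++) {
            if (i > 0) sb.append(',');
            sb.append(buf.getShort()).append(':').append(buf.getShort());
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        ByteBuffer b = encode(1, "kafka1", 9092, new short[][] {{0, 0}, {3, 0}});
        System.out.println(decode(b)); // 1@kafka1:9092 apis=0:0,3:0
    }
}
```

A client could then intersect the per-broker lists (or, per Jay's suggestion, the responding broker could return the cluster-wide intersection) before deciding which request versions are safe to send.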
[jira] [Commented] (KAFKA-1857) Kafka Broker ids are removed ( with zookeeper , Storm )
[ https://issues.apache.org/jira/browse/KAFKA-1857?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14274620#comment-14274620 ] Neha Narkhede commented on KAFKA-1857: -- You might find this useful - http://kafka.apache.org/contact.html Kafka Broker ids are removed ( with zookeeper , Storm ) --- Key: KAFKA-1857 URL: https://issues.apache.org/jira/browse/KAFKA-1857 Project: Kafka Issue Type: Bug Components: consumer, controller Affects Versions: 0.8.1 Environment: Ubuntu, with Storm-kafka and zookeeper 3.4.6 Reporter: Yoonhyeok Kim Hi, I am using a kind of real-time analytics system with zookeeper, Storm, and Kafka. -versions Storm : storm-core-0.9.2 Kafka 0.8.1 (3 brokers) storm-kafka : 0.9.2 zookeeper 3.4.6 (standalone) But this problem occurs with earlier versions as well. - exceptions EndOfStreamException, java.nio.channels.CancelledKeyException, org.apache.zookeeper.KeeperException$BadVersionException --- When I use the kafka spout with storm, sometimes there were zookeeper log entries like (zookeeper.out) {code} 2015-01-10 19:19:00,836 [myid:] - WARN [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@357] - caught end of stream exception EndOfStreamException: Unable to read additional data from client sessionid 0x14ab82c142b0658, likely client has closed socket at org.apache.zookeeper.server.NIOServerCnxn.doIO(NIOServerCnxn.java:228) at org.apache.zookeeper.server.NIOServerCnxnFactory.run(NIOServerCnxnFactory.java:208) at java.lang.Thread.run(Thread.java:619) {code} Still, zookeeper is working well, and storm-kafka looks fine and transfers data correctly. But as time goes by, those kinds of errors keep occurring, and then I saw different logs like...
{code} 2015-01-10 23:22:11,022 [myid:] - INFO [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@1007] - Closed socket connection for client /70.7.12.38:48504 which had sessionid 0x14ab82c142b0644 2015-01-10 23:22:11,023 [myid:] - WARN [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@357] - caught end of stream exception EndOfStreamException: Unable to read additional data from client sessionid 0x14ab82c142b001d, likely client has closed socket at org.apache.zookeeper.server.NIOServerCnxn.doIO(NIOServerCnxn.java:228) at org.apache.zookeeper.server.NIOServerCnxnFactory.run(NIOServerCnxnFactory.java:208) at java.lang.Thread.run(Thread.java:619) 2015-01-10 23:22:11,023 [myid:] - INFO [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@1007] - Closed socket connection for client /70.7.12.38:55885 which had sessionid 0x14ab82c142b001d 2015-01-10 23:22:11,023 [myid:] - WARN [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@357] - caught end of stream exception EndOfStreamException: Unable to read additional data from client sessionid 0x14ab82c142b063e, likely client has closed socket at org.apache.zookeeper.server.NIOServerCnxn.doIO(NIOServerCnxn.java:228) at org.apache.zookeeper.server.NIOServerCnxnFactory.run(NIOServerCnxnFactory.java:208) at java.lang.Thread.run(Thread.java:619) 2015-01-10 23:22:11,026 [myid:] - INFO [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@1007] - Closed socket connection for client /70.7.12.38:48444 which had sessionid 0x14ab82c142b063e 2015-01-10 23:22:11,026 [myid:] - WARN [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@357] - caught end of stream exception EndOfStreamException: Unable to read additional data from client sessionid 0x14ab82c142b0639, likely client has closed socket at org.apache.zookeeper.server.NIOServerCnxn.doIO(NIOServerCnxn.java:228) at org.apache.zookeeper.server.NIOServerCnxnFactory.run(NIOServerCnxnFactory.java:208) at java.lang.Thread.run(Thread.java:619) 2015-01-10 
23:22:11,027 [myid:] - INFO [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@1007] - Closed socket connection for client /70.7.12.38:56724 which had sessionid 0x14ab82c142b0658 2015-01-10 23:22:11,431 [myid:] - ERROR [SyncThread:0:NIOServerCnxn@178] - Unexpected Exception: java.nio.channels.CancelledKeyException at sun.nio.ch.SelectionKeyImpl.ensureValid(SelectionKeyImpl.java:55) at sun.nio.ch.SelectionKeyImpl.interestOps(SelectionKeyImpl.java:59) at org.apache.zookeeper.server.NIOServerCnxn.sendBuffer(NIOServerCnxn.java:151) at org.apache.zookeeper.server.NIOServerCnxn.sendResponse(NIOServerCnxn.java:1081) at org.apache.zookeeper.server.FinalRequestProcessor.processRequest(FinalRequestProcessor.java:170) at org.apache.zookeeper.server.SyncRequestProcessor.flush(SyncRequestProcessor.java:200) at
[jira] [Updated] (KAFKA-1836) metadata.fetch.timeout.ms set to zero blocks forever
[ https://issues.apache.org/jira/browse/KAFKA-1836?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Neha Narkhede updated KAFKA-1836: - Resolution: Fixed Status: Resolved (was: Patch Available) Thanks for the patches! Pushed to trunk metadata.fetch.timeout.ms set to zero blocks forever Key: KAFKA-1836 URL: https://issues.apache.org/jira/browse/KAFKA-1836 Project: Kafka Issue Type: Bug Components: clients Affects Versions: 0.8.2 Reporter: Paul Pearcy Priority: Minor Labels: newbie Fix For: 0.8.3 Attachments: KAFKA-1836-new-patch.patch, KAFKA-1836.patch You can easily work around this by setting the timeout value to 1ms, but 0ms should mean 0ms or at least have the behavior documented. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-1476) Get a list of consumer groups
[ https://issues.apache.org/jira/browse/KAFKA-1476?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Onur Karaman updated KAFKA-1476: Attachment: KAFKA-1476_2015-01-12_16:22:26.patch Get a list of consumer groups - Key: KAFKA-1476 URL: https://issues.apache.org/jira/browse/KAFKA-1476 Project: Kafka Issue Type: Wish Components: tools Affects Versions: 0.8.1.1 Reporter: Ryan Williams Assignee: Balaji Seshadri Labels: newbie Fix For: 0.9.0 Attachments: ConsumerCommand.scala, KAFKA-1476-LIST-GROUPS.patch, KAFKA-1476-RENAME.patch, KAFKA-1476-REVIEW-COMMENTS.patch, KAFKA-1476.patch, KAFKA-1476.patch, KAFKA-1476.patch, KAFKA-1476.patch, KAFKA-1476_2014-11-10_11:58:26.patch, KAFKA-1476_2014-11-10_12:04:01.patch, KAFKA-1476_2014-11-10_12:06:35.patch, KAFKA-1476_2014-12-05_12:00:12.patch, KAFKA-1476_2015-01-12_16:22:26.patch It would be useful to have a way to get a list of consumer groups currently active via some tool/script that ships with kafka. This would be helpful so that the system tools can be explored more easily. For example, when running the ConsumerOffsetChecker, it requires a group option bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --topic test --group ? But, when just getting started with kafka, using the console producer and consumer, it is not clear what value to use for the group option. If a list of consumer groups could be listed, then it would be clear what value to use. Background: http://mail-archives.apache.org/mod_mbox/kafka-users/201405.mbox/%3cCAOq_b1w=slze5jrnakxvak0gu9ctdkpazak1g4dygvqzbsg...@mail.gmail.com%3e -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: Review Request 29831: Patch for KAFKA-1476
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/29831/ --- (Updated Jan. 13, 2015, 12:22 a.m.) Review request for kafka. Bugs: KAFKA-1476 https://issues.apache.org/jira/browse/KAFKA-1476 Repository: kafka Description --- Merged in work for KAFKA-1476 and sub-task KAFKA-1826 Diffs (updated) - bin/kafka-consumer-groups.sh PRE-CREATION core/src/main/scala/kafka/admin/AdminUtils.scala 28b12c7b89a56c113b665fbde1b95f873f8624a3 core/src/main/scala/kafka/tools/ConsumerGroupCommand.scala PRE-CREATION core/src/main/scala/kafka/utils/ZkUtils.scala c14bd455b6642f5e6eb254670bef9f57ae41d6cb core/src/test/scala/unit/kafka/admin/DeleteAllConsumerGroupInfoForTopicInZKTest.scala PRE-CREATION core/src/test/scala/unit/kafka/utils/TestUtils.scala ac15d34425795d5be20c51b01fa1108bdcd66583 Diff: https://reviews.apache.org/r/29831/diff/ Testing --- Thanks, Onur Karaman
[jira] [Commented] (KAFKA-1476) Get a list of consumer groups
[ https://issues.apache.org/jira/browse/KAFKA-1476?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14274473#comment-14274473 ] Onur Karaman commented on KAFKA-1476: - Updated reviewboard https://reviews.apache.org/r/29831/diff/ against branch origin/trunk Get a list of consumer groups - Key: KAFKA-1476 URL: https://issues.apache.org/jira/browse/KAFKA-1476 Project: Kafka Issue Type: Wish Components: tools Affects Versions: 0.8.1.1 Reporter: Ryan Williams Assignee: Balaji Seshadri Labels: newbie Fix For: 0.9.0 Attachments: ConsumerCommand.scala, KAFKA-1476-LIST-GROUPS.patch, KAFKA-1476-RENAME.patch, KAFKA-1476-REVIEW-COMMENTS.patch, KAFKA-1476.patch, KAFKA-1476.patch, KAFKA-1476.patch, KAFKA-1476.patch, KAFKA-1476_2014-11-10_11:58:26.patch, KAFKA-1476_2014-11-10_12:04:01.patch, KAFKA-1476_2014-11-10_12:06:35.patch, KAFKA-1476_2014-12-05_12:00:12.patch, KAFKA-1476_2015-01-12_16:22:26.patch It would be useful to have a way to get a list of consumer groups currently active via some tool/script that ships with kafka. This would be helpful so that the system tools can be explored more easily. For example, when running the ConsumerOffsetChecker, it requires a group option bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --topic test --group ? But, when just getting started with kafka, using the console producer and consumer, it is not clear what value to use for the group option. If a list of consumer groups could be listed, then it would be clear what value to use. Background: http://mail-archives.apache.org/mod_mbox/kafka-users/201405.mbox/%3cCAOq_b1w=slze5jrnakxvak0gu9ctdkpazak1g4dygvqzbsg...@mail.gmail.com%3e -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1476) Get a list of consumer groups
[ https://issues.apache.org/jira/browse/KAFKA-1476?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14274455#comment-14274455 ] Neha Narkhede commented on KAFKA-1476: -- [~balaji.sesha...@dish.com], [~onurkaraman] Thanks for incorporating review suggestions. I also think that [~onurkaraman]'s approach is good since it is easier to review a tool rewrite if it has all the functionality. That way we can ensure that we end up with something that is immediately usable and complete. Get a list of consumer groups - Key: KAFKA-1476 URL: https://issues.apache.org/jira/browse/KAFKA-1476 Project: Kafka Issue Type: Wish Components: tools Affects Versions: 0.8.1.1 Reporter: Ryan Williams Assignee: Balaji Seshadri Labels: newbie Fix For: 0.9.0 Attachments: ConsumerCommand.scala, KAFKA-1476-LIST-GROUPS.patch, KAFKA-1476-RENAME.patch, KAFKA-1476-REVIEW-COMMENTS.patch, KAFKA-1476.patch, KAFKA-1476.patch, KAFKA-1476.patch, KAFKA-1476_2014-11-10_11:58:26.patch, KAFKA-1476_2014-11-10_12:04:01.patch, KAFKA-1476_2014-11-10_12:06:35.patch, KAFKA-1476_2014-12-05_12:00:12.patch It would be useful to have a way to get a list of consumer groups currently active via some tool/script that ships with kafka. This would be helpful so that the system tools can be explored more easily. For example, when running the ConsumerOffsetChecker, it requires a group option bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --topic test --group ? But, when just getting started with kafka, using the console producer and consumer, it is not clear what value to use for the group option. If a list of consumer groups could be listed, then it would be clear what value to use. Background: http://mail-archives.apache.org/mod_mbox/kafka-users/201405.mbox/%3cCAOq_b1w=slze5jrnakxvak0gu9ctdkpazak1g4dygvqzbsg...@mail.gmail.com%3e -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: Review Request 29692: Patch for kafka-1841
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/29692/#review67775 --- Ship it! Reversed an earlier comment, but up to you to pick which one you prefer. core/src/main/scala/kafka/api/OffsetCommitRequest.scala https://reviews.apache.org/r/29692/#comment111848 Hmm.. actually looking over this I think what you had earlier looks better overall. i.e., maybe changing the field to a var is better? Sorry about that! - Joel Koshy On Jan. 12, 2015, 10:30 p.m., Jun Rao wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/29692/ --- (Updated Jan. 12, 2015, 10:30 p.m.) Review request for kafka. Bugs: kafka-1841 https://issues.apache.org/jira/browse/kafka-1841 Repository: kafka Description --- addressing Joel's comments Diffs - clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java 7517b879866fc5dad5f8d8ad30636da8bbe7784a clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java 3ee5cbad55ce836fd04bb954dcf6ef2f9bc3288f core/src/main/scala/kafka/api/OffsetCommitRequest.scala 861a6cf11dc6b6431fcbbe9de00c74a122f204bd core/src/main/scala/kafka/api/OffsetCommitResponse.scala 624a1c1cc540688ae2b1fb96665696a6084158e5 core/src/main/scala/kafka/api/OffsetFetchRequest.scala c7604b9cdeb8f46507f04ed7d35fcb3d5f150713 core/src/main/scala/kafka/server/KafkaApis.scala 9a61fcba3b5eeb295676b3ef720c719ef5244642 core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala cd16ced5465d098be7a60498326b2a98c248f343 Diff: https://reviews.apache.org/r/29692/diff/ Testing --- Thanks, Jun Rao
Re: Review Request 29210: Patch for KAFKA-1819
On Jan. 12, 2015, 10:53 p.m., Joel Koshy wrote: core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala, line 68 https://reviews.apache.org/r/29210/diff/5/?file=816890#file816890line68 Actually, now that you have the check above in DeleteTopicTest, do you think it is necessary to have this? Gwen Shapira wrote: Both checks exercise the same code path, so I don't think we need both. Less code to maintain is always better, IMO. Since Neha asked to add the test in DeleteTopicTest, I want to make sure that she's ok with removing it from LogCleanerIntegrationTest. Maybe I'm missing a good reason to have both. I'm ok with removing the changes to LogCleanerIntegrationTest - Neha --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/29210/#review67717 --- On Jan. 13, 2015, 1:01 a.m., Gwen Shapira wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/29210/ --- (Updated Jan. 13, 2015, 1:01 a.m.) Review request for kafka. Bugs: KAFKA-1819 https://issues.apache.org/jira/browse/KAFKA-1819 Repository: kafka Description --- added locking improved tests per Joel and Neha's suggestions added cleaner test to DeleteTopicTest Fixes to DeleteTopicTest: clean up servers after cleaner test and move cleaner verification to the validation function minor fixes suggested by Joel Diffs - core/src/main/scala/kafka/log/LogCleaner.scala f8fcb843c80eec3cf3c931df6bb472c019305253 core/src/main/scala/kafka/log/LogCleanerManager.scala bcfef77ed53f94017c06a884e4db14531774a0a2 core/src/main/scala/kafka/log/LogManager.scala 4ebaae00ca4b80bf15c7930bae2011d98bbec053 core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala 29cc01bcef9cacd8dec1f5d662644fc6fe4994bc core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala 5bfa764638e92f217d0ff7108ec8f53193c22978 Diff: https://reviews.apache.org/r/29210/diff/ Testing --- Thanks, Gwen Shapira
[jira] [Commented] (KAFKA-1476) Get a list of consumer groups
[ https://issues.apache.org/jira/browse/KAFKA-1476?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14274451#comment-14274451 ] Onur Karaman commented on KAFKA-1476: - I have some stuff for sub-task KAFKA-1826 that I'd like to check in. Since the patch for KAFKA-1476 looked almost ready, I went ahead and tried incorporating some of the suggestions from the reviewboard and merged in the command to delete all consumer group information for a given topic in zookeeper. I hope to have the reviewboard up shortly.
Re: Review Request 29831: Patch for KAFKA-1476
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/29831/ --- (Updated Jan. 13, 2015, 12:31 a.m.) Review request for kafka. Bugs: KAFKA-1476 https://issues.apache.org/jira/browse/KAFKA-1476 Repository: kafka Description --- Merged in work for KAFKA-1476 and sub-task KAFKA-1826 Diffs (updated) - bin/kafka-consumer-groups.sh PRE-CREATION core/src/main/scala/kafka/admin/AdminUtils.scala 28b12c7b89a56c113b665fbde1b95f873f8624a3 core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala PRE-CREATION core/src/main/scala/kafka/utils/ZkUtils.scala c14bd455b6642f5e6eb254670bef9f57ae41d6cb core/src/test/scala/unit/kafka/admin/DeleteAllConsumerGroupInfoForTopicInZKTest.scala PRE-CREATION core/src/test/scala/unit/kafka/utils/TestUtils.scala ac15d34425795d5be20c51b01fa1108bdcd66583 Diff: https://reviews.apache.org/r/29831/diff/ Testing --- Thanks, Onur Karaman
Re: [DISCUSS] Compatibility and KIPs
Having an index for every protocol/API change (like https://www.python.org/dev/peps/pep-0257/ ) will be much better than the flat wire protocol doc we have now. Right now it is impossible (without jumping into code) to know if an error, or even a message, is supported in one version of Kafka vs another. Having something iterative for each change that is explicit, clear, and concise would be wonderful for client developers. Some folks just try to keep pace with the wire protocol doc and often develop the wrong functionality, because expected behavior is not always part of the protocol itself but an expectation/extension of the producer and/or consumer layer in the project code. That expected behavior, I think, is a huge gap between the project and client implementations. When you are a Kafka user you have certain expectations when working with producers and consumers, e.g. if a produced message fails, the expectation is to retry X times with a Y backoff between each try. The wire protocol doc doesn't always expose these expected behaviors, and they often get missed. Assumptions get made, and in the resulting clients large gaps take a while (often surfacing as production issues) to get found out. I think this problem (which is a big one IMHO) will also be better resolved with the KIP process. Client application developers can look at new features, understand the goals and expectations, develop those goals in the language/system required, and support the byte structure(s) for a complete use case. I think child pages from https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol might be a way to go. I only suggest that because people already use that page now, and we can keep it as the high-level "here is what you do" and then sub-link to the child pages when appropriate. I hate completely abandoning something that is not entirely bad but just missing some updates in different ways. 
So, maybe something like that, or having a committed specific part under git or svn, might also make sense. I am not really opinionated on how we implement as long as we do implement something for these issues. Feature and/or byte changes should bump the version number, +1 /*** Joe Stein Founder, Principal Consultant Big Data Open Source Security LLC http://www.stealth.ly Twitter: @allthingshadoop http://www.twitter.com/allthingshadoop / On Mon, Jan 12, 2015 at 8:27 PM, Jay Kreps jay.kr...@gmail.com wrote: Yeah I think this makes sense. Some of the crazy nesting will get better when we move to the new protocol definition I think, but we will always need some kind of if statement that branches for the different behavior and this makes testing difficult. Probably the best thing to do would be to announce a version as deprecated, which will have no functional effect but will serve as a warning that it is going away, and then remove it some time later. This would mean including something that notes this in the protocol docs and maybe the release notes. We should probably just always do this for all but the latest version of all apis. I think probably a year of deprecation should be sufficient prior to removal? I also think we can maybe use some common sense in deciding this. Removing older versions will always be bad for users and client developers and always be good for Kafka committers. I think we can be more aggressive on things that are not heavily used (and hence less bad for users) or for which supporting multiple versions is particularly onerous. -Jay On Mon, Jan 12, 2015 at 5:02 PM, Guozhang Wang wangg...@gmail.com wrote: +1 on version evolving with any protocol / data format / functionality changes, and I am wondering if we have a standard process of deprecating old versions? Today with just a couple of versions for the protocol (e.g. offset commit) the code on the server side is already pretty nested and complicated in order to support the different versions. 
On Mon, Jan 12, 2015 at 9:21 AM, Jay Kreps j...@confluent.io wrote: Hey Jun, Good points. I totally agree that the versioning needs to cover both format and behavior if the behavior change is incompatible. I kind of agree about the stable/unstable stuff. What I think this means is not that we would ever evolve the protocol without changing the version, but rather that we would drop support for older versions quicker. On one hand that makes sense and it is often a high bar to get things right the first time. On the other hand I think in practice the set of people who interact with the protocol is often different from the end user. So the end-user experience may still be hey my code just broke because some client they use relied on an unstable protocol unbeknownst to them. But I think all that means is that we should
[jira] [Updated] (KAFKA-1070) Auto-assign node id
[ https://issues.apache.org/jira/browse/KAFKA-1070?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sriharsha Chintalapani updated KAFKA-1070: -- Attachment: KAFKA-1070_2015-01-12_18:30:17.patch Auto-assign node id --- Key: KAFKA-1070 URL: https://issues.apache.org/jira/browse/KAFKA-1070 Project: Kafka Issue Type: Bug Reporter: Jay Kreps Assignee: Sriharsha Chintalapani Labels: usability Fix For: 0.8.3 Attachments: KAFKA-1070.patch, KAFKA-1070_2014-07-19_16:06:13.patch, KAFKA-1070_2014-07-22_11:34:18.patch, KAFKA-1070_2014-07-24_20:58:17.patch, KAFKA-1070_2014-07-24_21:05:33.patch, KAFKA-1070_2014-08-21_10:26:20.patch, KAFKA-1070_2014-11-20_10:50:04.patch, KAFKA-1070_2014-11-25_20:29:37.patch, KAFKA-1070_2015-01-01_17:39:30.patch, KAFKA-1070_2015-01-12_10:46:54.patch, KAFKA-1070_2015-01-12_18:30:17.patch It would be nice to have Kafka brokers auto-assign node ids rather than having that be a configuration. Having a configuration is irritating because (1) you have to generate a custom config for each broker and (2) even though it is in configuration, changing the node id can cause all kinds of bad things to happen. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: Review Request 23702: Patch for KAFKA-1070
On Jan. 12, 2015, 10:08 p.m., Neha Narkhede wrote: core/src/main/scala/kafka/server/BrokerMetadataCheckpoint.scala, line 55 https://reviews.apache.org/r/23702/diff/10/?file=816767#file816767line55 Minor nit which I can address during checkin: According to our coding convention, there should be a space after the , here. updated the patch with the fix. - Sriharsha --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/23702/#review67715 --- On Jan. 13, 2015, 2:30 a.m., Sriharsha Chintalapani wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/23702/ --- (Updated Jan. 13, 2015, 2:30 a.m.) Review request for kafka. Bugs: KAFKA-1070 https://issues.apache.org/jira/browse/KAFKA-1070 Repository: kafka Description --- KAFKA-1070. Auto-assign node id. Diffs - core/src/main/scala/kafka/common/GenerateBrokerIdException.scala PRE-CREATION core/src/main/scala/kafka/common/InconsistentBrokerIdException.scala PRE-CREATION core/src/main/scala/kafka/server/BrokerMetadataCheckpoint.scala PRE-CREATION core/src/main/scala/kafka/server/KafkaConfig.scala 6e26c5436feb4629d17f199011f3ebb674aa767f core/src/main/scala/kafka/server/KafkaServer.scala 1691ad7fc80ca0b112f68e3ea0cbab265c75b26b core/src/main/scala/kafka/utils/ZkUtils.scala 56e3e88e0cc6d917b0ffd1254e173295c1c4aabd core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala PRE-CREATION core/src/test/scala/unit/kafka/utils/TestUtils.scala c9e8ba257b77f46c5c9b62b451470348b6e58889 Diff: https://reviews.apache.org/r/23702/diff/ Testing --- Thanks, Sriharsha Chintalapani
[jira] [Commented] (KAFKA-1070) Auto-assign node id
[ https://issues.apache.org/jira/browse/KAFKA-1070?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14274614#comment-14274614 ] Sriharsha Chintalapani commented on KAFKA-1070: --- Updated reviewboard https://reviews.apache.org/r/23702/diff/ against branch origin/trunk
Re: Review Request 23702: Patch for KAFKA-1070
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/23702/ --- (Updated Jan. 13, 2015, 2:30 a.m.) Review request for kafka. Bugs: KAFKA-1070 https://issues.apache.org/jira/browse/KAFKA-1070 Repository: kafka Description --- KAFKA-1070. Auto-assign node id. Diffs (updated) - core/src/main/scala/kafka/common/GenerateBrokerIdException.scala PRE-CREATION core/src/main/scala/kafka/common/InconsistentBrokerIdException.scala PRE-CREATION core/src/main/scala/kafka/server/BrokerMetadataCheckpoint.scala PRE-CREATION core/src/main/scala/kafka/server/KafkaConfig.scala 6e26c5436feb4629d17f199011f3ebb674aa767f core/src/main/scala/kafka/server/KafkaServer.scala 1691ad7fc80ca0b112f68e3ea0cbab265c75b26b core/src/main/scala/kafka/utils/ZkUtils.scala 56e3e88e0cc6d917b0ffd1254e173295c1c4aabd core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala PRE-CREATION core/src/test/scala/unit/kafka/utils/TestUtils.scala c9e8ba257b77f46c5c9b62b451470348b6e58889 Diff: https://reviews.apache.org/r/23702/diff/ Testing --- Thanks, Sriharsha Chintalapani
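The auto-assignment in this patch persists the chosen broker id in a local metadata checkpoint so that restarts reuse it rather than generating a new one. A hedged Python sketch of that read-or-generate-and-persist pattern (file name and format here are illustrative, not the actual BrokerMetadataCheckpoint):

```python
# Sketch of broker-id auto-assignment: reuse the id recorded in a local
# metadata checkpoint if one exists, otherwise persist a freshly generated
# id so restarts keep it. File name and format are illustrative, not the
# actual BrokerMetadataCheckpoint.
import os
import tempfile

def get_or_create_broker_id(checkpoint_path, generate):
    if os.path.exists(checkpoint_path):
        with open(checkpoint_path) as f:
            for line in f:
                key, _, value = line.strip().partition("=")
                if key == "broker.id":
                    return int(value)
    broker_id = generate()  # stand-in for however the real patch generates ids
    with open(checkpoint_path, "w") as f:
        f.write("version=0\nbroker.id=%d\n" % broker_id)
    return broker_id

path = os.path.join(tempfile.mkdtemp(), "meta.properties")
first = get_or_create_broker_id(path, lambda: 1001)   # generates and persists
second = get_or_create_broker_id(path, lambda: 2002)  # reads the checkpoint
print(first, second)  # 1001 1001
```

Persisting the id is what makes changing it across restarts safe to detect (cf. the InconsistentBrokerIdException introduced in the diff).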
[jira] [Commented] (KAFKA-1850) Failed reassignment leads to additional replica
[ https://issues.apache.org/jira/browse/KAFKA-1850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14274657#comment-14274657 ] Joe Stein commented on KAFKA-1850: -- The reassignment isn't going to be able to finish until the new replica(s) is/are caught up. Are all of your brokers up? How much data is in your partitions? ERROR: Assigned replicas (2,1,0) don't match the list of replicas for reassignment (2,1) for partition [testingTopic,9] This means that replica #1 has not replicated everything and caught up to #2 yet (the leader). It is possible that the reassignment is still running but the replicas are just not catching up with the leader (so it never finishes). This could be due to data size, volume, and thread configuration on the broker (it simply can't keep up). It could also be due to a different maximum message size on brokers #0 and #2 than on #1, so there is a message that can't be fetched and the replica won't catch up. Can you confirm: is there data in the partitions on the new broker? Do you see new data coming in (you can look on disk at the directories)? It could be wedged/stuck and just not finishing. One option is to restart the leader for each failing partition. I have seen that solve this issue before, but I don't know if the problem you are having is in fact a bug or just the brokers simply not catching up. It could also be the controller, so restarting broker #2 may end up being what you have to do to fix this. I would investigate first to confirm that the issue is simply the new broker not being able to catch up, and try to resolve that before restarting brokers that are live leaders, as restarting them could have a negative impact on your cluster. 
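The ERROR quoted above comes from the verify step comparing the partition's currently assigned replicas (AR) against the target replica list from the reassignment JSON; the move counts as complete only when they match exactly. A small Python sketch of that comparison (function name hypothetical):

```python
# The ERROR quoted above is the verify step comparing the partition's
# currently assigned replicas (AR) against the target replica list from
# the reassignment JSON. A hedged sketch (function name hypothetical):

def reassignment_complete(assigned_replicas, target_replicas):
    """A partition's move is finished only when AR equals the target exactly."""
    return list(assigned_replicas) == list(target_replicas)

# While the move is running, AR is the union of new and old replicas
# (RAR + OAR), so the check fails until the old replica is dropped:
in_progress = reassignment_complete([2, 1, 0], [2, 1])  # False: replica 0 lingers
done = reassignment_complete([2, 1], [2, 1])            # True
print(in_progress, done)  # False True
```

This is why a failed reassignment shows "additional" replicas: the old replica is never removed from AR, so the exact-match check keeps failing.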
Failed reassignment leads to additional replica --- Key: KAFKA-1850 URL: https://issues.apache.org/jira/browse/KAFKA-1850 Project: Kafka Issue Type: Bug Components: controller Affects Versions: 0.8.1 Environment: CentOS (Linux Kernel 2.6.32-71.el6.x86_64 ) Reporter: Alex Tian Assignee: Neha Narkhede Priority: Minor Labels: newbie Attachments: Track on testingTopic-9's movement.txt, track_on_testingTopic-9_movement_on_the_following_2_days.txt Original Estimate: 504h Remaining Estimate: 504h When I start a topic reassignment (totally 36 partitions) in my Kafka cluster, 24 partitions succeeded and 12 failed. However, the 12 failed partitions have more replicas. I think the reason is that AR still consists of RAR and OAR even though the reassignment for the partition failed. Could we regard this problem as a bug? I'm sorry for any mistakes in my question, since I am a beginner with Kafka. This is the output from the operation: 1. alex-topics-to-move.json: {"topics": [{"topic": "testingTopic"}], "version": 1} 2. 
Generate a reassignment plan $./kafka-reassign-partitions.sh --generate --broker-list 0,1,2,3,4 --topics-to-move-json-file ./alex-topics-to-move.json --zookeeper 192.168.112.95:2181,192.168.112.96:2181,192.168.112.97:2181,192.168.112.98:2181,192.168.112.99:2181 Current partition replica assignment {"version":1, "partitions":[ {"topic":"testingTopic","partition":27,"replicas":[0,2]}, {"topic":"testingTopic","partition":1,"replicas":[1,2]}, {"topic":"testingTopic","partition":12,"replicas":[0,1]}, {"topic":"testingTopic","partition":6,"replicas":[0,1]}, {"topic":"testingTopic","partition":16,"replicas":[1,0]}, {"topic":"testingTopic","partition":32,"replicas":[2,0]}, {"topic":"testingTopic","partition":18,"replicas":[0,1]}, {"topic":"testingTopic","partition":31,"replicas":[1,2]}, {"topic":"testingTopic","partition":9,"replicas":[0,2]}, {"topic":"testingTopic","partition":23,"replicas":[2,1]}, {"topic":"testingTopic","partition":19,"replicas":[1,2]}, {"topic":"testingTopic","partition":34,"replicas":[1,0]}, {"topic":"testingTopic","partition":17,"replicas":[2,1]}, {"topic":"testingTopic","partition":7,"replicas":[1,2]}, {"topic":"testingTopic","partition":20,"replicas":[2,0]}, {"topic":"testingTopic","partition":8,"replicas":[2,0]}, {"topic":"testingTopic","partition":11,"replicas":[2,1]}, {"topic":"testingTopic","partition":3,"replicas":[0,2]}, {"topic":"testingTopic","partition":30,"replicas":[0,1]}, {"topic":"testingTopic","partition":35,"replicas":[2,1]}, {"topic":"testingTopic","partition":26,"replicas":[2,0]}, {"topic":"testingTopic","partition":22,"replicas":[1,0]}, {"topic":"testingTopic","partition":10,"replicas":[1,0]},
Jenkins build is back to normal : Kafka-trunk #365
See https://builds.apache.org/job/Kafka-trunk/365/changes
[jira] [Updated] (KAFKA-1841) OffsetCommitRequest API - timestamp field is not versioned
[ https://issues.apache.org/jira/browse/KAFKA-1841?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jun Rao updated KAFKA-1841: --- Resolution: Fixed Status: Resolved (was: Patch Available) Thanks for the review. Committed to 0.8.2. Since we are evolving the protocol of OffsetCommitRequest in KAFKA-1634, will let KAFKA-1634 merge in the changes in this jira to trunk. OffsetCommitRequest API - timestamp field is not versioned -- Key: KAFKA-1841 URL: https://issues.apache.org/jira/browse/KAFKA-1841 Project: Kafka Issue Type: Bug Components: core Affects Versions: 0.8.2 Environment: wire-protocol Reporter: Dana Powers Assignee: Jun Rao Priority: Blocker Fix For: 0.8.2 Attachments: kafka-1841.patch, kafka-1841_2015-01-08_15:07:57.patch, kafka-1841_2015-01-09_14:36:50.patch, kafka-1841_2015-01-12_14:30:24.patch Timestamp field was added to the OffsetCommitRequest wire protocol api for 0.8.2 by KAFKA-1012 . The 0.8.1.1 server does not support the timestamp field, so I think the api version of OffsetCommitRequest should be incremented and checked by the 0.8.2 kafka server before attempting to read a timestamp from the network buffer in OffsetCommitRequest.readFrom (core/src/main/scala/kafka/api/OffsetCommitRequest.scala) It looks like a subsequent patch (KAFKA-1462) added another api change to support a new constructor w/ params generationId and consumerId, calling that version 1, and a pending patch (KAFKA-1634) adds retentionMs as another field, while possibly removing timestamp altogether, calling this version 2. So the fix here is not straightforward enough for me to submit a patch. This could possibly be merged into KAFKA-1634, but opening as a separate Issue because I believe the lack of versioning in the current trunk should block 0.8.2 release. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
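The fix described in this issue is to gate the timestamp read on the request's API version, since a v0 request carries no timestamp bytes at all. A simplified Python sketch of version-gated deserialization (the field layout here is illustrative, not the real OffsetCommitRequest wire format):

```python
# Version-gated deserialization: a v0 OffsetCommitRequest carries no
# timestamp bytes, so the reader must consume them only when the request's
# API version says they exist. The layout below is simplified and
# illustrative, not the real wire format.
import struct

def read_offset_entry(buf, api_version):
    offset, = struct.unpack_from(">q", buf, 0)
    pos = 8
    timestamp = None
    if api_version >= 1:  # timestamp present only from v1 on (illustrative)
        timestamp, = struct.unpack_from(">q", buf, pos)
        pos += 8
    return offset, timestamp, pos

v0 = struct.pack(">q", 42)                  # offset only
v1 = struct.pack(">qq", 42, 1421107200000)  # offset + timestamp
print(read_offset_entry(v0, 0))  # (42, None, 8)
print(read_offset_entry(v1, 1))  # (42, 1421107200000, 16)
```

Reading a timestamp from a v0 buffer would consume bytes belonging to the next field, which is exactly the incompatibility an unversioned field creates.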
[jira] [Updated] (KAFKA-1819) Cleaner gets confused about deleted and re-created topics
[ https://issues.apache.org/jira/browse/KAFKA-1819?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Neha Narkhede updated KAFKA-1819: - Resolution: Fixed Status: Resolved (was: Patch Available) Thanks for the patches, [~gwenshap]. Appreciate your help getting this into the 0.8.2 release. Pushed to trunk and 0.8.2 Cleaner gets confused about deleted and re-created topics - Key: KAFKA-1819 URL: https://issues.apache.org/jira/browse/KAFKA-1819 Project: Kafka Issue Type: Bug Reporter: Gian Merlino Assignee: Gwen Shapira Priority: Blocker Fix For: 0.8.2 Attachments: KAFKA-1819.patch, KAFKA-1819_2014-12-26_13:58:44.patch, KAFKA-1819_2014-12-30_16:01:19.patch, KAFKA-1819_2015-01-12_10:34:07.patch, KAFKA-1819_2015-01-12_11:17:53.patch, KAFKA-1819_2015-01-12_17:01:53.patch I get an error like this after deleting a compacted topic and re-creating it. I think it's because the brokers don't remove cleaning checkpoints from the cleaner-offset-checkpoint file. This is from a build based off commit bd212b7. java.lang.IllegalArgumentException: requirement failed: Last clean offset is 587607 but segment base offset is 0 for log foo-6. at scala.Predef$.require(Predef.scala:233) at kafka.log.Cleaner.buildOffsetMap(LogCleaner.scala:502) at kafka.log.Cleaner.clean(LogCleaner.scala:300) at kafka.log.LogCleaner$CleanerThread.cleanOrSleep(LogCleaner.scala:214) at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:192) at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
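The IllegalArgumentException quoted above is the log cleaner trusting a checkpointed "last clean offset" (587607) that no longer falls inside the re-created log (base offset 0). A hedged Python sketch of the fix direction, validating the checkpoint against the log's current range and discarding it when stale (function and argument names are hypothetical):

```python
# The IllegalArgumentException quoted above is the cleaner trusting a
# checkpointed "last clean offset" that no longer falls inside the
# re-created log. Hedged sketch of the fix direction: validate the
# checkpoint against the log's current range and discard it when stale.
# Function and argument names are hypothetical.

def resolve_clean_offset(checkpoint_offset, log_start, log_end):
    """Return the offset cleaning should resume from."""
    if checkpoint_offset is None or not (log_start <= checkpoint_offset <= log_end):
        # Stale checkpoint (e.g. topic deleted and re-created): start over.
        return log_start
    return checkpoint_offset

# Re-created topic: the old checkpoint lies outside the new log [0, 1000].
print(resolve_clean_offset(587607, 0, 1000))  # 0
# Normal case: resume from the checkpoint.
print(resolve_clean_offset(500, 0, 1000))     # 500
```

Removing checkpoints on topic deletion (as the patch's locking around LogCleanerManager suggests) attacks the same staleness from the other side.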
[jira] [Updated] (KAFKA-1760) Implement new consumer client
[ https://issues.apache.org/jira/browse/KAFKA-1760?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Joe Stein updated KAFKA-1760: - Fix Version/s: 0.8.3 Implement new consumer client - Key: KAFKA-1760 URL: https://issues.apache.org/jira/browse/KAFKA-1760 Project: Kafka Issue Type: Sub-task Components: consumer Reporter: Jay Kreps Assignee: Jay Kreps Fix For: 0.8.3 Attachments: KAFKA-1760.patch, KAFKA-1760_2015-01-11_16:57:15.patch Implement a consumer client. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: Review Request 29756: Patch for KAKFA-1854
On Jan. 9, 2015, 6:48 p.m., Neha Narkhede wrote: kafka-patch-review.py, line 20 https://reviews.apache.org/r/29756/diff/4/?file=814310#file814310line20 I got the following error on this patch nnarkhed-mn1:kafka nnarkhed$ python kafka-patch-review.py -b trunk -j KAFKA-1854 -d test Configuring reviewboard url to https://reviews.apache.org Updating your remote branches to pull the latest changes Verifying JIRA connection configurations JIRA user :nehanarkhede JIRA password : Failed to login to the JIRA instance type 'exceptions.AttributeError' 'JIRA' object has no attribute 'current_user' Maybe a different version of the jira package we use renamed the user field ? Jaikiran Pai wrote: Neha, which version of jira-python are you using? I'll find and check that project's documentation to see which version supports what. Neha Narkhede wrote: I'm not entirely sure. What's the best way to find out? Neha Narkhede wrote: In any case, I think the tool should be robust enough to deal with this issue. Hi Neha, For knowing the jira-python (or any other python library installation), one way is to run the following command: `pip show jira-python` The jira-python documentation is hard to find for each specific version (they just have the latest online). So I had to look at their source for each/most of the tagged versions to see which API was supported https://bitbucket.org/bspeakmon/jira-python/src. I have made a change to this patch which works and I think it should work even for lower and higher versions of that library (based on what I see in those tagged versions). Could you please give this a try again and see how it goes for you? Thank you. - Jaikiran --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/29756/#review67477 --- On Jan. 13, 2015, 5:43 a.m., Jaikiran Pai wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/29756/ --- (Updated Jan. 13, 2015, 5:43 a.m.) Review request for kafka. 
Bugs: KAFKA-1854 https://issues.apache.org/jira/browse/KAFKA-1854 Repository: kafka Description --- KAFKA-1854 Allow JIRA username and password to be prompted in the absence of a jira.ini file, during patch submission Diffs - kafka-patch-review.py b7f132f9d210b8648859ab8f9c89f30ec128ab38 Diff: https://reviews.apache.org/r/29756/diff/ Testing --- Thanks, Jaikiran Pai
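The AttributeError discussed in this review ("'JIRA' object has no attribute 'current_user'") stems from different jira-python releases exposing the logged-in user under different names. A version-tolerant lookup can be sketched as below; the two demo classes stand in for older and newer client APIs, and the attribute names are assumptions:

```python
# Hedged, library-agnostic sketch of a version-tolerant attribute lookup.
# The demo classes stand in for the real jira-python client object; the
# candidate attribute names are assumptions, not the documented API.

def current_jira_user(jira, candidates=("current_user", "whoami")):
    for name in candidates:
        attr = getattr(jira, name, None)
        if attr is not None:
            return attr() if callable(attr) else attr
    raise AttributeError("no known current-user accessor on this JIRA client")

class OldClient:  # hypothetical older jira-python API
    def whoami(self):
        return "nehanarkhede"

class NewClient:  # hypothetical newer jira-python API
    def current_user(self):
        return "nehanarkhede"

print(current_jira_user(OldClient()), current_jira_user(NewClient()))
```

This is the "robust enough to deal with this issue" behavior Neha asked for: the script degrades gracefully instead of crashing on whichever library version is installed.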
[jira] [Updated] (KAFKA-1723) make the metrics name in new producer more standard
[ https://issues.apache.org/jira/browse/KAFKA-1723?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jun Rao updated KAFKA-1723: --- Resolution: Fixed Status: Resolved (was: Patch Available) Thanks for the patch. +1. Committed to 0.8.2 and trunk after the following minor changes. 1. Added the following new helper constructor in MetricName and a new unit test. public MetricName(String name, String group, String description, String... keyValue) 2. Fixed the return value of MockProducer.metrics(). make the metrics name in new producer more standard --- Key: KAFKA-1723 URL: https://issues.apache.org/jira/browse/KAFKA-1723 Project: Kafka Issue Type: Sub-task Components: clients Affects Versions: 0.8.2 Reporter: Jun Rao Assignee: Manikumar Reddy Priority: Blocker Fix For: 0.8.2 Attachments: KAFKA-1723.patch, KAFKA-1723_2015-01-08_21:41:13.patch, KAFKA-1723_2015-01-08_22:02:22.patch, KAFKA-1723_2015-01-09_14:24:18.patch, KAFKA-1723_2015-01-09_23:43:22.patch The jmx name in the new producer looks like the following: kafka.producer.myclientid:type=mytopic However, this can be ambiguous since we allow . in client id and topic. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
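The ambiguity described in this issue is the general problem that joining name parts with "." is not invertible once the parts may themselves contain ".". A small Python sketch of the collision, and of how keeping the parts separate (as the tagged MetricName constructor committed here does) removes it; metric_key is illustrative, not the actual API:

```python
# General form of the ambiguity: joining name parts with "." is not
# invertible once the parts may themselves contain ".".

def joined(group, name):
    return group + "." + name

# Two distinct (group, name) pairs collapse to one flat string:
assert joined("a.b", "c") == joined("a", "b.c")

# Keeping the parts separate, as the tagged MetricName constructor does,
# removes the collision. metric_key is illustrative, not the actual API.
def metric_key(group, name, tags=()):
    return (group, name, tuple(sorted(tags)))

assert metric_key("a.b", "c") != metric_key("a", "b.c")
print("collision only in the flat scheme")
```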
[jira] [Commented] (KAFKA-1854) Allow the JIRA username and password to be prompted during patch submission
[ https://issues.apache.org/jira/browse/KAFKA-1854?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14274783#comment-14274783 ] Guozhang Wang commented on KAFKA-1854: -- Thanks for the patch, I have added you to the contributor list, please feel free to assign to yourself now. Allow the JIRA username and password to be prompted during patch submission --- Key: KAFKA-1854 URL: https://issues.apache.org/jira/browse/KAFKA-1854 Project: Kafka Issue Type: Improvement Reporter: jaikiran pai Attachments: KAFKA-1854.patch, KAFKA-1854_2015-01-09_13:39:23.patch, KAFKA-1854_2015-01-09_15:42:28.patch, KAFKA-1854_2015-01-09_18:16:35.patch The current patch submission process involves using the kafka-patch-review.py python script which expects a jira.ini file to contain the user's username and password for JIRA authentication. I'm one of those who doesn't like storing passwords in files :) It would be good to (optionally) allow the username/password to be prompted by the patch submission script. I've a patch which I can submit for this enhancement. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: unable to sign up for kafka email distribution
Looks good to me, maybe you can check if your email account is bouncing emails from this mail list or not. On Thu, Jan 8, 2015 at 11:05 AM, Padgett, Ben bpadg...@illumina.com wrote: I haven’t gotten any emails from the email list however. When I joined the Apache Cassandra list I got very frequent emails. Was trying to subscribe at this email dev-subscr...@kafka.apache.org +dev-subscr...@kafka.apache.org. Thanks! From: Guozhang Wang wangg...@gmail.com Date: Thursday, January 8, 2015 at 10:40 AM To: dev@kafka.apache.org dev@kafka.apache.org, . . bpadg...@illumina.com Subject: Re: unable to sign up for kafka email distribution Hi Ben, Your email is received on the mailing list, the kicking-back may be a false alarm. Guozhang On Thu, Jan 8, 2015 at 9:22 AM, Padgett, Ben bpadg...@illumina.com wrote: Unable to confirm myself on kafka email list. Mail is getting kicked back from apache email servers according to our tech department. Thanks! -- -- Guozhang -- -- Guozhang
Re: Review Request 29756: Patch for KAKFA-1854
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/29756/ --- (Updated Jan. 13, 2015, 5:42 a.m.) Review request for kafka. Summary (updated) - Patch for KAKFA-1854 Bugs: KAFKA-1854 and KAKFA-1854 https://issues.apache.org/jira/browse/KAFKA-1854 https://issues.apache.org/jira/browse/KAKFA-1854 Repository: kafka Description --- KAFKA-1854 Allow JIRA username and password to be prompted in the absence of a jira.ini file, during patch submission Diffs (updated) - kafka-patch-review.py b7f132f9d210b8648859ab8f9c89f30ec128ab38 Diff: https://reviews.apache.org/r/29756/diff/ Testing --- Thanks, Jaikiran Pai
Re: Review Request 29756: Patch for KAKFA-1854
---
This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/29756/
---

(Updated Jan. 13, 2015, 5:43 a.m.)

Review request for kafka.

Changes
---
Fixed typo in JIRA id

Bugs: KAFKA-1854
https://issues.apache.org/jira/browse/KAFKA-1854

Repository: kafka

Description
---
KAFKA-1854 Allow JIRA username and password to be prompted in the absence of a jira.ini file, during patch submission

Diffs
---
kafka-patch-review.py b7f132f9d210b8648859ab8f9c89f30ec128ab38

Diff: https://reviews.apache.org/r/29756/diff/

Testing
---

Thanks,
Jaikiran Pai
Re: [DISCUSS] 0.8.2-beta2 release
So are we saying:

a) an RC in regards to an official Apache VOTE, keeping it open for week(s) and continuously patching as issues come up, calling more RC VOTEs until we ship? or

b) instead of a beta, name the release RC1, so in Maven Central it would be 0.8.2-RC1, and we could have RCX votes for that?

If we went with a), folks could use the artifacts staged in the apache release repository, so it is technically possible to do. I am in favor of whichever one folks would be most likely to start to push and/or finally push into production prior to final.

Joe Stein
Founder, Principal Consultant
Big Data Open Source Security LLC
http://www.stealth.ly
Twitter: @allthingshadoop http://www.twitter.com/allthingshadoop

On Mon, Jan 12, 2015 at 12:26 PM, Neha Narkhede n...@confluent.io wrote:

Joe, Thanks for starting the discussion. It may be ok to do an RC and probably two weeks later, cut the final. After releasing the RC, we can help spread the word and have people try out the new API, which is probably our main concern. -Neha

On Mon, Jan 12, 2015 at 8:42 AM, Jun Rao j...@confluent.io wrote:

Hi, Joe, Yes, we can do that. Also, an alternative to doing an 0.8.2 beta2 is to just do the 0.8.2 final, but leave a bit more time for people to try the RC. Thanks, Jun

On Thu, Jan 8, 2015 at 8:46 PM, Joe Stein joe.st...@stealth.ly wrote:

Hi, I was thinking that once all the blockers are committed for 0.8.2 that we should release beta2. Thoughts?

Joe Stein
Founder, Principal Consultant
Big Data Open Source Security LLC
http://www.stealth.ly
Twitter: @allthingshadoop http://www.twitter.com/allthingshadoop

--
Thanks,
Neha
Re: Review Request 29756: Patch for KAFKA-1854
On Jan. 9, 2015, 6:48 p.m., Neha Narkhede wrote:

kafka-patch-review.py, line 20
https://reviews.apache.org/r/29756/diff/4/?file=814310#file814310line20

I got the following error on this patch:

nnarkhed-mn1:kafka nnarkhed$ python kafka-patch-review.py -b trunk -j KAFKA-1854 -d test
Configuring reviewboard url to https://reviews.apache.org
Updating your remote branches to pull the latest changes
Verifying JIRA connection configurations
JIRA user :nehanarkhede
JIRA password :
Failed to login to the JIRA instance <type 'exceptions.AttributeError'> 'JIRA' object has no attribute 'current_user'

Maybe a different version of the jira package we use renamed the user field?

Jaikiran Pai wrote:

Neha, which version of jira-python are you using? I'll find and check that project's documentation to see which version supports what.

I'm not entirely sure. What's the best way to find out?

- Neha

---
This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/29756/#review67477
---

On Jan. 9, 2015, 12:47 p.m., Jaikiran Pai wrote:

---
This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/29756/
---

(Updated Jan. 9, 2015, 12:47 p.m.)

Review request for kafka.

Bugs: KAFKA-1854
https://issues.apache.org/jira/browse/KAFKA-1854

Repository: kafka

Description
---
KAFKA-1854 Allow JIRA username and password to be prompted in the absence of a jira.ini file, during patch submission

Diffs
---
kafka-patch-review.py b7f132f9d210b8648859ab8f9c89f30ec128ab38

Diff: https://reviews.apache.org/r/29756/diff/

Testing
---

Thanks,
Jaikiran Pai
Re: Review Request 29756: Patch for KAFKA-1854
On Jan. 9, 2015, 6:48 p.m., Neha Narkhede wrote:

kafka-patch-review.py, line 20
https://reviews.apache.org/r/29756/diff/4/?file=814310#file814310line20

I got the following error on this patch:

nnarkhed-mn1:kafka nnarkhed$ python kafka-patch-review.py -b trunk -j KAFKA-1854 -d test
Configuring reviewboard url to https://reviews.apache.org
Updating your remote branches to pull the latest changes
Verifying JIRA connection configurations
JIRA user :nehanarkhede
JIRA password :
Failed to login to the JIRA instance <type 'exceptions.AttributeError'> 'JIRA' object has no attribute 'current_user'

Maybe a different version of the jira package we use renamed the user field?

Jaikiran Pai wrote:

Neha, which version of jira-python are you using? I'll find and check that project's documentation to see which version supports what.

Neha Narkhede wrote:

I'm not entirely sure. What's the best way to find out?

In any case, I think the tool should be robust enough to deal with this issue.

- Neha

---
This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/29756/#review67477
---

On Jan. 9, 2015, 12:47 p.m., Jaikiran Pai wrote:

---
This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/29756/
---

(Updated Jan. 9, 2015, 12:47 p.m.)

Review request for kafka.

Bugs: KAFKA-1854
https://issues.apache.org/jira/browse/KAFKA-1854

Repository: kafka

Description
---
KAFKA-1854 Allow JIRA username and password to be prompted in the absence of a jira.ini file, during patch submission

Diffs
---
kafka-patch-review.py b7f132f9d210b8648859ab8f9c89f30ec128ab38

Diff: https://reviews.apache.org/r/29756/diff/

Testing
---

Thanks,
Jaikiran Pai
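[Editor's note] The AttributeError above, and Neha's follow-up question, both come down to version drift in the jira-python client (the installed version can be checked with `pip show jira`). The sketch below shows one way the script could be made "robust enough to deal with this issue": probe for a working attribute instead of assuming one. The `FakeJira` class and the specific attribute names tried are illustrative stand-ins, not the real jira-python API.

```python
class FakeJira:
    """Stand-in for a jira.JIRA instance from a hypothetical newer release
    that dropped current_user() in favor of a myself() method."""
    def myself(self):
        return {"name": "nehanarkhede"}

def current_username(client):
    """Return the logged-in username, trying candidate API names from
    several client versions before giving up with an actionable error."""
    # Try each candidate in order; getattr with a default avoids the
    # raw AttributeError that kafka-patch-review.py surfaced.
    for attr in ("current_user", "myself"):
        method = getattr(client, attr, None)
        if method is not None:
            result = method()
            # Assume dict-like results carry the name under "name";
            # string results are the name itself.
            return result["name"] if isinstance(result, dict) else result
    raise RuntimeError(
        "Unsupported jira client version; run 'pip show jira' to check it")

print(current_username(FakeJira()))  # -> nehanarkhede
```

The design point is simply that a tool wrapping a third-party client should fail with a message naming the suspected version mismatch rather than leaking an AttributeError to the user.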
Re: Review Request 29831: Patch for KAFKA-1476
---
This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/29831/#review67789
---

To make review easier, could you add the output of the command, for all options, for 2 consumer groups that consume 2 or more topics to the JIRA? One thing to watch out for is the ease of scripting the output from this tool. I'd also suggest asking Clark/Todd or one of the SREs to review the output from the tool.

core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
https://reviews.apache.org/r/29831/#comment111860
    --delete is sufficient. Same for the name of the corresponding variable in opts.

core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
https://reviews.apache.org/r/29831/#comment111891
    topic command => consumer command

core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
https://reviews.apache.org/r/29831/#comment111893
    If topic is not specified, why not delete information for all topics subscribed by this consumer group?

core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
https://reviews.apache.org/r/29831/#comment111892
    Invalid topic config?

core/src/test/scala/unit/kafka/admin/DeleteAllConsumerGroupInfoForTopicInZKTest.scala
https://reviews.apache.org/r/29831/#comment111862
    It is better to rename this to DeleteConsumerGroupTest

- Neha Narkhede

On Jan. 13, 2015, 12:31 a.m., Onur Karaman wrote:

---
This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/29831/
---

(Updated Jan. 13, 2015, 12:31 a.m.)

Review request for kafka.
Bugs: KAFKA-1476 https://issues.apache.org/jira/browse/KAFKA-1476 Repository: kafka Description --- Merged in work for KAFKA-1476 and sub-task KAFKA-1826 Diffs - bin/kafka-consumer-groups.sh PRE-CREATION core/src/main/scala/kafka/admin/AdminUtils.scala 28b12c7b89a56c113b665fbde1b95f873f8624a3 core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala PRE-CREATION core/src/main/scala/kafka/utils/ZkUtils.scala c14bd455b6642f5e6eb254670bef9f57ae41d6cb core/src/test/scala/unit/kafka/admin/DeleteAllConsumerGroupInfoForTopicInZKTest.scala PRE-CREATION core/src/test/scala/unit/kafka/utils/TestUtils.scala ac15d34425795d5be20c51b01fa1108bdcd66583 Diff: https://reviews.apache.org/r/29831/diff/ Testing --- Thanks, Onur Karaman
[jira] [Commented] (KAFKA-1634) Improve semantics of timestamp in OffsetCommitRequests and update documentation
[ https://issues.apache.org/jira/browse/KAFKA-1634?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14274801#comment-14274801 ]

Jun Rao commented on KAFKA-1634:
--------------------------------

Committed KAFKA-1841 to 0.8.2. It would be easier to incorporate the changes in KAFKA-1841 into this jira and commit them together to trunk.

Improve semantics of timestamp in OffsetCommitRequests and update documentation
--------------------------------------------------------------------------------

                Key: KAFKA-1634
                URL: https://issues.apache.org/jira/browse/KAFKA-1634
            Project: Kafka
         Issue Type: Bug
           Reporter: Neha Narkhede
           Assignee: Guozhang Wang
           Priority: Blocker
            Fix For: 0.8.3
        Attachments: KAFKA-1634.patch, KAFKA-1634_2014-11-06_15:35:46.patch, KAFKA-1634_2014-11-07_16:54:33.patch, KAFKA-1634_2014-11-17_17:42:44.patch, KAFKA-1634_2014-11-21_14:00:34.patch, KAFKA-1634_2014-12-01_11:44:35.patch, KAFKA-1634_2014-12-01_18:03:12.patch

From the mailing list - following up on this -- I think the online API docs for OffsetCommitRequest still incorrectly refer to client-side timestamps: https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetCommitRequest Wasn't that removed, and isn't it always handled server-side now? Would one of the devs mind updating the API spec wiki?

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
[jira] [Assigned] (KAFKA-1854) Allow the JIRA username and password to be prompted during patch submission
[ https://issues.apache.org/jira/browse/KAFKA-1854?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

jaikiran pai reassigned KAFKA-1854:
-----------------------------------
Assignee: jaikiran pai

Allow the JIRA username and password to be prompted during patch submission
---------------------------------------------------------------------------

                Key: KAFKA-1854
                URL: https://issues.apache.org/jira/browse/KAFKA-1854
            Project: Kafka
         Issue Type: Improvement
           Reporter: jaikiran pai
           Assignee: jaikiran pai
        Attachments: KAFKA-1854.patch, KAFKA-1854_2015-01-09_13:39:23.patch, KAFKA-1854_2015-01-09_15:42:28.patch, KAFKA-1854_2015-01-09_18:16:35.patch

The current patch submission process involves using the kafka-patch-review.py python script which expects a jira.ini file to contain the user's username and password for JIRA authentication. I'm one of those who doesn't like storing passwords in files :) It would be good to (optionally) allow the username/password to be prompted by the patch submission script. I've a patch which I can submit for this enhancement.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
[jira] [Commented] (KAFKA-1854) Allow the JIRA username and password to be prompted during patch submission
[ https://issues.apache.org/jira/browse/KAFKA-1854?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14274787#comment-14274787 ]

jaikiran pai commented on KAFKA-1854:
-------------------------------------

Thank you [~guozhang]!

Allow the JIRA username and password to be prompted during patch submission
---------------------------------------------------------------------------

                Key: KAFKA-1854
                URL: https://issues.apache.org/jira/browse/KAFKA-1854
            Project: Kafka
         Issue Type: Improvement
           Reporter: jaikiran pai
           Assignee: jaikiran pai
        Attachments: KAFKA-1854.patch, KAFKA-1854_2015-01-09_13:39:23.patch, KAFKA-1854_2015-01-09_15:42:28.patch, KAFKA-1854_2015-01-09_18:16:35.patch

The current patch submission process involves using the kafka-patch-review.py python script which expects a jira.ini file to contain the user's username and password for JIRA authentication. I'm one of those who doesn't like storing passwords in files :) It would be good to (optionally) allow the username/password to be prompted by the patch submission script. I've a patch which I can submit for this enhancement.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
Re: [DISCUSS] Compatability and KIPs
Jay,

Thanks for bringing this up. Yes, we should increase the level of awareness of compatibility.

For 1 and 2, they probably should include any functional change. For example, even if there is no change in the binary data format, but the interpretation is changed, we should consider this as a binary format change and bump up the version number.

3. Having a wider discussion on api/protocol/data changes in the mailing list seems like a good idea.

7. It might be good to also document api/protocol/data formats that are considered stable (or unstable). For example, in the 0.8.2 release, we will have a few new protocols (e.g. HeartBeat) for the development of the new consumer. Those new protocols probably shouldn't be considered stable until the new consumer is more fully developed.

Thanks,
Jun

On Fri, Jan 9, 2015 at 4:29 PM, Jay Kreps j...@confluent.io wrote:

Hey guys,

We had a bit of a compatibility slip-up in 0.8.2 with the offset commit stuff. We caught this one before the final release so it's not too bad. But I do think it kind of points to an area we could do better.

One piece of feedback we have gotten from going out and talking to users is that compatibility is really, really important to them. Kafka is getting deployed in big environments where the clients are embedded in lots of applications, and any kind of incompatibility is a huge pain for people using it and generally makes upgrades difficult or impossible.

In practice what I think this means for development is a lot more pressure to really think about the public interfaces we are making and try our best to get them right. This can be hard sometimes as changes come in patches and it is hard to follow every single rb with enough diligence to know.

Compatibility really means a couple of things:
1. Protocol changes
2. Binary data format changes
3. Changes in public apis in the clients
4. Configs
5. Metric names
6. Command line tools

I think 1-2 are critical, 3 is very important, and 4, 5 and 6 are pretty important but not critical.

One thing this implies is that we are really going to have to do a good job of thinking about apis and use cases. You can definitely see a number of places in the old clients and in a couple of the protocols where enough care was not given to thinking things through. Some of those were from long long ago, but we should really try to avoid adding to that set, because increasingly we will have to carry around these mistakes for a long time.

Here are a few things I thought we could do that might help us get better in this area:

1. Technically we are just in a really bad place with the protocol because it is defined twice--once in the old scala request objects, and once in the new protocol format for the clients. This makes changes massively painful. The good news is that the new request definition DSL was intended to make adding new protocol versions a lot easier and clearer. It will also make it a lot more obvious when the protocol is changed, since you will be checking in or reviewing a change to Protocol.java. Getting the server moved over to the new request objects and protocol definition will be a bit of a slog, but it will really help here I think.

2. We need to get some testing in place on cross-version compatibility. This is work, and no tests here will be perfect, but I suspect with some effort we could catch a lot of things.

3. I was also thinking it might be worth it to get a little more formal about the review and discussion process for things which will have an impact on these public areas, to ensure we end up with something we are happy with. Python has a PEP process (https://www.python.org/dev/peps/pep-0257/) by which major changes are made, and it might be worth it for us to do a similar thing. We have essentially been doing this already--major changes almost always have an associated wiki--but I think just getting a little more rigorous might be good. The idea would be to call out these wikis as official proposals and do a full Apache discuss/vote thread for these important changes. We would use these for big features (security, log compaction, etc) as well as for small changes that introduce or change a public api/config/etc. This is a little heavier weight, but I think it is really just critical that we get these things right, and this would be a way to call out this kind of change so that everyone would take the time to look at them.

Thoughts?

-Jay
[jira] [Commented] (KAFKA-725) Broker Exception: Attempt to read with a maximum offset less than start offset
[ https://issues.apache.org/jira/browse/KAFKA-725?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14273861#comment-14273861 ]

Diwakar commented on KAFKA-725:
-------------------------------

Neha, we have 6 brokers and 131 partitions per topic (replication factor: 3). We recently updated to kafka_2.10-0.8.2.0 and are facing a similar issue, causing a lot of the errors below. Because of this, it seems like producers are unable to produce to Kafka successfully.

[2015-01-11 05:21:56.604-0700] ERROR [Replica Manager on Broker 2]: Error when processing fetch request for partition [application-access,13] offset 42748276 from consumer with correlation id 4974. Possible cause: Attempt to read with a maximum offset (42748275) less than the start offset (42748276).

Is any solution available to fix this?

Thanks,
Diwakar

Broker Exception: Attempt to read with a maximum offset less than start offset
------------------------------------------------------------------------------

                Key: KAFKA-725
                URL: https://issues.apache.org/jira/browse/KAFKA-725
            Project: Kafka
         Issue Type: Bug
         Components: log
   Affects Versions: 0.8.0
           Reporter: Chris Riccomini
           Assignee: Jay Kreps

I have a simple consumer that's reading from a single topic/partition pair. Running it seems to trigger these messages on the broker periodically:

2013/01/22 23:04:54.936 ERROR [KafkaApis] [kafka-request-handler-4] [kafka] [] [KafkaApi-466] error when processing request (MyTopic,4,7951732,2097152) java.lang.IllegalArgumentException: Attempt to read with a maximum offset (7951715) less than the start offset (7951732).
        at kafka.log.LogSegment.read(LogSegment.scala:105)
        at kafka.log.Log.read(Log.scala:390)
        at kafka.server.KafkaApis.kafka$server$KafkaApis$$readMessageSet(KafkaApis.scala:372)
        at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$readMessageSets$1.apply(KafkaApis.scala:330)
        at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$readMessageSets$1.apply(KafkaApis.scala:326)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
        at scala.collection.immutable.Map$Map1.foreach(Map.scala:105)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
        at scala.collection.immutable.Map$Map1.map(Map.scala:93)
        at kafka.server.KafkaApis.kafka$server$KafkaApis$$readMessageSets(KafkaApis.scala:326)
        at kafka.server.KafkaApis$$anonfun$maybeUnblockDelayedFetchRequests$2.apply(KafkaApis.scala:165)
        at kafka.server.KafkaApis$$anonfun$maybeUnblockDelayedFetchRequests$2.apply(KafkaApis.scala:164)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:57)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:43)
        at kafka.server.KafkaApis.maybeUnblockDelayedFetchRequests(KafkaApis.scala:164)
        at kafka.server.KafkaApis$$anonfun$handleProducerRequest$2.apply(KafkaApis.scala:186)
        at kafka.server.KafkaApis$$anonfun$handleProducerRequest$2.apply(KafkaApis.scala:185)
        at scala.collection.immutable.Map$Map2.foreach(Map.scala:127)
        at kafka.server.KafkaApis.handleProducerRequest(KafkaApis.scala:185)
        at kafka.server.KafkaApis.handle(KafkaApis.scala:58)
        at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:41)
        at java.lang.Thread.run(Thread.java:619)

When I shut the consumer down, I don't see the exceptions anymore.
This is the code that my consumer is running:

    while(true) {
      // we believe the consumer to be connected, so try and use it for a fetch request
      val request = new FetchRequestBuilder()
        .addFetch(topic, partition, nextOffset, fetchSize)
        .maxWait(Int.MaxValue) // TODO for super high-throughput, might be worth waiting for more bytes
        .minBytes(1)
        .build

      debug("Fetching messages for stream %s and offset %s." format (streamPartition, nextOffset))
      val messages = connectedConsumer.fetch(request)
      debug("Fetch complete for stream %s and offset %s. Got messages: %s" format (streamPartition, nextOffset, messages))

      if (messages.hasError) {
        warn("Got error code from broker for %s: %s. Shutting down consumer to trigger a reconnect." format (streamPartition, messages.errorCode(topic, partition)))
        ErrorMapping.maybeThrowException(messages.errorCode(topic, partition))
      }

      messages.messageSet(topic, partition).foreach(msg => {
        watchers.foreach(_.onMessagesReady(msg.offset.toString, msg.message.payload))
Re: Review Request 29301: Patch for KAFKA-1694
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/29301/ --- (Updated Jan. 12, 2015, 4:55 p.m.) Review request for kafka. Bugs: KAFKA-1694 https://issues.apache.org/jira/browse/KAFKA-1694 Repository: kafka Description (updated) --- KAFKA-1694 - introduced new type for Wire protocol, ported ClusterMetadataResponse to it KAFKA-1694 - Split Admin RQ/RP to separate messages KAFKA-1694 - Admin commands can be handled only by controller; DeleteTopicCommand NPE fix KAFKA-1776 - Ported ConsumerGroupOffsetChecker KAFKA-1776 - Ported PreferredReplicaElectionTool and ReassignPartitionsTool to CLI KAFKA-1694 - kafka-tools is uploaded on uploadAllArchives KAFKA-1694 - ReviewBoard 29301 code review fixes Diffs (updated) - bin/kafka.sh PRE-CREATION bin/windows/kafka.bat PRE-CREATION build.gradle ba52288031e2abc70e35e9297a4423dd5025950b clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java 109fc965e09b2ed186a073351bd037ac8af20a4c clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java 7517b879866fc5dad5f8d8ad30636da8bbe7784a clients/src/main/java/org/apache/kafka/common/protocol/types/MaybeOf.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java 121e880a941fcd3e6392859edba11a94236494cc clients/src/main/java/org/apache/kafka/common/requests/ClusterMetadataRequest.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/requests/ClusterMetadataResponse.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/requests/admin/AlterTopicRequest.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/requests/admin/AlterTopicResponse.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/requests/admin/ConsumerGroupOffsetsRequest.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/requests/admin/ConsumerGroupOffsetsResponse.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/requests/admin/CreateTopicRequest.java PRE-CREATION 
clients/src/main/java/org/apache/kafka/common/requests/admin/CreateTopicResponse.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/requests/admin/DeleteTopicRequest.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/requests/admin/DeleteTopicResponse.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/requests/admin/DescribeTopicOutput.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/requests/admin/DescribeTopicRequest.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/requests/admin/DescribeTopicResponse.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/requests/admin/ListTopicsOutput.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/requests/admin/ListTopicsRequest.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/requests/admin/ListTopicsResponse.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/requests/admin/PreferredReplicaLeaderElectionRequest.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/requests/admin/PreferredReplicaLeaderElectionResponse.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/requests/admin/ReassignPartitionsRequest.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/requests/admin/ReassignPartitionsResponse.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/requests/admin/TopicConfigDetails.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/requests/admin/TopicPartitionsDetails.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/requests/admin/VerifyPreferredReplicaLeaderElectionRequest.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/requests/admin/VerifyPreferredReplicaLeaderElectionResponse.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/requests/admin/VerifyReassignPartitionsRequest.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/requests/admin/VerifyReassignPartitionsResponse.java 
PRE-CREATION clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java df37fc6d8f0db0b8192a948426af603be3444da4 core/src/main/scala/kafka/api/ApiUtils.scala 1f80de1638978901500df808ca5133308c9d1fca core/src/main/scala/kafka/api/ClusterMetadataRequest.scala PRE-CREATION core/src/main/scala/kafka/api/ClusterMetadataResponse.scala PRE-CREATION core/src/main/scala/kafka/api/RequestKeys.scala c24c0345feedc7b9e2e9f40af11bfa1b8d328c43 core/src/main/scala/kafka/api/admin/AlterTopicRequest.scala PRE-CREATION core/src/main/scala/kafka/api/admin/AlterTopicResponse.scala PRE-CREATION core/src/main/scala/kafka/api/admin/ConsumerGroupOffsetsRequest.scala
Re: Review Request 29301: Patch for KAFKA-1694
On Jan. 6, 2015, 8:19 a.m., Joe Stein wrote:

build.gradle, line 211
https://reviews.apache.org/r/29301/diff/2/?file=800278#file800278line211
If we can do this without an upgrade that would be great if we are in fact just requiring 1 function.

This feature allows us to call CLI commands both from the interactive shell (kafka) and directly from ./kafka.sh (e.g. ./kafka.sh --create-topic). The difference is that for ./kafka.sh we always have to supply one additional option, --controller, which is not recognized during command execution from the interactive shell. As a workaround I can process --controller separately and then simply cut it off from the command-line args array, but this looks a bit lame.

On Jan. 6, 2015, 8:19 a.m., Joe Stein wrote:

clients/src/main/java/org/apache/kafka/common/protocol/types/MaybeOf.java, line 21
https://reviews.apache.org/r/29301/diff/2/?file=800281#file800281line21
not sure about this name, you are making an Option[] for the protocol value and that does not make sense if you look at the use of it and not how it works.

As discussed, a better naming option wasn't found for now :). I'm happy to change it if someone comes up with a good one. I also added some comments to the class to make it more explanatory.

On Jan. 6, 2015, 8:19 a.m., Joe Stein wrote:

clients/src/main/java/org/apache/kafka/common/protocol/types/MaybeOf.java, line 51
https://reviews.apache.org/r/29301/diff/2/?file=800281#file800281line51
why wouldn't the null size == 0?

There is always an additional byte indicating whether the value is present.

- Andrii

---
This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/29301/#review66801
---

On Jan. 12, 2015, 4:55 p.m., Andrii Biletskyi wrote:

---
This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/29301/
---

(Updated Jan. 12, 2015, 4:55 p.m.)

Review request for kafka.
Bugs: KAFKA-1694 https://issues.apache.org/jira/browse/KAFKA-1694 Repository: kafka Description --- KAFKA-1694 - introduced new type for Wire protocol, ported ClusterMetadataResponse to it KAFKA-1694 - Split Admin RQ/RP to separate messages KAFKA-1694 - Admin commands can be handled only by controller; DeleteTopicCommand NPE fix KAFKA-1776 - Ported ConsumerGroupOffsetChecker KAFKA-1776 - Ported PreferredReplicaElectionTool and ReassignPartitionsTool to CLI KAFKA-1694 - kafka-tools is uploaded on uploadAllArchives KAFKA-1694 - ReviewBoard 29301 code review fixes Diffs - bin/kafka.sh PRE-CREATION bin/windows/kafka.bat PRE-CREATION build.gradle ba52288031e2abc70e35e9297a4423dd5025950b clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java 109fc965e09b2ed186a073351bd037ac8af20a4c clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java 7517b879866fc5dad5f8d8ad30636da8bbe7784a clients/src/main/java/org/apache/kafka/common/protocol/types/MaybeOf.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java 121e880a941fcd3e6392859edba11a94236494cc clients/src/main/java/org/apache/kafka/common/requests/ClusterMetadataRequest.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/requests/ClusterMetadataResponse.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/requests/admin/AlterTopicRequest.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/requests/admin/AlterTopicResponse.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/requests/admin/ConsumerGroupOffsetsRequest.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/requests/admin/ConsumerGroupOffsetsResponse.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/requests/admin/CreateTopicRequest.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/requests/admin/CreateTopicResponse.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/requests/admin/DeleteTopicRequest.java 
PRE-CREATION clients/src/main/java/org/apache/kafka/common/requests/admin/DeleteTopicResponse.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/requests/admin/DescribeTopicOutput.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/requests/admin/DescribeTopicRequest.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/requests/admin/DescribeTopicResponse.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/requests/admin/ListTopicsOutput.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/requests/admin/ListTopicsRequest.java
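[Editor's note] The "why wouldn't the null size == 0?" exchange in this review thread comes down to how an optional wire-protocol field is encoded. A small illustrative model of that idea follows; this is a hedged sketch in Python, not the actual MaybeOf/Protocol.java implementation, and the single-byte-flag layout is an assumption made for illustration.

```python
import struct

def encode_maybe_int32(value):
    """Encode an optional int32: one presence byte, then the value (if any)."""
    if value is None:
        return b"\x00"                       # presence byte only
    return b"\x01" + struct.pack(">i", value)  # presence byte + big-endian int32

def decode_maybe_int32(buf):
    """Return (value_or_None, bytes_consumed) for a buffer produced above."""
    if buf[0] == 0:
        return None, 1                       # consumed just the presence byte
    return struct.unpack(">i", buf[1:5])[0], 5

# Even a null value occupies one byte on the wire -- which is why, in the
# review discussion above, the size of null is not 0.
assert len(encode_maybe_int32(None)) == 1
assert decode_maybe_int32(encode_maybe_int32(42)) == (42, 5)
print("ok")
```

The presence byte is what lets the decoder distinguish "field absent" from any legitimate value, at the cost of one byte even for nulls.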
Re: New consumer client
Hey Bhavesh,

This seems like a serious issue and not one anyone else has reported. I don't know what you mean by corrupt message. Are you saying the CRC check fails? If so, that check is done both by the broker (prior to appending to the log) and the consumer, so that implies either a bug in the broker or else disk corruption on the server. I do have an option to disable the CRC check in the consumer, though depending on the nature of the corruption that can just lead to more serious errors (depending on what is corrupted).

-jay

On Sun, Jan 11, 2015 at 11:00 PM, Bhavesh Mistry mistry.p.bhav...@gmail.com wrote:

Hi Jay,

One of the pain points of the existing consumer code is the occasional CORRUPT_MESSAGE. Right now, it is hard to pinpoint the cause of a CORRUPT_MESSAGE, especially when it happens on the Mirror Maker side. Is there any proposal to auto-skip corrupted messages and have reporting visibility into CRC errors (metrics, traceability to find the corruption, per topic, etc)? I am not sure if this is the correct email thread to raise this; if not, please let me know. Will provide feedback about the new consumer api and changes.

Thanks,
Bhavesh

On Sun, Jan 11, 2015 at 7:57 PM, Jay Kreps j...@confluent.io wrote:

I uploaded an updated version of the new consumer client (https://issues.apache.org/jira/browse/KAFKA-1760). This is now almost feature complete, and has pretty reasonable testing and metrics. I think it is ready for review and could be checked in once 0.8.2 is out.

For those who haven't been following, this is meant to be a new consumer client, like the new producer in 0.8.2, and intended to replace the existing high level and simple scala consumers. This still needs the server-side implementation of the partition assignment and group management to be fully functional. I have just stubbed this out in the server to allow the implementation and testing of the client, but actual usage will require it.
However, the client that exists now is a fully functional replacement for the simple consumer that is vastly easier to use correctly, as it internally does all the discovery and failover. It would be great if people could take a look at this code, and particularly at the public APIs, which have several small changes from the original proposal.

Summary

What's there:
1. Simple consumer functionality
2. Offset commit and fetch
3. Ability to change position with seek
4. Ability to commit all or just some offsets
5. Controller discovery, failure detection, heartbeat, and fail-over
6. Controller partition assignment
7. Logging
8. Metrics
9. Integration tests, including tests that simulate random broker failures
10. Integration into the consumer performance test

Limitations:
1. There could be some lingering bugs in the group management support; it is hard to test fully with just the stub support on the server, so we'll need to get the server working to do better, I think.
2. I haven't implemented wild-card subscriptions yet.
3. No integration with the console consumer yet.

Performance

I did some performance comparison with the old consumer over localhost on my laptop. Usually localhost isn't good for testing, but in this case it is good because it has near-infinite bandwidth, so it does a good job of catching inefficiencies that would be hidden on a slower network. These numbers probably aren't representative of what you would get over a real network, but they help bring out the relative efficiencies. Here are the results:
- Old high-level consumer: 213 MB/sec
- New consumer: 225 MB/sec
- Old simple consumer: 242 MB/sec

It may be hard to get this client up to the same point as the simple consumer, as the latter does very little beyond allocating and wrapping byte buffers that it reads off the network. The big thing that shows up in profiling is the buffer allocation for reading data, so one speed-up would be to pool these buffers.

Some things to discuss:
1. What should the behavior of consumer.position() and consumer.committed() be immediately after initialization (prior to calling poll())? Currently these methods just fetch the current value from memory, but if the position isn't in memory the consumer will try to fetch it from the server, and if no position is found it will use the auto-offset reset policy to pick one. I think this is the right thing to do, because otherwise you can't guarantee how many calls to poll() would be required before initialization is complete. But it is kind of weird.
2. Overall code structure improvement. These NIO network clients tend to be very imperative in nature. I'm not sure this is bad, but if anyone has any ideas on improving the code I'd love to hear them. -Jay
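The position() resolution order described in discussion point 1 (in-memory position first, then the committed offset on the server, then the auto-offset reset policy) can be sketched as follows. This is an illustrative stand-in, not the actual client code; the class and helper names here are hypothetical:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the position() lookup order described above:
// 1. in-memory position, 2. committed offset fetched from the server,
// 3. auto-offset reset policy. All names are illustrative only.
public class PositionSketch {
    final Map<String, Long> inMemory = new HashMap<>();
    final Map<String, Long> committedOnServer = new HashMap<>();
    private final String autoOffsetReset; // "earliest" or "latest"
    private final long logEndOffset;      // stands in for a fetched log end offset

    PositionSketch(String autoOffsetReset, long logEndOffset) {
        this.autoOffsetReset = autoOffsetReset;
        this.logEndOffset = logEndOffset;
    }

    long position(String partition) {
        // 1. Use the in-memory position if we already have one.
        Long cached = inMemory.get(partition);
        if (cached != null)
            return cached;
        // 2. Otherwise fetch the committed offset from the server.
        Long committed = committedOnServer.get(partition);
        long resolved = (committed != null)
                ? committed
                // 3. No committed offset either: apply the reset policy.
                : (autoOffsetReset.equals("earliest") ? 0L : logEndOffset);
        inMemory.put(partition, resolved);
        return resolved;
    }

    public static void main(String[] args) {
        PositionSketch c = new PositionSketch("latest", 100L);
        c.committedOnServer.put("foo-0", 42L);
        System.out.println(c.position("foo-0")); // 42: committed offset wins
        System.out.println(c.position("foo-1")); // 100: reset policy picks log end
    }
}
```

This is why the behavior is well-defined before the first poll(): the lookup can always terminate deterministically instead of depending on how many poll() calls initialization takes.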
Re: [DISCUSS] 0.8.2-beta2 release
Joe, Thanks for starting the discussion. It may be OK to do an RC and then, probably two weeks later, cut the final. After releasing the RC, we can help spread the word and have people try out the new API, which is probably our main concern. -Neha

On Mon, Jan 12, 2015 at 8:42 AM, Jun Rao j...@confluent.io wrote: Hi, Joe, Yes, we can do that. Also, an alternative to doing an 0.8.2 beta2 is to just do the 0.8.2 final, but leave a bit more time for people to try the RC. Thanks, Jun

On Thu, Jan 8, 2015 at 8:46 PM, Joe Stein joe.st...@stealth.ly wrote: Hi, I was thinking that once all the blockers are committed for 0.8.2 we should release beta2. Thoughts? Joe Stein, Founder, Principal Consultant, Big Data Open Source Security LLC, http://www.stealth.ly, Twitter: @allthingshadoop http://www.twitter.com/allthingshadoop

-- Thanks, Neha
[jira] [Commented] (KAFKA-1694) kafka command line and centralized operations
[ https://issues.apache.org/jira/browse/KAFKA-1694?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14273773#comment-14273773 ] Andrii Biletskyi commented on KAFKA-1694: - Updated reviewboard https://reviews.apache.org/r/29301/diff/ against branch origin/trunk kafka command line and centralized operations - Key: KAFKA-1694 URL: https://issues.apache.org/jira/browse/KAFKA-1694 Project: Kafka Issue Type: Bug Reporter: Joe Stein Assignee: Andrii Biletskyi Priority: Critical Fix For: 0.8.3 Attachments: KAFKA-1694.patch, KAFKA-1694_2014-12-24_21:21:51.patch, KAFKA-1694_2015-01-12_15:28:41.patch, KAFKA-1694_2015-01-12_18:54:48.patch, KAFKA-1772_1802_1775_1774_v2.patch https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Command+Line+and+Related+Improvements -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-1694) kafka command line and centralized operations
[ https://issues.apache.org/jira/browse/KAFKA-1694?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Andrii Biletskyi updated KAFKA-1694: Attachment: KAFKA-1694_2015-01-12_18:54:48.patch -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: Review Request 29210: Patch for KAFKA-1819
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/29210/ --- (Updated Jan. 12, 2015, 6:34 p.m.) Review request for kafka. Bugs: KAFKA-1819 https://issues.apache.org/jira/browse/KAFKA-1819 Repository: kafka Description (updated) --- kafka-1797; (addressing Manikumar Reddy's comment) add the serializer/deserializer api to the new java client; patched by Jun Rao; reviewed by Manikumar Reddy and Neha Narkhede kafka-1851; OffsetFetchRequest returns extra partitions when input only contains unknown partitions; patched by Jun Rao; reviewed by Neha Narkhede first pass at log clean fix added locking improved tests per Joel and Neha's suggestions added cleaner test to DeleteTopicTest Fixes to DeleteTopicTest: clean up servers after cleaner test and move cleaner verification to the validation function Diffs (updated) - clients/src/main/java/org/apache/kafka/common/serialization/StringDeserializer.java a3b3700a1e0716643761d7032bd32bce839d3065 clients/src/main/java/org/apache/kafka/common/serialization/StringSerializer.java 02db47f8736988343dd293fc3da03751f78a3b5c clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java d550a3137c066abb5e2984ac6245574832929ff8 core/src/main/scala/kafka/log/LogCleaner.scala f8fcb843c80eec3cf3c931df6bb472c019305253 core/src/main/scala/kafka/log/LogCleanerManager.scala bcfef77ed53f94017c06a884e4db14531774a0a2 core/src/main/scala/kafka/log/LogManager.scala 4ebaae00ca4b80bf15c7930bae2011d98bbec053 core/src/main/scala/kafka/network/SocketServer.scala e451592fe358158548117f47a80e807007dd8b98 core/src/main/scala/kafka/server/KafkaApis.scala 2a1c0326b6e6966d8b8254bd6a1cb83ad98a3b80 core/src/main/scala/kafka/server/KafkaServer.scala 1bf7d10cef23a77e71eb16bf6d0e68bc4ebe core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala 29cc01bcef9cacd8dec1f5d662644fc6fe4994bc core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala 5bfa764638e92f217d0ff7108ec8f53193c22978 
core/src/test/scala/unit/kafka/network/SocketServerTest.scala 5f4d85254c384dcc27a5a84f0836ea225d3a901a core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala 8c5364fa97da1be09973c176d1baeb339455d319 Diff: https://reviews.apache.org/r/29210/diff/ Testing --- Thanks, Gwen Shapira
[jira] [Updated] (KAFKA-1819) Cleaner gets confused about deleted and re-created topics
[ https://issues.apache.org/jira/browse/KAFKA-1819?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gwen Shapira updated KAFKA-1819: Attachment: KAFKA-1819_2015-01-12_10:34:07.patch Cleaner gets confused about deleted and re-created topics - Key: KAFKA-1819 URL: https://issues.apache.org/jira/browse/KAFKA-1819 Project: Kafka Issue Type: Bug Reporter: Gian Merlino Assignee: Gwen Shapira Priority: Blocker Fix For: 0.8.2 Attachments: KAFKA-1819.patch, KAFKA-1819_2014-12-26_13:58:44.patch, KAFKA-1819_2014-12-30_16:01:19.patch, KAFKA-1819_2015-01-12_10:34:07.patch I get an error like this after deleting a compacted topic and re-creating it. I think it's because the brokers don't remove cleaning checkpoints from the cleaner-offset-checkpoint file. This is from a build based off commit bd212b7. java.lang.IllegalArgumentException: requirement failed: Last clean offset is 587607 but segment base offset is 0 for log foo-6. at scala.Predef$.require(Predef.scala:233) at kafka.log.Cleaner.buildOffsetMap(LogCleaner.scala:502) at kafka.log.Cleaner.clean(LogCleaner.scala:300) at kafka.log.LogCleaner$CleanerThread.cleanOrSleep(LogCleaner.scala:214) at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:192) at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
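The require failure quoted above happens because a checkpointed clean offset outlives the deleted incarnation of the log, so the recorded offset (587607) is outside the range of the re-created log. One defensive shape of a fix is to fall back to the log's start when the checkpoint is out of range; this is a hedged illustration only, with invented names, and the actual patch may take a different approach (e.g. removing the checkpoint on delete):

```java
// Illustrative sketch of the stale-checkpoint problem described above.
// If the checkpointed "last clean offset" falls outside the current log's
// offset range (e.g. the topic was deleted and re-created), ignore it and
// start cleaning from the log's start offset. Names are hypothetical, not
// the actual LogCleaner code.
public class CleanerCheckpointSketch {
    static long resolveFirstDirtyOffset(long checkpointed, long logStartOffset, long logEndOffset) {
        if (checkpointed < logStartOffset || checkpointed > logEndOffset) {
            // Stale checkpoint from a previous incarnation of the log:
            // fall back instead of failing a require() at clean time.
            return logStartOffset;
        }
        return checkpointed;
    }

    public static void main(String[] args) {
        // Re-created log spans [0, 1000) but the old checkpoint says 587607.
        System.out.println(resolveFirstDirtyOffset(587607L, 0L, 1000L)); // 0
        System.out.println(resolveFirstDirtyOffset(500L, 0L, 1000L));    // 500
    }
}
```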
[jira] [Commented] (KAFKA-1819) Cleaner gets confused about deleted and re-created topics
[ https://issues.apache.org/jira/browse/KAFKA-1819?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14273906#comment-14273906 ] Gwen Shapira commented on KAFKA-1819: - Updated reviewboard https://reviews.apache.org/r/29210/diff/ against branch trunk -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (KAFKA-1858) Make ServerShutdownTest a bit less flaky
Gwen Shapira created KAFKA-1858: --- Summary: Make ServerShutdownTest a bit less flaky Key: KAFKA-1858 URL: https://issues.apache.org/jira/browse/KAFKA-1858 Project: Kafka Issue Type: Bug Reporter: Gwen Shapira

ServerShutdownTest currently:
* Starts a KafkaServer
* Does stuff
* Stops the server
* Counts whether there are any live Kafka threads

This is fine on its own. But when running in a test suite (i.e. gradle test), the test is very sensitive to every other test freeing all of its resources. If a previous test started a server and forgot to close it, ServerShutdownTest will find threads from that test and fail. This makes for a flaky test that is pretty challenging to troubleshoot. I suggest counting the threads at the beginning and end of each test in the class, and only failing if the number at the end is greater than the number at the beginning. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
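The suggested baseline-counting approach could look roughly like this. A hedged Java sketch, not the actual Scala test code; the "kafka" thread-name filter and helper names are assumptions:

```java
import java.util.Set;

// Sketch of the suggested fix: snapshot the number of kafka-related threads
// before the test body runs, and fail only if the count grew afterwards,
// so threads leaked by *earlier* tests in the suite no longer fail this one.
// The "kafka" name filter and all names here are illustrative assumptions.
public class ThreadLeakCheck {
    static long kafkaThreadCount() {
        Set<Thread> threads = Thread.getAllStackTraces().keySet();
        return threads.stream()
                .filter(t -> t.getName().toLowerCase().contains("kafka"))
                .count();
    }

    public static void main(String[] args) throws Exception {
        long before = kafkaThreadCount();

        // Test body would go here: start a server, do stuff, stop the server.
        // Simulated with a short-lived thread named like a broker thread.
        Thread t = new Thread(() -> { }, "kafka-request-handler-0");
        t.start();
        t.join();

        long after = kafkaThreadCount();
        // Fail only on growth relative to the baseline.
        if (after > before)
            throw new AssertionError("leaked " + (after - before) + " kafka threads");
        System.out.println("no kafka threads leaked");
    }
}
```

Comparing against a per-test baseline instead of zero is what removes the dependence on earlier tests cleaning up after themselves.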
[jira] [Commented] (KAFKA-1819) Cleaner gets confused about deleted and re-created topics
[ https://issues.apache.org/jira/browse/KAFKA-1819?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14273918#comment-14273918 ] Neha Narkhede commented on KAFKA-1819: -- [~gwenshap] Thanks for looking into the possible cause. I just looked at the patch, and it looks like there are a few changes unrelated to the patch: https://reviews.apache.org/r/29210/diff/3-4/ ? I guess the only changes expected should be in the tests, right? -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1819) Cleaner gets confused about deleted and re-created topics
[ https://issues.apache.org/jira/browse/KAFKA-1819?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14273921#comment-14273921 ] Gwen Shapira commented on KAFKA-1819: - Ouch. Yes. Good catch! Let me check how this got in. Probably an unclean repo. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1856) Add PreCommit Patch Testing
[ https://issues.apache.org/jira/browse/KAFKA-1856?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14273923#comment-14273923 ] Neha Narkhede commented on KAFKA-1856: -- Thanks for filing this [~singhashish]! Looking forward to this.

Add PreCommit Patch Testing --- Key: KAFKA-1856 URL: https://issues.apache.org/jira/browse/KAFKA-1856 Project: Kafka Issue Type: Task Reporter: Ashish Kumar Singh Assignee: Ashish Kumar Singh

h1. Kafka PreCommit Patch Testing - *Don't wait for it to break*

h2. Motivation

*With great power comes great responsibility* - Uncle Ben. As the Kafka user list grows, a mechanism to ensure the quality of the product is required. Quality becomes hard to measure and maintain in an open source project because of its wide community of contributors. Luckily, Kafka is not the first open source project and can benefit from the learnings of prior projects. PreCommit tests are the tests that are run for each patch that gets attached to an open JIRA. Based on the test results, the test execution framework (test bot) gives the patch a +1 or -1. Having PreCommit tests takes the load off committers having to look at or test each patch.

h2. Tests in Kafka

h3. Unit and Integration Tests

[Unit and Integration tests|https://cwiki.apache.org/confluence/display/KAFKA/Kafka+0.9+Unit+and+Integration+Tests] are cardinal in helping contributors avoid breaking existing functionality while adding new functionality or fixing older ones. These tests, at least the ones relevant to the changes, must be run by contributors before attaching a patch to a JIRA.

h3. System Tests

[System tests|https://cwiki.apache.org/confluence/display/KAFKA/Kafka+System+Tests] are much wider tests that, unlike unit tests, focus on end-to-end scenarios rather than a specific method or class.

h2. Apache PreCommit tests

Apache provides a mechanism to automatically build a project and run a series of tests whenever a patch is uploaded to a JIRA. Based on the test execution, the test framework will comment with a +1 or -1 on the JIRA. You can read more about the framework here: http://wiki.apache.org/general/PreCommitBuilds

h2. Plan

- Create a test-patch.py script (similar to the one used in Flume, Sqoop and other projects) that will take a JIRA as a parameter, apply the patch on the appropriate branch, build the project, run tests and report results. This script should be committed into the Kafka code-base. To begin with, this will only run unit tests; we can add code sanity checks, system_tests, etc. in the future.
- Create a Jenkins job for running the test (as described in http://wiki.apache.org/general/PreCommitBuilds) and validate that it works manually. This must be done by a committer with Jenkins access.
- Ask someone with access to https://builds.apache.org/job/PreCommit-Admin/ to add Kafka to the list of projects PreCommit-Admin triggers.

-- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: Review Request 23702: Patch for KAFKA-1070
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/23702/ --- (Updated Jan. 12, 2015, 6:46 p.m.) Review request for kafka. Bugs: KAFKA-1070 https://issues.apache.org/jira/browse/KAFKA-1070 Repository: kafka Description --- KAFKA-1070. Auto-assign node id. Diffs (updated) - core/src/main/scala/kafka/common/GenerateBrokerIdException.scala PRE-CREATION core/src/main/scala/kafka/common/InconsistentBrokerIdException.scala PRE-CREATION core/src/main/scala/kafka/server/BrokerMetadataCheckpoint.scala PRE-CREATION core/src/main/scala/kafka/server/KafkaConfig.scala 6e26c5436feb4629d17f199011f3ebb674aa767f core/src/main/scala/kafka/server/KafkaServer.scala 1691ad7fc80ca0b112f68e3ea0cbab265c75b26b core/src/main/scala/kafka/utils/ZkUtils.scala 56e3e88e0cc6d917b0ffd1254e173295c1c4aabd core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala PRE-CREATION core/src/test/scala/unit/kafka/utils/TestUtils.scala c9e8ba257b77f46c5c9b62b451470348b6e58889 Diff: https://reviews.apache.org/r/23702/diff/ Testing --- Thanks, Sriharsha Chintalapani
Re: Review Request 23702: Patch for KAFKA-1070
On Jan. 9, 2015, 11:39 p.m., Neha Narkhede wrote: core/src/main/scala/kafka/server/BrokerMetadataFileHandler.scala, line 30 https://reviews.apache.org/r/23702/diff/9/?file=805362#file805362line30

I think the way you modelled this checkpoint file, which makes it different from all the other checkpoints we have, is that the metadata checkpoint itself handles the multiple log directories, so it is not a single broker metadata checkpoint. My only concern is that this checkpoint is now very different from the other checkpoints; I'd prefer we maintain some consistency across all our checkpoints. So modeling log directories is fine, but then it's desirable for all our checkpoints to behave that way. If you'd prefer changing this checkpoint file to match the others, I'd suggest:
1. Change the name to BrokerMetadataCheckpoint.
2. Make it take the File that identifies the metadata checkpoint file in the constructor.
3. Change write to not accept the log directories; just include the BrokerMetadata object.
4. Similarly, include no parameters in the read() API.

Followed your suggestion. - Sriharsha

--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/23702/#review67551 --- On Jan. 12, 2015, 6:46 p.m., Sriharsha Chintalapani wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/23702/ --- (Updated Jan. 12, 2015, 6:46 p.m.) Review request for kafka. Bugs: KAFKA-1070 https://issues.apache.org/jira/browse/KAFKA-1070 Repository: kafka Description --- KAFKA-1070. Auto-assign node id.
Diffs - core/src/main/scala/kafka/common/GenerateBrokerIdException.scala PRE-CREATION core/src/main/scala/kafka/common/InconsistentBrokerIdException.scala PRE-CREATION core/src/main/scala/kafka/server/BrokerMetadataCheckpoint.scala PRE-CREATION core/src/main/scala/kafka/server/KafkaConfig.scala 6e26c5436feb4629d17f199011f3ebb674aa767f core/src/main/scala/kafka/server/KafkaServer.scala 1691ad7fc80ca0b112f68e3ea0cbab265c75b26b core/src/main/scala/kafka/utils/ZkUtils.scala 56e3e88e0cc6d917b0ffd1254e173295c1c4aabd core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala PRE-CREATION core/src/test/scala/unit/kafka/utils/TestUtils.scala c9e8ba257b77f46c5c9b62b451470348b6e58889 Diff: https://reviews.apache.org/r/23702/diff/ Testing --- Thanks, Sriharsha Chintalapani
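The API shape Neha suggests in this review (the constructor takes the checkpoint File; write takes only the metadata; read takes no parameters) would look roughly like the following. This is an illustrative Java sketch, not the actual Scala patch; the on-disk layout (a version line plus a broker.id line) is an assumption for illustration:

```java
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.List;

// Illustrative sketch of a per-file checkpoint following the review's
// suggested shape: constructor takes the File, write() takes only the
// metadata, read() takes no parameters. The file format here (a version
// line followed by a broker.id line) is an assumption, not the real one.
public class BrokerMetadataCheckpointSketch {
    private final File file;

    BrokerMetadataCheckpointSketch(File file) { this.file = file; }

    void write(int brokerId) throws IOException {
        // A real implementation would write to a temp file and rename
        // for atomicity; this sketch writes the file directly.
        Files.write(file.toPath(), List.of("version=0", "broker.id=" + brokerId));
    }

    int read() throws IOException {
        for (String line : Files.readAllLines(file.toPath()))
            if (line.startsWith("broker.id="))
                return Integer.parseInt(line.substring("broker.id=".length()));
        throw new IOException("malformed checkpoint: " + file);
    }

    public static void main(String[] args) throws IOException {
        File f = File.createTempFile("meta", ".properties");
        BrokerMetadataCheckpointSketch cp = new BrokerMetadataCheckpointSketch(f);
        cp.write(5);
        System.out.println(cp.read()); // prints 5
        f.delete();
    }
}
```

Binding one checkpoint object to one file is what keeps this consistent with the other checkpoint classes: the caller, not the checkpoint, iterates over log directories.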
[jira] [Updated] (KAFKA-1070) Auto-assign node id
[ https://issues.apache.org/jira/browse/KAFKA-1070?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sriharsha Chintalapani updated KAFKA-1070: -- Attachment: KAFKA-1070_2015-01-12_10:46:54.patch Auto-assign node id --- Key: KAFKA-1070 URL: https://issues.apache.org/jira/browse/KAFKA-1070 Project: Kafka Issue Type: Bug Reporter: Jay Kreps Assignee: Sriharsha Chintalapani Labels: usability Fix For: 0.8.3 Attachments: KAFKA-1070.patch, KAFKA-1070_2014-07-19_16:06:13.patch, KAFKA-1070_2014-07-22_11:34:18.patch, KAFKA-1070_2014-07-24_20:58:17.patch, KAFKA-1070_2014-07-24_21:05:33.patch, KAFKA-1070_2014-08-21_10:26:20.patch, KAFKA-1070_2014-11-20_10:50:04.patch, KAFKA-1070_2014-11-25_20:29:37.patch, KAFKA-1070_2015-01-01_17:39:30.patch, KAFKA-1070_2015-01-12_10:46:54.patch It would be nice to have Kafka brokers auto-assign node ids rather than having that be a configuration. Having a configuration is irritating because (1) you have to generate a custom config for each broker and (2) even though it is in configuration, changing the node id can cause all kinds of bad things to happen. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1070) Auto-assign node id
[ https://issues.apache.org/jira/browse/KAFKA-1070?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14273928#comment-14273928 ] Sriharsha Chintalapani commented on KAFKA-1070: --- Updated reviewboard https://reviews.apache.org/r/23702/diff/ against branch origin/trunk -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Resolved] (KAFKA-1857) Kafka Broker ids are removed ( with zookeeper , Storm )
[ https://issues.apache.org/jira/browse/KAFKA-1857?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Neha Narkhede resolved KAFKA-1857. -- Resolution: Invalid

Please send such questions to the mailing list. It has higher visibility and also prevents polluting JIRA with things that are not likely to be real bugs.

Kafka Broker ids are removed ( with zookeeper , Storm ) --- Key: KAFKA-1857 URL: https://issues.apache.org/jira/browse/KAFKA-1857 Project: Kafka Issue Type: Bug Components: consumer, controller Affects Versions: 0.8.1 Environment: Ubuntu, with storm-kafka and zookeeper 3.4.6 Reporter: Yoonhyeok Kim Assignee: Neha Narkhede

Hi, I am using a kind of real-time analytics system with ZooKeeper, Storm and Kafka.

Versions:
- Storm: storm-core-0.9.2
- Kafka: 0.8.1 (3 brokers)
- storm-kafka: 0.9.2
- ZooKeeper: 3.4.6 (standalone)

This problem occurs when I use previous versions as well.

Exceptions: EndOfStreamException, java.nio.channels.CancelledKeyException, org.apache.zookeeper.KeeperException$BadVersionException

When I use the Kafka spout with Storm, sometimes there were ZooKeeper logs (zookeeper.out) like:
{code}
2015-01-10 19:19:00,836 [myid:] - WARN [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@357] - caught end of stream exception
EndOfStreamException: Unable to read additional data from client sessionid 0x14ab82c142b0658, likely client has closed socket
 at org.apache.zookeeper.server.NIOServerCnxn.doIO(NIOServerCnxn.java:228)
 at org.apache.zookeeper.server.NIOServerCnxnFactory.run(NIOServerCnxnFactory.java:208)
 at java.lang.Thread.run(Thread.java:619)
{code}
Still, ZooKeeper is working well, and storm-kafka looks fine and transfers data correctly. But as time goes by, those kinds of errors keep occurring, and then I saw different logs like...
{code}
2015-01-10 23:22:11,022 [myid:] - INFO [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@1007] - Closed socket connection for client /70.7.12.38:48504 which had sessionid 0x14ab82c142b0644
2015-01-10 23:22:11,023 [myid:] - WARN [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@357] - caught end of stream exception
EndOfStreamException: Unable to read additional data from client sessionid 0x14ab82c142b001d, likely client has closed socket
 at org.apache.zookeeper.server.NIOServerCnxn.doIO(NIOServerCnxn.java:228)
 at org.apache.zookeeper.server.NIOServerCnxnFactory.run(NIOServerCnxnFactory.java:208)
 at java.lang.Thread.run(Thread.java:619)
2015-01-10 23:22:11,023 [myid:] - INFO [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@1007] - Closed socket connection for client /70.7.12.38:55885 which had sessionid 0x14ab82c142b001d
2015-01-10 23:22:11,023 [myid:] - WARN [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@357] - caught end of stream exception
EndOfStreamException: Unable to read additional data from client sessionid 0x14ab82c142b063e, likely client has closed socket
 at org.apache.zookeeper.server.NIOServerCnxn.doIO(NIOServerCnxn.java:228)
 at org.apache.zookeeper.server.NIOServerCnxnFactory.run(NIOServerCnxnFactory.java:208)
 at java.lang.Thread.run(Thread.java:619)
2015-01-10 23:22:11,026 [myid:] - INFO [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@1007] - Closed socket connection for client /70.7.12.38:48444 which had sessionid 0x14ab82c142b063e
2015-01-10 23:22:11,026 [myid:] - WARN [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@357] - caught end of stream exception
EndOfStreamException: Unable to read additional data from client sessionid 0x14ab82c142b0639, likely client has closed socket
 at org.apache.zookeeper.server.NIOServerCnxn.doIO(NIOServerCnxn.java:228)
 at org.apache.zookeeper.server.NIOServerCnxnFactory.run(NIOServerCnxnFactory.java:208)
 at java.lang.Thread.run(Thread.java:619)
2015-01-10 23:22:11,027 [myid:] - INFO [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@1007] - Closed socket connection for client /70.7.12.38:56724 which had sessionid 0x14ab82c142b0658
2015-01-10 23:22:11,431 [myid:] - ERROR [SyncThread:0:NIOServerCnxn@178] - Unexpected Exception:
java.nio.channels.CancelledKeyException
 at sun.nio.ch.SelectionKeyImpl.ensureValid(SelectionKeyImpl.java:55)
 at sun.nio.ch.SelectionKeyImpl.interestOps(SelectionKeyImpl.java:59)
 at org.apache.zookeeper.server.NIOServerCnxn.sendBuffer(NIOServerCnxn.java:151)
 at org.apache.zookeeper.server.NIOServerCnxn.sendResponse(NIOServerCnxn.java:1081)
 at org.apache.zookeeper.server.FinalRequestProcessor.processRequest(FinalRequestProcessor.java:170) at