[jira] [Updated] (KAFKA-1890) Fix bug preventing Mirror Maker from successful rebalance.
[ https://issues.apache.org/jira/browse/KAFKA-1890?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Neha Narkhede updated KAFKA-1890: - Resolution: Fixed Status: Resolved (was: Patch Available) Thanks for the patch, Becket. Pushed to trunk Fix bug preventing Mirror Maker from successful rebalance. -- Key: KAFKA-1890 URL: https://issues.apache.org/jira/browse/KAFKA-1890 Project: Kafka Issue Type: Bug Reporter: Jiangjie Qin Assignee: Jiangjie Qin Priority: Blocker Fix For: 0.8.3 Attachments: KAFKA-1890.patch Follow-up patch for KAFKA-1650 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1729) add doc for Kafka-based offset management in 0.8.2
[ https://issues.apache.org/jira/browse/KAFKA-1729?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14289595#comment-14289595 ] Jun Rao commented on KAFKA-1729: Do you mean that you need to make code changes in the 0.8.2 branch or just the doc change? add doc for Kafka-based offset management in 0.8.2 -- Key: KAFKA-1729 URL: https://issues.apache.org/jira/browse/KAFKA-1729 Project: Kafka Issue Type: Sub-task Reporter: Jun Rao Assignee: Joel Koshy Fix For: 0.8.2 Attachments: KAFKA-1782-doc-v1.patch, KAFKA-1782-doc-v2.patch -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: Review Request 29831: Patch for KAFKA-1476
On Jan. 23, 2015, 4:47 p.m., Neha Narkhede wrote:

core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala, line 259
https://reviews.apache.org/r/29831/diff/6/?file=829769#file829769line259
= If set along with --delete
I'm not sure I fully understood the purpose of force-delete. Basically, the only safe time to delete a consumer group's offset information is when there are no live consumers in that group anymore. If so, --force-delete would mean deleting even when that is not true. This is pretty disruptive and I can't think of any case where this action will be useful. Thoughts?

There are currently four types of delete:

bin/kafka-consumer-groups.sh --zookeeper 192.168.50.11:2181 --delete --group g1 --group g5
bin/kafka-consumer-groups.sh --zookeeper 192.168.50.11:2181 --delete --group g3 --group g4 --topic t2
bin/kafka-consumer-groups.sh --zookeeper 192.168.50.11:2181 --delete --topic t1
bin/kafka-consumer-groups.sh --zookeeper 192.168.50.11:2181 --delete --topic t3 --force-delete

--force-delete only applies to topic-wide delete. My concern was that a topic-wide delete can potentially impact many consumer groups, so by default a topic-wide delete first checks whether the topic still exists. My reasoning was that if a topic still exists during a topic-wide offset delete, the delete probably wasn't intentional. --force-delete overrides that default behavior by skipping the topic-existence check.

One complication of the topic-existence check is the following scenario:
1. We delete topic t.
2. t gets recreated because some producers are still producing events to t.
3. We try to do a topic-wide offset delete on t.
4. The check prevents the offset delete from happening.

--force-delete attempts to address that scenario.

I agree that it is not safe to delete offsets while the group is active, and none of the four deletes currently check for this. While it is documented, it makes more sense to push this check into the code.

- Onur

--- This is an automatically generated e-mail. 
To reply, visit: https://reviews.apache.org/r/29831/#review69400 --- On Jan. 22, 2015, 10:32 a.m., Onur Karaman wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/29831/ --- (Updated Jan. 22, 2015, 10:32 a.m.) Review request for kafka. Bugs: KAFKA-1476 https://issues.apache.org/jira/browse/KAFKA-1476 Repository: kafka Description --- Merged in work for KAFKA-1476 and sub-task KAFKA-1826 Diffs - bin/kafka-consumer-groups.sh PRE-CREATION core/src/main/scala/kafka/admin/AdminUtils.scala 28b12c7b89a56c113b665fbde1b95f873f8624a3 core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala PRE-CREATION core/src/main/scala/kafka/utils/ZkUtils.scala c14bd455b6642f5e6eb254670bef9f57ae41d6cb core/src/test/scala/unit/kafka/admin/DeleteConsumerGroupTest.scala PRE-CREATION core/src/test/scala/unit/kafka/utils/TestUtils.scala ac15d34425795d5be20c51b01fa1108bdcd66583 Diff: https://reviews.apache.org/r/29831/diff/ Testing --- Thanks, Onur Karaman
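The default-check-plus-override behavior discussed in this review can be sketched as follows. This is a minimal illustration only, not the actual ConsumerGroupCommand code; `attemptDelete` and the boolean flag standing in for the ZooKeeper topic lookup are hypothetical.

```java
// Minimal sketch (not the actual ConsumerGroupCommand logic) of the
// topic-wide delete guard: refuse the offset delete while the topic still
// exists, unless --force-delete overrides the check. The topicExists flag
// stands in for a hypothetical ZooKeeper lookup.
public class TopicWideDelete {
    static String attemptDelete(String topic, boolean topicExists, boolean forceDelete) {
        if (topicExists && !forceDelete)
            return "refused: topic " + topic + " still exists (pass --force-delete to override)";
        return "deleted offsets for topic " + topic;
    }

    public static void main(String[] args) {
        // Scenario from the review: t was deleted but recreated by a producer,
        // so the plain topic-wide delete is refused while --force-delete proceeds.
        System.out.println(attemptDelete("t1", true, false));
        System.out.println(attemptDelete("t3", true, true));
    }
}
```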
Build failed in Jenkins: Kafka-trunk #378
See https://builds.apache.org/job/Kafka-trunk/378/changes

Changes:

[neha.narkhede] KAFKA-1890 Fix bug preventing Mirror Maker from successful rebalance; reviewed by Gwen Shapira and Neha Narkhede

--
[...truncated 502 lines...]

[Per-test output elided: every listed test in kafka.log.LogTest, LogSegmentTest, LogConfigTest, CleanerTest, FileMessageSetTest, OffsetMapTest, OffsetIndexTest, BrokerCompressionTest, and LogManagerTest reported PASSED.]
[jira] [Created] (KAFKA-1894) Avoid long or infinite blocking in the consumer
Jay Kreps created KAFKA-1894:

Summary: Avoid long or infinite blocking in the consumer
Key: KAFKA-1894
URL: https://issues.apache.org/jira/browse/KAFKA-1894
Project: Kafka
Issue Type: Sub-task
Reporter: Jay Kreps

The new consumer has a lot of loops that look something like

{code}
while (!isThingComplete())
    client.poll();
{code}

This occurs both in KafkaConsumer and in NetworkClient.completeAll. These retry loops are mostly the behavior we want, but there are several cases where they may cause problems:
- In the case of a hard failure we may hang for a long time or indefinitely before realizing the connection is lost.
- In the case where the cluster is malfunctioning or down we may retry forever.

It would probably be better to give these a timeout. The proposed approach would be to add something like retry.time.ms=6 and only continue retrying for that period of time.

-- This message was sent by Atlassian JIRA (v6.3.4#6332)
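The deadline-bounded loop the issue proposes can be sketched as below. The `Condition` interface and `pollUntil` helper are hypothetical names for illustration, not the actual KafkaConsumer/NetworkClient code.

```java
// Sketch only: Condition and pollUntil are hypothetical names, not the real
// KafkaConsumer or NetworkClient code. The idea is to replace
// while(!isThingComplete()) client.poll(); with a deadline-bounded loop.
public class BoundedRetry {
    interface Condition { boolean isComplete(); }

    // Keep polling until the condition holds or the deadline passes.
    static boolean pollUntil(Condition cond, Runnable poll, long timeoutMs) {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (!cond.isComplete()) {
            if (System.currentTimeMillis() >= deadline)
                return false;            // give up instead of spinning forever
            poll.run();
        }
        return true;
    }

    public static void main(String[] args) {
        final int[] polls = {0};
        // Completes after three polls, well inside the deadline.
        boolean done = pollUntil(() -> polls[0] >= 3, () -> polls[0]++, 1000);
        System.out.println(done ? "completed" : "timed out");
    }
}
```

With a retry.time.ms-style setting, a dead cluster turns an infinite hang into a bounded wait followed by a clear failure.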
Re: Review Request 30199: Patch for KAFKA-1890
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/30199/#review69405 --- Ship it! Ship It! - Neha Narkhede On Jan. 23, 2015, 3:57 a.m., Jiangjie Qin wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/30199/ --- (Updated Jan. 23, 2015, 3:57 a.m.) Review request for kafka. Bugs: KAFKA-1890 https://issues.apache.org/jira/browse/KAFKA-1890 Repository: kafka Description --- Patch for KAFKA-1890 Mirror maker hit NPE at startup. Diffs - core/src/main/scala/kafka/tools/MirrorMaker.scala 5cbc8103e33a0a234d158c048e5314e841da6249 Diff: https://reviews.apache.org/r/30199/diff/ Testing --- Thanks, Jiangjie Qin
[jira] [Created] (KAFKA-1892) System tests for the new consumer and co-ordinator
Jay Kreps created KAFKA-1892: Summary: System tests for the new consumer and co-ordinator Key: KAFKA-1892 URL: https://issues.apache.org/jira/browse/KAFKA-1892 Project: Kafka Issue Type: Sub-task Reporter: Jay Kreps We need to get system test coverage for the new consumer implementation and the co-ordinator. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: Review Request 29831: Patch for KAFKA-1476
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/29831/#review69400 ---

core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
https://reviews.apache.org/r/29831/#comment114100
As I described in my other comment, the only safe time to delete consumer group information is when there are no active consumers in the group. If there are, then we should error out of --delete and give a WARNING to the user. The question is whether --force-delete makes sense, since it would mean deleting offsets and group information for active consumers, which is very disruptive.

core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
https://reviews.apache.org/r/29831/#comment114099
How will this tool behave if the consumer's offset information is stored in Kafka, not ZooKeeper? The user would expect that case to be handled transparently as well.

core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
https://reviews.apache.org/r/29831/#comment114098
change to --delete

core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
https://reviews.apache.org/r/29831/#comment114097
= If set along with --delete
I'm not sure I fully understood the purpose of force-delete. Basically, the only safe time to delete a consumer group's offset information is when there are no live consumers in that group anymore. If so, --force-delete would mean deleting even when that is not true. This is pretty disruptive and I can't think of any case where this action will be useful. Thoughts?

- Neha Narkhede

On Jan. 22, 2015, 10:32 a.m., Onur Karaman wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/29831/ --- (Updated Jan. 22, 2015, 10:32 a.m.) Review request for kafka. 
Bugs: KAFKA-1476 https://issues.apache.org/jira/browse/KAFKA-1476 Repository: kafka Description --- Merged in work for KAFKA-1476 and sub-task KAFKA-1826 Diffs - bin/kafka-consumer-groups.sh PRE-CREATION core/src/main/scala/kafka/admin/AdminUtils.scala 28b12c7b89a56c113b665fbde1b95f873f8624a3 core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala PRE-CREATION core/src/main/scala/kafka/utils/ZkUtils.scala c14bd455b6642f5e6eb254670bef9f57ae41d6cb core/src/test/scala/unit/kafka/admin/DeleteConsumerGroupTest.scala PRE-CREATION core/src/test/scala/unit/kafka/utils/TestUtils.scala ac15d34425795d5be20c51b01fa1108bdcd66583 Diff: https://reviews.apache.org/r/29831/diff/ Testing --- Thanks, Onur Karaman
[jira] [Resolved] (KAFKA-1331) Add ability to commit offsets to the new consumer
[ https://issues.apache.org/jira/browse/KAFKA-1331?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jay Kreps resolved KAFKA-1331. -- Resolution: Duplicate Done as part of KAFKA-1760 Add ability to commit offsets to the new consumer - Key: KAFKA-1331 URL: https://issues.apache.org/jira/browse/KAFKA-1331 Project: Kafka Issue Type: Sub-task Components: consumer Affects Versions: 0.9.0 Reporter: Neha Narkhede This will include adding functionality to the commit()/commitAsync()/committed() APIs. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (KAFKA-1893) Allow regex subscriptions in the new consumer
Jay Kreps created KAFKA-1893: Summary: Allow regex subscriptions in the new consumer Key: KAFKA-1893 URL: https://issues.apache.org/jira/browse/KAFKA-1893 Project: Kafka Issue Type: Sub-task Reporter: Jay Kreps The consumer needs to handle subscribing to regular expressions. Presumably this would be done as a new api, {code} void subscribe(java.util.regex.Pattern pattern); {code} Some questions/thoughts to work out: - It should not be possible to mix pattern subscription with partition subscription. - Is it allowable to mix this with normal topic subscriptions? Logically this is okay but a bit complex to implement. - We need to ensure we regularly update the metadata and recheck our regexes against the metadata to update subscriptions for new topics that are created or old topics that are deleted. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
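The metadata-recheck step described in the last bullet can be sketched as below; `matchTopics` is a hypothetical helper for illustration, not part of the proposed consumer API.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Set;
import java.util.TreeSet;
import java.util.regex.Pattern;

// Sketch of the recheck a subscribe(Pattern) API would need: on every
// metadata refresh, recompute which topics the pattern covers, so that new
// topics are picked up and deleted ones dropped. matchTopics is a
// hypothetical helper, not part of the consumer API.
public class PatternSubscription {
    static Set<String> matchTopics(Pattern pattern, Collection<String> clusterTopics) {
        Set<String> matched = new TreeSet<>();
        for (String topic : clusterTopics)
            if (pattern.matcher(topic).matches())
                matched.add(topic);   // newly created matching topics show up here
        return matched;
    }

    public static void main(String[] args) {
        Pattern p = Pattern.compile("metrics-.*");
        System.out.println(matchTopics(p, Arrays.asList("metrics-cpu", "logs", "metrics-disk")));
    }
}
```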
[jira] [Resolved] (KAFKA-1330) Implement subscribe(TopicPartition...partitions) and pool(timeout) in the consumer
[ https://issues.apache.org/jira/browse/KAFKA-1330?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jay Kreps resolved KAFKA-1330. -- Resolution: Duplicate Done as part of KAFKA-1760 Implement subscribe(TopicPartition...partitions) and pool(timeout) in the consumer -- Key: KAFKA-1330 URL: https://issues.apache.org/jira/browse/KAFKA-1330 Project: Kafka Issue Type: Sub-task Components: consumer Affects Versions: 0.9.0 Reporter: Neha Narkhede Assignee: Jay Kreps Attachments: KAFKA-1330.patch, KAFKA-1330_2014-07-21_16:44:23.patch This involves adding basic fetch functionality (equivalent to SimpleConsumer) to the new consumer. Effectively: 1) Implement subscribe(TopicPartition...partitions) and unsubscribe(TopicPartition...partitions). 2) Implement pool(timeout). 3) Add memory management to the consumer. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-1329) Add metadata fetch and refresh functionality to the consumer
[ https://issues.apache.org/jira/browse/KAFKA-1329?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jay Kreps updated KAFKA-1329: - Resolution: Fixed Status: Resolved (was: Patch Available) I incorporated this into KAFKA-1760. Add metadata fetch and refresh functionality to the consumer Key: KAFKA-1329 URL: https://issues.apache.org/jira/browse/KAFKA-1329 Project: Kafka Issue Type: Sub-task Components: consumer Affects Versions: 0.9.0 Reporter: Neha Narkhede Assignee: Neha Narkhede Attachments: KAFKA-1329.patch Add metadata fetch and refresh functionality to the consumer. This is dependent on https://issues.apache.org/jira/browse/KAFKA-1316 as we first need to refactor the Sender to be able to use a common set of APIs to update metadata -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1332) Add functionality to the offsetsBeforeTime() API
[ https://issues.apache.org/jira/browse/KAFKA-1332?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14289655#comment-14289655 ] Jay Kreps commented on KAFKA-1332: -- It will be good to rethink this API, but for now I think we can just expose seekToEnd and seekToBeginning in the consumer which are useful helpers and cover 99% of what you would want. So this issue shouldn't actually block releasing the consumer. Add functionality to the offsetsBeforeTime() API Key: KAFKA-1332 URL: https://issues.apache.org/jira/browse/KAFKA-1332 Project: Kafka Issue Type: Sub-task Components: consumer Affects Versions: 0.9.0 Reporter: Neha Narkhede Add functionality to the offsetsBeforeTime() API to load offsets corresponding to a particular timestamp, including earliest and latest offsets -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: Review Request 27799: New consumer
On Jan. 22, 2015, 7:10 p.m., Guozhang Wang wrote:

core/src/test/scala/integration/kafka/api/ConsumerTest.scala, line 64
https://reviews.apache.org/r/27799/diff/5/?file=830064#file830064line64
Shall we add the @Test label just in case?

Jay Kreps wrote: No, I don't think so. I think the point you made in the other JIRA was that @Test only works for JUnit 4; JUnit 3 requires the test name prefix. So adding JUnit 4 annotations, which have no effect, to JUnit 3 tests would be super confusing, right?

The other JIRA is about the @expected label not being honored in JUnit 3. My point here is that although the test prefix works for the current version, it may be safest to add the @Test label, which will be honored by all versions.

On Jan. 22, 2015, 7:10 p.m., Guozhang Wang wrote:

core/src/test/scala/integration/kafka/api/ConsumerTest.scala, lines 158-180
https://reviews.apache.org/r/27799/diff/5/?file=830064#file830064line158
I think a more comprehensive test would run the producer / consumer in background threads while the main thread just iterates over killing / restarting brokers; with this we are assured that at least enough iterations will be executed before all produced records get consumed.

Jay Kreps wrote: Hmm, not sure I agree. Those threaded tests are insane to read and debug. I really, really think we need to focus on the maintainability/debuggability of tests. I actually think this catches all the cases, right? You don't actually care whether the disconnect and execution happen in parallel or not. Also not sure that helps the number of iterations, which is really determined by the number of messages. That is parameterized already.

My past experience with the ProducerFailureHandling test cases is that some bugs may be hidden when we kill brokers synchronously within the iteration, but maybe there are no such cases in the consumer tests, as the consumer is single threaded and the connection will only be checked upon each poll().

On Jan. 
22, 2015, 7:10 p.m., Guozhang Wang wrote:

core/src/test/scala/integration/kafka/api/ConsumerTest.scala, lines 268-270
https://reviews.apache.org/r/27799/diff/5/?file=830064#file830064line268
If there are not enough records then this will block forever, so shall we add a timeout config and fail the test if the timeout is reached?

Jay Kreps wrote: Yeah, but I mean we have this all over, where the failure case is to hang, right? It isn't ideal, but the fix tends to be pretty convoluted, and the problem is always clear since the test that hangs is the failing test.

The problem is that when we do ./gradlew test, only the finished (passed or failed) tests print on stdout, so when it hangs we only know which test class is problematic but cannot nail it down to the specific test case unless we add -i. This process has been very cumbersome, for myself at least, and I think it is better to fail the test clearly instead of letting it hang for detection.

- Guozhang

--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/27799/#review68947 --- On Jan. 23, 2015, 4:22 a.m., Jay Kreps wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/27799/ --- (Updated Jan. 23, 2015, 4:22 a.m.) Review request for kafka. Bugs: KAFKA-1760 https://issues.apache.org/jira/browse/KAFKA-1760 Repository: kafka Description --- New consumer. 
Diffs - build.gradle 1cbab29ce83e20dae0561b51eed6fdb86d522f28 clients/src/main/java/org/apache/kafka/clients/ClientRequest.java d32c319d8ee4c46dad309ea54b136ea9798e2fd7 clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java 8aece7e81a804b177a6f2c12e2dc6c89c1613262 clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/ConnectionState.java ab7e3220f9b76b92ef981d695299656f041ad5ed clients/src/main/java/org/apache/kafka/clients/KafkaClient.java 397695568d3fd8e835d8f923a89b3b00c96d0ead clients/src/main/java/org/apache/kafka/clients/NetworkClient.java 6746275d0b2596cd6ff7ce464a3a8225ad75ef00 clients/src/main/java/org/apache/kafka/clients/RequestCompletionHandler.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/CommitType.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java c0c636b3e1ba213033db6d23655032c9bbd5e378 clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java 57c1807ccba9f264186f83e91f37c34b959c8060
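The fail-instead-of-hang approach argued for in this review can be sketched as a deadline-bounded consume helper. `Poller` and `consumeUpTo` are hypothetical names for illustration, not the actual ConsumerTest code.

```java
// Sketch of the suggestion above: bound the "consume N records" loop with a
// deadline and fail loudly, rather than letting a buggy test hang forever.
// Poller and consumeUpTo are hypothetical names, not the real test code.
public class ConsumeWithTimeout {
    interface Poller { int poll(); }   // returns the records fetched this round

    static int consumeUpTo(Poller poller, int expected, long timeoutMs) {
        long deadline = System.currentTimeMillis() + timeoutMs;
        int total = 0;
        while (total < expected) {
            if (System.currentTimeMillis() > deadline)
                throw new AssertionError(
                    "Timed out after consuming " + total + " of " + expected + " records");
            total += poller.poll();
        }
        return total;
    }

    public static void main(String[] args) {
        // A poller that returns 10 records per call reaches 25 within the deadline.
        System.out.println(consumeUpTo(() -> 10, 25, 1000));
    }
}
```

The AssertionError names the exact call that stalled, instead of leaving a silent hang for ./gradlew test to sit on.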
[jira] [Commented] (KAFKA-1655) Allow high performance SimpleConsumer use cases to still work with new Kafka 0.9 consumer APIs
[ https://issues.apache.org/jira/browse/KAFKA-1655?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14289744#comment-14289744 ] Jay Kreps commented on KAFKA-1655: --

I believe this is handled in the new consumer API. Can you take a look at the APIs and see what you think? Essentially you would do something like

{code}
consumer.subscribe(topic, partition)
consumer.seek(topic, partition, offset)
records = consumer.poll(timeout)
consumer.unsubscribe(topic, partition)
{code}

The subscribe/unsubscribe is just an in-memory modification.

Allow high performance SimpleConsumer use cases to still work with new Kafka 0.9 consumer APIs
--
Key: KAFKA-1655
URL: https://issues.apache.org/jira/browse/KAFKA-1655
Project: Kafka
Issue Type: New Feature
Components: consumer
Affects Versions: 0.9.0
Reporter: Valentin

Hi guys, currently Kafka allows consumers to choose either the low level or the high level API, depending on the specific requirements of the consumer implementation. However, I was told that the current low level API (SimpleConsumer) will be deprecated once the new Kafka 0.9 consumer APIs are available. In this case it would be good if we can ensure that the new API offers some way to get similar performance for use cases which perfectly fit the old SimpleConsumer API approach.

Example use case: a high throughput HTTP API wrapper for consumer requests which gets HTTP REST calls to retrieve data for a specific set of topic partitions and offsets. Here the SimpleConsumer is perfect because it allows connection pooling in the HTTP API web application, with one pool per existing Kafka broker, and the web application can handle the required metadata management to know which pool to fetch a connection from for each used topic partition. This means connections to Kafka brokers can remain open/pooled and connection/reconnection and metadata overhead is minimized. 
To achieve something similar with the new Kafka 0.9 consumer APIs, it would be good if it could: - provide a lowlevel call to connect to a specific broker and to read data from a topic+partition+offset OR - ensure that subscribe/unsubscribe calls are very cheap and can run without requiring any network traffic. If I subscribe to a topic partition for which the same broker is the leader as the last topic partition which was in use for this consumer API connection, then the consumer API implementation should recognize this and should not do any disconnects/reconnects and just reuse the existing connection to that kafka broker. Or put differently, it should be possible to do external metadata handling in the consumer API client and the client should be able to pool consumer API connections effectively by having one pool per Kafka broker. Greetings Valentin -- This message was sent by Atlassian JIRA (v6.3.4#6332)
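The per-broker pooling Valentin describes can be sketched as a generic pool keyed by broker. This is an illustration only, not an existing Kafka API; the connection type and the factory that stands in for opening a broker connection are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Sketch of the pooling pattern described above: one pool per broker so
// connections stay open and metadata handling lives in the client. Not an
// existing Kafka API; the factory stands in for whatever actually opens a
// connection to a broker.
public class PerBrokerPool<C> {
    private final Map<String, Deque<C>> pools = new HashMap<>();
    private final Function<String, C> factory;

    public PerBrokerPool(Function<String, C> factory) { this.factory = factory; }

    // Reuse an idle connection to this broker, or open a new one.
    public C acquire(String broker) {
        Deque<C> idle = pools.computeIfAbsent(broker, b -> new ArrayDeque<>());
        return idle.isEmpty() ? factory.apply(broker) : idle.pollFirst();
    }

    // Return the connection for later reuse instead of closing it.
    public void release(String broker, C conn) {
        pools.computeIfAbsent(broker, b -> new ArrayDeque<>()).addFirst(conn);
    }

    public static void main(String[] args) {
        PerBrokerPool<String> pool = new PerBrokerPool<>(b -> "connection-to-" + b);
        String c = pool.acquire("broker1:9092");
        pool.release("broker1:9092", c);
        // The second acquire reuses the pooled connection; no reconnect needed.
        System.out.println(pool.acquire("broker1:9092"));
    }
}
```

The point of the sketch is the second requirement in the message: if subscribe/unsubscribe is cheap and connections are pooled per broker, the client can do its own metadata handling without reconnect overhead.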
Re: [DISCUSS] KIP-4 - Command line and centralized administrative operations
A centralized admin operation protocol would be very useful. One more general comment here is that the controller was originally designed to talk to other brokers only through the ControllerChannel, while the broker instance which carries the current controller is agnostic of its existence and uses KafkaApis to handle general Kafka requests. Having all admin requests redirected to the controller instance will force the broker to be aware of its carried controller and to access its internal data for handling these requests. Plus, with the number of clients out of Kafka's control, this may easily cause the controller to become a hot spot in terms of request load.

On Thu, Jan 22, 2015 at 10:09 PM, Joe Stein joe.st...@stealth.ly wrote: inline

On Thu, Jan 22, 2015 at 11:59 PM, Jay Kreps jay.kr...@gmail.com wrote: Hey Joe, This is great. A few comments on KIP-4

1. This is much needed functionality, but there is a lot here, so let's really think these protocols through. We really want to end up with a set of well thought-out, orthogonal APIs. For this reason I think it is really important to think through the end state, even if that includes APIs we won't implement in the first phase. ok

2. Let's please please please wait until we have switched the server over to the new Java protocol definitions. If we add umpteen more ad hoc Scala objects, that is just generating more work for the conversion we know we have to do. ok :)

3. This proposal introduces a new type of optional parameter. This is inconsistent with everything else in the protocol, where we use -1 or some other marker value. You could argue either way, but let's stick with that for consistency. For clients that implemented the protocol in a better way than our Scala code, these basic primitives are hard to change. yes, less confusing, ok.

4. ClusterMetadata: This seems to duplicate TopicMetadataRequest, which has brokers, topics, and partitions. 
I think we should rename that request ClusterMetadataRequest (or just MetadataRequest) and include the id of the controller. Or are there other things we could add here? We could add broker version to it. 5. We have a tendency to try to make a lot of requests that can only go to particular nodes. This adds a lot of burden for client implementations (it sounds easy but each discovery can fail in many parts so it ends up being a full state machine to do right). I think we should consider making admin commands and ideally as many of the other apis as possible available on all brokers and just redirect to the controller on the broker side. Perhaps there would be a general way to encapsulate this re-routing behavior. If we do that then we should also preserve what we have and do both. The client can then decide do I want to go to any broker and proxy or just go to controller and run admin task. Lots of folks have seen controllers come under distress because of their producers/consumers. There is ticket too for controller elect and re-elect https://issues.apache.org/jira/browse/KAFKA-1778 so you can force it to a broker that has 0 load. 6. We should probably normalize the key value pairs used for configs rather than embedding a new formatting. So two strings rather than one with an internal equals sign. ok 7. Is the postcondition of these APIs that the command has begun or that the command has been completed? It is a lot more usable if the command has been completed so you know that if you create a topic and then publish to it you won't get an exception about there being no such topic. We should define that more. There needs to be some more state there, yes. We should try to cover https://issues.apache.org/jira/browse/KAFKA-1125 within what we come up with. 8. Describe topic and list topics duplicate a lot of stuff in the metadata request. Is there a reason to give back topics marked for deletion? 
I feel like if we just make the post-condition of the delete command be that the topic is deleted that will get rid of the need for this right? And it will be much more intuitive. I will go back and look through it. 9. Should we consider batching these requests? We have generally tried to allow multiple operations to be batched. My suspicion is that without this we will get a lot of code that does something like for(topic: adminClient.listTopics()) adminClient.describeTopic(topic) this code will work great when you test on 5 topics but not do as well if you have 50k. So = Input is a list of topics (or none for all) and a batch response from the controller (which could be routed through another broker) of the entire response? We could introduce a Batch keyword to explicitly show the usage of it. 10. I think we should also discuss how we want to expose a programmatic JVM client api for these operations. Currently people rely on
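The N+1 concern in point 9 can be sketched as follows. The method names here are hypothetical stand-ins that only count round trips; the actual protocol shape is what KIP-4 is deciding.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Sketch of the batching concern in point 9. These methods are hypothetical
// stand-ins that only count round trips. The per-topic loop costs one
// request per topic; the batched call costs one request total.
public class BatchedAdmin {
    static int roundTrips = 0;

    static String describeTopic(String topic) {          // one request per topic
        roundTrips++;
        return topic + ": partitions=1";                 // stand-in metadata
    }

    static Map<String, String> describeTopics(List<String> topics) {  // one request total
        roundTrips++;
        Map<String, String> out = new LinkedHashMap<>();
        for (String t : topics) out.put(t, "partitions=1");
        return out;
    }

    public static void main(String[] args) {
        List<String> topics = Arrays.asList("t1", "t2", "t3");
        for (String t : topics) describeTopic(t);        // naive loop: 3 round trips
        int naive = roundTrips;
        roundTrips = 0;
        describeTopics(topics);                          // batched: 1 round trip
        System.out.println(naive + " vs " + roundTrips);
    }
}
```

With 50k topics the difference is 50k requests versus one, which is exactly the `for(topic: adminClient.listTopics()) adminClient.describeTopic(topic)` trap described above.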
[jira] [Created] (KAFKA-1895) Investigate moving deserialization and decompression out of KafkaConsumer
Jay Kreps created KAFKA-1895: Summary: Investigate moving deserialization and decompression out of KafkaConsumer Key: KAFKA-1895 URL: https://issues.apache.org/jira/browse/KAFKA-1895 Project: Kafka Issue Type: Sub-task Reporter: Jay Kreps The consumer implementation in KAFKA-1760 decompresses fetch responses and deserializes them into ConsumerRecords which are then handed back as the result of poll(). There are several downsides to this: 1. It is impossible to scale serialization and decompression work beyond the single thread running the KafkaConsumer. 2. The results can come back during the processing of other calls such as commit() etc which can result in caching these records a little longer. An alternative would be to have ConsumerRecords wrap the actual compressed serialized MemoryRecords chunks and do the deserialization during iteration. This way you could scale this over a thread pool if needed. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
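The alternative the issue proposes can be sketched as a records collection that keeps the raw bytes and deserializes during iteration. `LazyRecords` and the `byte[][]` standing in for MemoryRecords chunks are hypothetical illustrations, not the actual consumer types.

```java
import java.util.Iterator;
import java.util.function.Function;

// Sketch of the KAFKA-1895 alternative: hold the raw (possibly compressed)
// bytes and deserialize on demand while iterating, so the work runs on
// whichever thread iterates and can be spread over a thread pool.
// LazyRecords and the byte[][] chunks are hypothetical stand-ins.
public class LazyRecords<T> implements Iterable<T> {
    private final byte[][] raw;
    private final Function<byte[], T> deserializer;

    public LazyRecords(byte[][] raw, Function<byte[], T> deserializer) {
        this.raw = raw;
        this.deserializer = deserializer;
    }

    @Override
    public Iterator<T> iterator() {
        return new Iterator<T>() {
            private int i = 0;
            public boolean hasNext() { return i < raw.length; }
            public T next() { return deserializer.apply(raw[i++]); }  // deserialize here, not in poll()
        };
    }

    public static void main(String[] args) {
        LazyRecords<String> records = new LazyRecords<>(
            new byte[][] { "a".getBytes(), "b".getBytes() }, String::new);
        for (String r : records) System.out.println(r);
    }
}
```

Because poll() would only hand back the wrapper, commit() and other calls no longer pay for deserializing records they merely cache.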
Re: Review Request 27799: New consumer
On Jan. 23, 2015, 8:57 a.m., Ewen Cheslack-Postava wrote:

clients/src/main/java/org/apache/kafka/clients/NetworkClient.java, line 247
https://reviews.apache.org/r/27799/diff/6/?file=831485#file831485line247
I think these methods need to have timeouts on them. They get called via synchronized methods in KafkaConsumer, and KafkaConsumer.close() is also synchronized, so an attempt to shut down the consumer could be blocked indefinitely by a call to completeAll().

Yeah, I agree. I think this has the same issue the producer does, where it is vulnerable to silent failures blocking until the socket retransmit time or whatever it is hits. I think it would be worth turning this into a follow-up issue, though, as there are actually a number of similar problems in our infinite retry strategy. Likely the best thing to do is introduce a new config with a longish default setting and bound both these calls as well as all retry loops with that time in a comprehensive way. Filed KAFKA-1894.

On Jan. 23, 2015, 8:57 a.m., Ewen Cheslack-Postava wrote:

clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java, line 777
https://reviews.apache.org/r/27799/diff/6/?file=831493#file831493line777
Does this give the right metric names? Looks different than the one in KafkaProducer (which doesn't have a trailing dot).

Nice catch.

On Jan. 23, 2015, 8:57 a.m., Ewen Cheslack-Postava wrote:

clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java, line 1036
https://reviews.apache.org/r/27799/diff/6/?file=831493#file831493line1036
Won't this cause busy looping during network failures (and maybe due to other errors returned in the OffsetCommitResponse)?

I think I fixed that in one of the updates -- this uses the same backoff logic on retry now.

On Jan. 23, 2015, 8:57 a.m., Ewen Cheslack-Postava wrote:

clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java, line 1195
https://reviews.apache.org/r/27799/diff/6/?file=831493#file831493line1195
Timeout? 
Need to eventually allow things like close() requests to process. Agreed, I'm going to lump that in with the other infinite loop issues. On Jan. 23, 2015, 8:57 a.m., Ewen Cheslack-Postava wrote: clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java, line 1526 https://reviews.apache.org/r/27799/diff/6/?file=831493#file831493line1526 I'm confused about what's going on here -- why Integer.MIN_VALUE? And how does this end up working with the rest of the code, since this result node is used for consumerCoordinator and other code compares node IDs? Yeah, this is actually a bit of a hack and I think with a little effort could be removed. We need to maintain a separate connection for interaction with the co-ordinator so that important co-ordinator requests don't queue up behind slow fetch requests. But currently NetworkClient is a bit dumb and just maintains a single connection per node id. So giving a bogus node id just ensures another connection. I'll file another ticket on this. On Jan. 23, 2015, 8:57 a.m., Ewen Cheslack-Postava wrote: core/src/main/scala/kafka/tools/ConsumerPerformance.scala, line 21 https://reviews.apache.org/r/27799/diff/6/?file=831557#file831557line21 Lots of unused import cleanup here. Yeah, I use Eclipse, which doesn't show those for Scala :-( - Jay --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/27799/#review69355 --- On Jan. 23, 2015, 4:22 a.m., Jay Kreps wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/27799/ --- (Updated Jan. 23, 2015, 4:22 a.m.) Review request for kafka. Bugs: KAFKA-1760 https://issues.apache.org/jira/browse/KAFKA-1760 Repository: kafka Description --- New consumer. 
Diffs - build.gradle 1cbab29ce83e20dae0561b51eed6fdb86d522f28 clients/src/main/java/org/apache/kafka/clients/ClientRequest.java d32c319d8ee4c46dad309ea54b136ea9798e2fd7 clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java 8aece7e81a804b177a6f2c12e2dc6c89c1613262 clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/ConnectionState.java ab7e3220f9b76b92ef981d695299656f041ad5ed clients/src/main/java/org/apache/kafka/clients/KafkaClient.java 397695568d3fd8e835d8f923a89b3b00c96d0ead clients/src/main/java/org/apache/kafka/clients/NetworkClient.java 6746275d0b2596cd6ff7ce464a3a8225ad75ef00 clients/src/main/java/org/apache/kafka/clients/RequestCompletionHandler.java PRE-CREATION
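The bounded-wait idea discussed above (filed as KAFKA-1894) can be sketched as follows: instead of a completeAll()-style call blocking indefinitely, the loop polls against a deadline derived from a configured timeout. This is only an illustration of the approach -- the `InFlight` interface and `pollOnce` method are hypothetical stand-ins, not the actual NetworkClient API.

```java
import java.util.concurrent.TimeoutException;

public class BoundedWait {
    /** Polls until all in-flight requests complete or timeoutMs elapses. */
    public static void completeAll(InFlight requests, long timeoutMs) throws TimeoutException {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (!requests.done()) {
            long remaining = deadline - System.currentTimeMillis();
            if (remaining <= 0)
                throw new TimeoutException("requests still in flight after " + timeoutMs + " ms");
            requests.pollOnce(remaining); // process network I/O for at most `remaining` ms
        }
    }

    /** Hypothetical stand-in for the client's in-flight request tracking. */
    public interface InFlight {
        boolean done();
        void pollOnce(long maxWaitMs);
    }
}
```

With a single timeout config bounding both completeAll() and the retry loops, a silent network failure surfaces as a TimeoutException instead of hanging close() forever.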
[jira] [Comment Edited] (KAFKA-1655) Allow high performance SimpleConsumer use cases to still work with new Kafka 0.9 consumer APIs
[ https://issues.apache.org/jira/browse/KAFKA-1655?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14289744#comment-14289744 ] Jay Kreps edited comment on KAFKA-1655 at 1/23/15 7:03 PM: --- I believe this is handled in the new consumer API. Can you take a look at the APIs and see what you think? Essentially you would do something like {code} consumer.subscribe(topic, partition) consumer.seek(topic, partition, offset) records = consumer.poll(timeout) consumer.unsubscribe(topic, partition) {code} The subscribe/unsubscribe and seek are just in-memory modifications. was (Author: jkreps): I believe this is handled in the new consumer API. Can you take a look at the APIs and see what you think? Essentially you would do something like {code} consumer.subscribe(topic, partition) consumer.seek(topic, partition, offset) records = consumer.poll(timeout) consumer.unsubscribe(topic, partition) {code} The subscribe/unsubscribe is just an in-memory modification. Allow high performance SimpleConsumer use cases to still work with new Kafka 0.9 consumer APIs -- Key: KAFKA-1655 URL: https://issues.apache.org/jira/browse/KAFKA-1655 Project: Kafka Issue Type: New Feature Components: consumer Affects Versions: 0.9.0 Reporter: Valentin Hi guys, currently Kafka allows consumers to either choose the low-level or the high-level API, depending on the specific requirements of the consumer implementation. However, I was told that the current low-level API (SimpleConsumer) will be deprecated once the new Kafka 0.9 consumer APIs are available. In this case it would be good if we can ensure that the new API does offer some ways to get similar performance for use cases which perfectly fit the old SimpleConsumer API approach. Example Use Case: A high throughput HTTP API wrapper for consumer requests which gets HTTP REST calls to retrieve data for a specific set of topic partitions and offsets. 
Here the SimpleConsumer is perfect because it allows connection pooling in the HTTP API web application with one pool per existing Kafka broker, and the web application can handle the required metadata management to know which pool to fetch a connection from for each used topic partition. This means connections to Kafka brokers can remain open/pooled and connection/reconnection and metadata overhead is minimized. To achieve something similar with the new Kafka 0.9 consumer APIs, it would be good if it could: - provide a low-level call to connect to a specific broker and to read data from a topic+partition+offset OR - ensure that subscribe/unsubscribe calls are very cheap and can run without requiring any network traffic. If I subscribe to a topic partition for which the same broker is the leader as the last topic partition which was in use for this consumer API connection, then the consumer API implementation should recognize this and should not do any disconnects/reconnects and just reuse the existing connection to that Kafka broker. Or put differently, it should be possible to do external metadata handling in the consumer API client and the client should be able to pool consumer API connections effectively by having one pool per Kafka broker. Greetings Valentin -- This message was sent by Atlassian JIRA (v6.3.4#6332)
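The "one pool per Kafka broker" arrangement Valentin describes can be sketched as a small generic pool keyed by broker. This is a minimal illustration, not Kafka code: `C` is whatever connection type the application uses, the `connector` function is a hypothetical factory, and a real pool would also need liveness checks, eviction, and size limits.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

public class BrokerConnectionPool<C> {
    // Idle connections, partitioned by broker so a checkout never crosses brokers.
    private final Map<String, Deque<C>> idle = new HashMap<>();
    private final Function<String, C> connector; // hypothetical factory: broker -> new connection

    public BrokerConnectionPool(Function<String, C> connector) {
        this.connector = connector;
    }

    /** Reuse an idle connection to this broker, or open a new one. */
    public synchronized C checkout(String broker) {
        Deque<C> q = idle.get(broker);
        return (q == null || q.isEmpty()) ? connector.apply(broker) : q.pop();
    }

    /** Return a connection for later reuse against the same broker. */
    public synchronized void checkin(String broker, C conn) {
        idle.computeIfAbsent(broker, b -> new ArrayDeque<>()).push(conn);
    }
}
```

The web application does the leader lookup itself (external metadata handling) and then checks out a connection from the matching broker's pool, so no disconnect/reconnect happens when consecutive requests hit partitions led by the same broker.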
[jira] [Commented] (KAFKA-1760) Implement new consumer client
[ https://issues.apache.org/jira/browse/KAFKA-1760?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14289987#comment-14289987 ] Jay Kreps commented on KAFKA-1760: -- Updated reviewboard https://reviews.apache.org/r/27799/diff/ against branch trunk Implement new consumer client - Key: KAFKA-1760 URL: https://issues.apache.org/jira/browse/KAFKA-1760 Project: Kafka Issue Type: Sub-task Components: consumer Reporter: Jay Kreps Assignee: Jay Kreps Fix For: 0.8.3 Attachments: KAFKA-1760.patch, KAFKA-1760_2015-01-11_16:57:15.patch, KAFKA-1760_2015-01-18_19:10:13.patch, KAFKA-1760_2015-01-21_08:42:20.patch, KAFKA-1760_2015-01-22_10:03:26.patch, KAFKA-1760_2015-01-22_20:21:56.patch, KAFKA-1760_2015-01-23_13:13:00.patch Implement a consumer client. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: Review Request 27799: New consumer
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/27799/#review69453 --- core/src/test/scala/integration/kafka/api/ConsumerTest.scala https://reviews.apache.org/r/27799/#comment114158 I tried this and I don't think that is correct. @Test was introduced in JUnit 4. Adding @Test to methods in ConsumerTest, which extends stuff based on JUnit 3, and changing their name to something that doesn't begin with testXXX causes them not to run. So the annotations have no effect, but look like they should, which is what I think is confusing. Here is what I think we could agree on: let's get off JUnit 3 entirely; I think the mixture of the two is error-prone. core/src/test/scala/integration/kafka/api/ConsumerTest.scala https://reviews.apache.org/r/27799/#comment114165 Yeah, it might be good to consider doing a multithreaded test with multiple consumers and producers once we have the group management stuff. I guess the argument for why this would be different from the producer is that the consumer itself is single-threaded, so there is no background work happening. The real weakness I felt in these tests is that they only cover graceful shutdown. Actual machine death would, I think, expose a ton of issues. But we can probably do this in the system tests... core/src/test/scala/integration/kafka/api/ConsumerTest.scala https://reviews.apache.org/r/27799/#comment114169 Can't you just run it in the IDE and immediately debug and see what is happening? That also takes only a few secs, whereas the full test run is slow. Anyhow, I'll fix this case, but I think we have this issue in about 1000 places and fixing it is quite ugly (see the fixed code I'm about to post) and obscures what is going on. Also, fixing it with a timeout is far worse since then you have a timing assumption that will randomly fail. - Jay Kreps On Jan. 23, 2015, 4:22 a.m., Jay Kreps wrote: --- This is an automatically generated e-mail. 
To reply, visit: https://reviews.apache.org/r/27799/ --- (Updated Jan. 23, 2015, 4:22 a.m.) Review request for kafka. Bugs: KAFKA-1760 https://issues.apache.org/jira/browse/KAFKA-1760 Repository: kafka Description --- New consumer. Diffs - build.gradle 1cbab29ce83e20dae0561b51eed6fdb86d522f28 clients/src/main/java/org/apache/kafka/clients/ClientRequest.java d32c319d8ee4c46dad309ea54b136ea9798e2fd7 clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java 8aece7e81a804b177a6f2c12e2dc6c89c1613262 clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/ConnectionState.java ab7e3220f9b76b92ef981d695299656f041ad5ed clients/src/main/java/org/apache/kafka/clients/KafkaClient.java 397695568d3fd8e835d8f923a89b3b00c96d0ead clients/src/main/java/org/apache/kafka/clients/NetworkClient.java 6746275d0b2596cd6ff7ce464a3a8225ad75ef00 clients/src/main/java/org/apache/kafka/clients/RequestCompletionHandler.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/CommitType.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java c0c636b3e1ba213033db6d23655032c9bbd5e378 clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java 57c1807ccba9f264186f83e91f37c34b959c8060 clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceCallback.java e4cf7d1cfa01c2844b53213a7b539cdcbcbeaa3a clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecord.java 16af70a5de52cca786fdea147a6a639b7dc4a311 clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java bdf4b26942d5a8c8a9503e05908e9a9eff6228a7 clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 76efc216c9e6c3ab084461d792877092a189ad0f clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java fa88ac1a8b19b4294f211c4467fe68c7707ddbae 
clients/src/main/java/org/apache/kafka/clients/consumer/NoOffsetForPartitionException.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/OffsetMetadata.java ea423ad15eebd262d20d5ec05d592cc115229177 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/internals/NoOpConsumerRebalanceCallback.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
Re: Review Request 27799: New consumer
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/27799/ --- (Updated Jan. 23, 2015, 9:15 p.m.) Review request for kafka. Bugs: KAFKA-1760 https://issues.apache.org/jira/browse/KAFKA-1760 Repository: kafka Description (updated) --- New consumer. Addressed comments from Ewen and Guozhang. Rebased. Diffs - build.gradle 1cbab29ce83e20dae0561b51eed6fdb86d522f28 clients/src/main/java/org/apache/kafka/clients/ClientRequest.java d32c319d8ee4c46dad309ea54b136ea9798e2fd7 clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java 8aece7e81a804b177a6f2c12e2dc6c89c1613262 clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/ConnectionState.java ab7e3220f9b76b92ef981d695299656f041ad5ed clients/src/main/java/org/apache/kafka/clients/KafkaClient.java 397695568d3fd8e835d8f923a89b3b00c96d0ead clients/src/main/java/org/apache/kafka/clients/NetworkClient.java 6746275d0b2596cd6ff7ce464a3a8225ad75ef00 clients/src/main/java/org/apache/kafka/clients/NodeConnectionState.java 752a979ea0b8bde7ff6d2e1a23bf54052305d841 clients/src/main/java/org/apache/kafka/clients/consumer/CommitType.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java c0c636b3e1ba213033db6d23655032c9bbd5e378 clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java 57c1807ccba9f264186f83e91f37c34b959c8060 clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceCallback.java e4cf7d1cfa01c2844b53213a7b539cdcbcbeaa3a clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecord.java 16af70a5de52cca786fdea147a6a639b7dc4a311 clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java bdf4b26942d5a8c8a9503e05908e9a9eff6228a7 clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 76efc216c9e6c3ab084461d792877092a189ad0f 
clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java fa88ac1a8b19b4294f211c4467fe68c7707ddbae clients/src/main/java/org/apache/kafka/clients/consumer/NoOffsetForPartitionException.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/OffsetMetadata.java ea423ad15eebd262d20d5ec05d592cc115229177 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/internals/NoOpConsumerRebalanceCallback.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java fc71710dd5997576d3841a1c3b0f7e19a8c9698e clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java 904976fadf0610982958628eaee810b60a98d725 clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java 8b3e565edd1ae04d8d34bd9f1a41e9fa8c880a75 clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java dcf46581b912cfb1b5c8d4cbc293d2d1444b7740 clients/src/main/java/org/apache/kafka/clients/producer/internals/Partitioner.java 483899d2e69b33655d0e08949f5f64af2519660a clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java ccc03d8447ebba40131a70e16969686ac4aab58a clients/src/main/java/org/apache/kafka/common/Cluster.java d3299b944062d96852452de455902659ad8af757 clients/src/main/java/org/apache/kafka/common/PartitionInfo.java b15aa2c3ef2d7c4b24618ff42fd4da324237a813 clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java 98cb79b701918eca3f6ca9823b6c7b7c97b3ecec clients/src/main/java/org/apache/kafka/common/errors/ApiException.java 7c948b166a8ac07616809f260754116ae7764973 clients/src/main/java/org/apache/kafka/common/network/Selectable.java b68bbf00ab8eba6c5867d346c91188142593ca6e clients/src/main/java/org/apache/kafka/common/network/Selector.java 
74d695ba39de44b6a3d15340ec0114bc4fce2ba2 clients/src/main/java/org/apache/kafka/common/protocol/Errors.java 3316b6a1098311b8603a4a5893bf57b75d2e43cb clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java 121e880a941fcd3e6392859edba11a94236494cc clients/src/main/java/org/apache/kafka/common/record/LogEntry.java e4d688cbe0c61b74ea15fc8dd3b634f9e5ee9b83 clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java 040e5b91005edb8f015afdfa76fd94e0bf3cb4ca clients/src/main/java/org/apache/kafka/common/requests/ConsumerMetadataRequest.java 99b52c23d639df010bf2affc0f79d1c6e16ed67c clients/src/main/java/org/apache/kafka/common/requests/ConsumerMetadataResponse.java 8b8f591c4b2802a9cbbe34746c0b3ca4a64a8681
[jira] [Commented] (KAFKA-1729) add doc for Kafka-based offset management in 0.8.2
[ https://issues.apache.org/jira/browse/KAFKA-1729?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14290011#comment-14290011 ] Joel Koshy commented on KAFKA-1729: --- Ideally yes, because it makes Java access unwieldy, but it's not a blocker because I think these are pre-existing even in 0.8.1 and there are workarounds. Here is a brief summary: * https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/javaapi/OffsetCommitResponse.scala The OffsetCommitResponse returns a map of [TopicAndPartition, (scala)Short]. JavaConversions apparently does not convert the individual values in the map to a Java short. E.g., if you change the signature of errors() in OffsetCommitResponse to return Map[TopicAndPartition, java.lang.Short] it won't compile. The correct fix I think is to do something similar to javaapi.FetchResponse - where we provide an explicit lookup method errorCode(topicAndPartition) and a hasError boolean method. Anyway, the workaround is to just fetch the object from the map and cast it to short. So (for example): (java.lang.Short) errors.get(topicAndPartition) * A lot of the requests don't provide defaults for fields such as clientId, correlationId and even version. Well, there are defaults, but the defaults are inaccessible from Java. We should ideally provide alternate constructors. * It would be useful to add readFrom wrapper methods in the javaapi versions. Otherwise if you want to read from a blocking channel you would need to do something like this: new kafka.javaapi.ConsumerMetadataResponse(kafka.api.ConsumerMetadataResponse.readFrom(channel.receive().buffer())); add doc for Kafka-based offset management in 0.8.2 -- Key: KAFKA-1729 URL: https://issues.apache.org/jira/browse/KAFKA-1729 Project: Kafka Issue Type: Sub-task Reporter: Jun Rao Assignee: Joel Koshy Fix For: 0.8.2 Attachments: KAFKA-1782-doc-v1.patch, KAFKA-1782-doc-v2.patch -- This message was sent by Atlassian JIRA (v6.3.4#6332)
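The cast workaround Joel describes can be shown in isolation: from Java, a Scala Map[_, Short] surfaces its values only as Object, so the caller fetches the boxed value and casts it to java.lang.Short. In this sketch a plain HashMap<String, Object> stands in for the converted Scala map, and a String key stands in for TopicAndPartition; neither is the actual Kafka type.

```java
import java.util.Map;

public class OffsetErrors {
    // The workaround: fetch as Object, cast to java.lang.Short, unbox to short.
    public static short errorCode(Map<String, Object> errors, String topicAndPartition) {
        return (java.lang.Short) errors.get(topicAndPartition);
    }
}
```

The cleaner fix proposed in the comment would hide this cast behind an explicit errorCode(topicAndPartition) lookup method on the javaapi response, as javaapi.FetchResponse already does.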
Re: [VOTE] 0.8.2.0 Candidate 2 (with the correct links)
This is a reminder that the deadline for the vote is this Monday, Jan 26, 7pm PT. Thanks, Jun On Wed, Jan 21, 2015 at 8:28 AM, Jun Rao j...@confluent.io wrote: This is the second candidate for release of Apache Kafka 0.8.2.0. There have been some changes since the 0.8.2 beta release, especially in the new Java producer API and JMX MBean names. It would be great if people can test this out thoroughly. Release Notes for the 0.8.2.0 release https://people.apache.org/~junrao/kafka-0.8.2.0-candidate2/RELEASE_NOTES.html *** Please download, test and vote by Monday, Jan 26th, 7pm PT Kafka's KEYS file containing PGP keys we use to sign the release: http://kafka.apache.org/KEYS in addition to the md5, sha1 and sha2 (SHA256) checksums. * Release artifacts to be voted upon (source and binary): https://people.apache.org/~junrao/kafka-0.8.2.0-candidate2/ * Maven artifacts to be voted upon prior to release: https://repository.apache.org/content/groups/staging/ * scala-doc https://people.apache.org/~junrao/kafka-0.8.2.0-candidate2/scaladoc/ * java-doc https://people.apache.org/~junrao/kafka-0.8.2.0-candidate2/javadoc/ * The tag to be voted upon (off the 0.8.2 branch) is the 0.8.2.0 tag https://git-wip-us.apache.org/repos/asf?p=kafka.git;a=tag;h=058d58adef2ab2787e49d8efeefd61bb3d32f99c (commit 0b312a6b9f0833d38eec434bfff4c647c1814564) Thanks, Jun
[jira] [Commented] (KAFKA-1729) add doc for Kafka-based offset management in 0.8.2
[ https://issues.apache.org/jira/browse/KAFKA-1729?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14290013#comment-14290013 ] Joel Koshy commented on KAFKA-1729: --- So perhaps it is best to leave the code as is since we anyway want to move completely over to the protocol definitions in the clients package. add doc for Kafka-based offset management in 0.8.2 -- Key: KAFKA-1729 URL: https://issues.apache.org/jira/browse/KAFKA-1729 Project: Kafka Issue Type: Sub-task Reporter: Jun Rao Assignee: Joel Koshy Fix For: 0.8.2 Attachments: KAFKA-1782-doc-v1.patch, KAFKA-1782-doc-v2.patch -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: Review Request 27799: New consumer
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/27799/ --- (Updated Jan. 23, 2015, 9:15 p.m.) Review request for kafka. Changes --- Addressed comments from Ewen and Guozhang. Rebased. Bugs: KAFKA-1760 https://issues.apache.org/jira/browse/KAFKA-1760 Repository: kafka Description (updated) --- New consumer. Diffs - build.gradle 1cbab29ce83e20dae0561b51eed6fdb86d522f28 clients/src/main/java/org/apache/kafka/clients/ClientRequest.java d32c319d8ee4c46dad309ea54b136ea9798e2fd7 clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java 8aece7e81a804b177a6f2c12e2dc6c89c1613262 clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/ConnectionState.java ab7e3220f9b76b92ef981d695299656f041ad5ed clients/src/main/java/org/apache/kafka/clients/KafkaClient.java 397695568d3fd8e835d8f923a89b3b00c96d0ead clients/src/main/java/org/apache/kafka/clients/NetworkClient.java 6746275d0b2596cd6ff7ce464a3a8225ad75ef00 clients/src/main/java/org/apache/kafka/clients/NodeConnectionState.java 752a979ea0b8bde7ff6d2e1a23bf54052305d841 clients/src/main/java/org/apache/kafka/clients/consumer/CommitType.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java c0c636b3e1ba213033db6d23655032c9bbd5e378 clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java 57c1807ccba9f264186f83e91f37c34b959c8060 clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceCallback.java e4cf7d1cfa01c2844b53213a7b539cdcbcbeaa3a clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecord.java 16af70a5de52cca786fdea147a6a639b7dc4a311 clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java bdf4b26942d5a8c8a9503e05908e9a9eff6228a7 clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 76efc216c9e6c3ab084461d792877092a189ad0f 
clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java fa88ac1a8b19b4294f211c4467fe68c7707ddbae clients/src/main/java/org/apache/kafka/clients/consumer/NoOffsetForPartitionException.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/OffsetMetadata.java ea423ad15eebd262d20d5ec05d592cc115229177 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/internals/NoOpConsumerRebalanceCallback.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java fc71710dd5997576d3841a1c3b0f7e19a8c9698e clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java 904976fadf0610982958628eaee810b60a98d725 clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java 8b3e565edd1ae04d8d34bd9f1a41e9fa8c880a75 clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java dcf46581b912cfb1b5c8d4cbc293d2d1444b7740 clients/src/main/java/org/apache/kafka/clients/producer/internals/Partitioner.java 483899d2e69b33655d0e08949f5f64af2519660a clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java ccc03d8447ebba40131a70e16969686ac4aab58a clients/src/main/java/org/apache/kafka/common/Cluster.java d3299b944062d96852452de455902659ad8af757 clients/src/main/java/org/apache/kafka/common/PartitionInfo.java b15aa2c3ef2d7c4b24618ff42fd4da324237a813 clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java 98cb79b701918eca3f6ca9823b6c7b7c97b3ecec clients/src/main/java/org/apache/kafka/common/errors/ApiException.java 7c948b166a8ac07616809f260754116ae7764973 clients/src/main/java/org/apache/kafka/common/network/Selectable.java b68bbf00ab8eba6c5867d346c91188142593ca6e clients/src/main/java/org/apache/kafka/common/network/Selector.java 
74d695ba39de44b6a3d15340ec0114bc4fce2ba2 clients/src/main/java/org/apache/kafka/common/protocol/Errors.java 3316b6a1098311b8603a4a5893bf57b75d2e43cb clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java 121e880a941fcd3e6392859edba11a94236494cc clients/src/main/java/org/apache/kafka/common/record/LogEntry.java e4d688cbe0c61b74ea15fc8dd3b634f9e5ee9b83 clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java 040e5b91005edb8f015afdfa76fd94e0bf3cb4ca clients/src/main/java/org/apache/kafka/common/requests/ConsumerMetadataRequest.java 99b52c23d639df010bf2affc0f79d1c6e16ed67c clients/src/main/java/org/apache/kafka/common/requests/ConsumerMetadataResponse.java 8b8f591c4b2802a9cbbe34746c0b3ca4a64a8681
Re: Review Request 27799: New consumer
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/27799/ --- (Updated Jan. 23, 2015, 9:13 p.m.) Review request for kafka. Bugs: KAFKA-1760 https://issues.apache.org/jira/browse/KAFKA-1760 Repository: kafka Description --- New consumer. Diffs (updated) - build.gradle 1cbab29ce83e20dae0561b51eed6fdb86d522f28 clients/src/main/java/org/apache/kafka/clients/ClientRequest.java d32c319d8ee4c46dad309ea54b136ea9798e2fd7 clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java 8aece7e81a804b177a6f2c12e2dc6c89c1613262 clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/ConnectionState.java ab7e3220f9b76b92ef981d695299656f041ad5ed clients/src/main/java/org/apache/kafka/clients/KafkaClient.java 397695568d3fd8e835d8f923a89b3b00c96d0ead clients/src/main/java/org/apache/kafka/clients/NetworkClient.java 6746275d0b2596cd6ff7ce464a3a8225ad75ef00 clients/src/main/java/org/apache/kafka/clients/NodeConnectionState.java 752a979ea0b8bde7ff6d2e1a23bf54052305d841 clients/src/main/java/org/apache/kafka/clients/consumer/CommitType.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java c0c636b3e1ba213033db6d23655032c9bbd5e378 clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java 57c1807ccba9f264186f83e91f37c34b959c8060 clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceCallback.java e4cf7d1cfa01c2844b53213a7b539cdcbcbeaa3a clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecord.java 16af70a5de52cca786fdea147a6a639b7dc4a311 clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java bdf4b26942d5a8c8a9503e05908e9a9eff6228a7 clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 76efc216c9e6c3ab084461d792877092a189ad0f clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java 
fa88ac1a8b19b4294f211c4467fe68c7707ddbae clients/src/main/java/org/apache/kafka/clients/consumer/NoOffsetForPartitionException.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/OffsetMetadata.java ea423ad15eebd262d20d5ec05d592cc115229177 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/internals/NoOpConsumerRebalanceCallback.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java fc71710dd5997576d3841a1c3b0f7e19a8c9698e clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java 904976fadf0610982958628eaee810b60a98d725 clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java 8b3e565edd1ae04d8d34bd9f1a41e9fa8c880a75 clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java dcf46581b912cfb1b5c8d4cbc293d2d1444b7740 clients/src/main/java/org/apache/kafka/clients/producer/internals/Partitioner.java 483899d2e69b33655d0e08949f5f64af2519660a clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java ccc03d8447ebba40131a70e16969686ac4aab58a clients/src/main/java/org/apache/kafka/common/Cluster.java d3299b944062d96852452de455902659ad8af757 clients/src/main/java/org/apache/kafka/common/PartitionInfo.java b15aa2c3ef2d7c4b24618ff42fd4da324237a813 clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java 98cb79b701918eca3f6ca9823b6c7b7c97b3ecec clients/src/main/java/org/apache/kafka/common/errors/ApiException.java 7c948b166a8ac07616809f260754116ae7764973 clients/src/main/java/org/apache/kafka/common/network/Selectable.java b68bbf00ab8eba6c5867d346c91188142593ca6e clients/src/main/java/org/apache/kafka/common/network/Selector.java 74d695ba39de44b6a3d15340ec0114bc4fce2ba2 clients/src/main/java/org/apache/kafka/common/protocol/Errors.java 
3316b6a1098311b8603a4a5893bf57b75d2e43cb clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java 121e880a941fcd3e6392859edba11a94236494cc clients/src/main/java/org/apache/kafka/common/record/LogEntry.java e4d688cbe0c61b74ea15fc8dd3b634f9e5ee9b83 clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java 040e5b91005edb8f015afdfa76fd94e0bf3cb4ca clients/src/main/java/org/apache/kafka/common/requests/ConsumerMetadataRequest.java 99b52c23d639df010bf2affc0f79d1c6e16ed67c clients/src/main/java/org/apache/kafka/common/requests/ConsumerMetadataResponse.java 8b8f591c4b2802a9cbbe34746c0b3ca4a64a8681 clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
[jira] [Commented] (KAFKA-1729) add doc for Kafka-based offset management in 0.8.2
[ https://issues.apache.org/jira/browse/KAFKA-1729?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14290132#comment-14290132 ] Jun Rao commented on KAFKA-1729: [~jjkoshy], thanks for the update. Yes, I agree that we don't need to patch 0.8.2 for that. Going forward, we probably should avoid changing what's in SimpleConsumer since it will be replaced by the new consumer soon. add doc for Kafka-based offset management in 0.8.2 -- Key: KAFKA-1729 URL: https://issues.apache.org/jira/browse/KAFKA-1729 Project: Kafka Issue Type: Sub-task Reporter: Jun Rao Assignee: Joel Koshy Fix For: 0.8.2 Attachments: KAFKA-1782-doc-v1.patch, KAFKA-1782-doc-v2.patch -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-1760) Implement new consumer client
[ https://issues.apache.org/jira/browse/KAFKA-1760?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jay Kreps updated KAFKA-1760: - Attachment: KAFKA-1760_2015-01-23_13:13:00.patch Implement new consumer client - Key: KAFKA-1760 URL: https://issues.apache.org/jira/browse/KAFKA-1760 Project: Kafka Issue Type: Sub-task Components: consumer Reporter: Jay Kreps Assignee: Jay Kreps Fix For: 0.8.3 Attachments: KAFKA-1760.patch, KAFKA-1760_2015-01-11_16:57:15.patch, KAFKA-1760_2015-01-18_19:10:13.patch, KAFKA-1760_2015-01-21_08:42:20.patch, KAFKA-1760_2015-01-22_10:03:26.patch, KAFKA-1760_2015-01-22_20:21:56.patch, KAFKA-1760_2015-01-23_13:13:00.patch Implement a consumer client. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (KAFKA-1896) Record size function of record in mirror maker hits NPE when the message value is null.
Jiangjie Qin created KAFKA-1896: --- Summary: Record size function of record in mirror maker hits NPE when the message value is null. Key: KAFKA-1896 URL: https://issues.apache.org/jira/browse/KAFKA-1896 Project: Kafka Issue Type: Bug Reporter: Jiangjie Qin Assignee: Jiangjie Qin The byte bounded queue should not assume the message value is not null. For compacted topics, the tombstone message will have a null value, which leads to an NPE. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
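The null-safe sizing the description calls for can be sketched as follows. This is illustrative only -- the class and method names are hypothetical, not the mirror maker's actual code: the point is simply that both key and value may be null (tombstones in compacted topics carry a null value), so each must be checked before taking its length.

```java
public class RecordSize {
    /** Size used by a byte-bounded queue; tolerates null key and/or value. */
    public static int sizeOf(byte[] key, byte[] value) {
        return (key == null ? 0 : key.length) + (value == null ? 0 : value.length);
    }
}
```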
[jira] [Updated] (KAFKA-1896) Record size function of record in mirror maker hits NPE when the message value is null.
[ https://issues.apache.org/jira/browse/KAFKA-1896?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jiangjie Qin updated KAFKA-1896: Status: Patch Available (was: Open) Record size funcition of record in mirror maker hit NPE when the message value is null. --- Key: KAFKA-1896 URL: https://issues.apache.org/jira/browse/KAFKA-1896 Project: Kafka Issue Type: Bug Reporter: Jiangjie Qin Assignee: Jiangjie Qin Attachments: KAFKA-1896.patch The byte bounded queue should not assume the message value is not null. For compacted topics, the tombstone message will have a null value, which lead to an NPE. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-1896) Record size funcition of record in mirror maker hit NPE when the message value is null.
[ https://issues.apache.org/jira/browse/KAFKA-1896?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jiangjie Qin updated KAFKA-1896: Attachment: KAFKA-1896.patch Record size funcition of record in mirror maker hit NPE when the message value is null. --- Key: KAFKA-1896 URL: https://issues.apache.org/jira/browse/KAFKA-1896 Project: Kafka Issue Type: Bug Reporter: Jiangjie Qin Assignee: Jiangjie Qin Attachments: KAFKA-1896.patch The byte bounded queue should not assume the message value is not null. For compacted topics, the tombstone message will have a null value, which lead to an NPE. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1896) Record size funcition of record in mirror maker hit NPE when the message value is null.
[ https://issues.apache.org/jira/browse/KAFKA-1896?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14290130#comment-14290130 ] Jiangjie Qin commented on KAFKA-1896: - Created reviewboard https://reviews.apache.org/r/30231/diff/ against branch origin/trunk Record size funcition of record in mirror maker hit NPE when the message value is null. --- Key: KAFKA-1896 URL: https://issues.apache.org/jira/browse/KAFKA-1896 Project: Kafka Issue Type: Bug Reporter: Jiangjie Qin Assignee: Jiangjie Qin Attachments: KAFKA-1896.patch The byte bounded queue should not assume the message value is not null. For compacted topics, the tombstone message will have a null value, which lead to an NPE. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Review Request 30231: Patch for KAFKA-1896
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/30231/ --- Review request for kafka. Bugs: KAFKA-1896 https://issues.apache.org/jira/browse/KAFKA-1896 Repository: kafka Description --- patch for KAFKA-1896 Diffs - core/src/main/scala/kafka/tools/MirrorMaker.scala 3ee84ed9cbde5d882cdc7f630b0deca7dd00fc73 Diff: https://reviews.apache.org/r/30231/diff/ Testing --- Thanks, Jiangjie Qin
[jira] [Commented] (KAFKA-1634) Improve semantics of timestamp in OffsetCommitRequests and update documentation
[ https://issues.apache.org/jira/browse/KAFKA-1634?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14290241#comment-14290241 ] Guozhang Wang commented on KAFKA-1634: -- Updated reviewboard https://reviews.apache.org/r/27391/diff/ against branch origin/trunk Improve semantics of timestamp in OffsetCommitRequests and update documentation --- Key: KAFKA-1634 URL: https://issues.apache.org/jira/browse/KAFKA-1634 Project: Kafka Issue Type: Bug Reporter: Neha Narkhede Assignee: Guozhang Wang Priority: Blocker Fix For: 0.8.3 Attachments: KAFKA-1634.patch, KAFKA-1634_2014-11-06_15:35:46.patch, KAFKA-1634_2014-11-07_16:54:33.patch, KAFKA-1634_2014-11-17_17:42:44.patch, KAFKA-1634_2014-11-21_14:00:34.patch, KAFKA-1634_2014-12-01_11:44:35.patch, KAFKA-1634_2014-12-01_18:03:12.patch, KAFKA-1634_2015-01-14_15:50:15.patch, KAFKA-1634_2015-01-21_16:43:01.patch, KAFKA-1634_2015-01-22_18:47:37.patch, KAFKA-1634_2015-01-23_16:06:07.patch From the mailing list - following up on this -- I think the online API docs for OffsetCommitRequest still incorrectly refer to client-side timestamps: https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetCommitRequest Wasn't that removed and now always handled server-side now? Would one of the devs mind updating the API spec wiki? -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: Review Request 27391: Fix KAFKA-1634
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/27391/ --- (Updated Jan. 24, 2015, 12:06 a.m.) Review request for kafka. Bugs: KAFKA-1634 https://issues.apache.org/jira/browse/KAFKA-1634 Repository: kafka Description --- Incorporated Jun's comments Diffs (updated) - clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java 7517b879866fc5dad5f8d8ad30636da8bbe7784a clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java 121e880a941fcd3e6392859edba11a94236494cc clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java 3ee5cbad55ce836fd04bb954dcf6ef2f9bc3288f clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java df37fc6d8f0db0b8192a948426af603be3444da4 core/src/main/scala/kafka/api/OffsetCommitRequest.scala 050615c72efe7dbaa4634f53943bd73273d20ffb core/src/main/scala/kafka/api/OffsetFetchRequest.scala c7604b9cdeb8f46507f04ed7d35fcb3d5f150713 core/src/main/scala/kafka/common/OffsetMetadataAndError.scala 4cabffeacea09a49913505db19a96a55d58c0909 core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala 5487259751ebe19f137948249aa1fd2637d2deb4 core/src/main/scala/kafka/server/KafkaApis.scala ec8d9f7ba44741db40875458bd524c4062ad6a26 core/src/main/scala/kafka/server/KafkaConfig.scala 6d74983472249eac808d361344c58cc2858ec971 core/src/main/scala/kafka/server/KafkaServer.scala 89200da30a04943f0b9befe84ab17e62b747c8c4 core/src/main/scala/kafka/server/OffsetManager.scala 0bdd42fea931cddd072c0fff765b10526db6840a core/src/main/scala/kafka/server/ReplicaManager.scala e58fbb922e93b0c31dff04f187fcadb4ece986d7 core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala cd16ced5465d098be7a60498326b2a98c248f343 core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala 5b93239cdc26b5be7696f4e7863adb9fbe5f0ed5 core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala ba1e48e4300c9fb32e36e7266cb05294f2a481e5 Diff: 
https://reviews.apache.org/r/27391/diff/ Testing --- Thanks, Guozhang Wang
[jira] [Updated] (KAFKA-1634) Improve semantics of timestamp in OffsetCommitRequests and update documentation
[ https://issues.apache.org/jira/browse/KAFKA-1634?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang updated KAFKA-1634: - Attachment: KAFKA-1634_2015-01-23_16:06:07.patch Improve semantics of timestamp in OffsetCommitRequests and update documentation --- Key: KAFKA-1634 URL: https://issues.apache.org/jira/browse/KAFKA-1634 Project: Kafka Issue Type: Bug Reporter: Neha Narkhede Assignee: Guozhang Wang Priority: Blocker Fix For: 0.8.3 Attachments: KAFKA-1634.patch, KAFKA-1634_2014-11-06_15:35:46.patch, KAFKA-1634_2014-11-07_16:54:33.patch, KAFKA-1634_2014-11-17_17:42:44.patch, KAFKA-1634_2014-11-21_14:00:34.patch, KAFKA-1634_2014-12-01_11:44:35.patch, KAFKA-1634_2014-12-01_18:03:12.patch, KAFKA-1634_2015-01-14_15:50:15.patch, KAFKA-1634_2015-01-21_16:43:01.patch, KAFKA-1634_2015-01-22_18:47:37.patch, KAFKA-1634_2015-01-23_16:06:07.patch From the mailing list - following up on this -- I think the online API docs for OffsetCommitRequest still incorrectly refer to client-side timestamps: https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetCommitRequest Wasn't that removed and now always handled server-side now? Would one of the devs mind updating the API spec wiki? -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1896) Record size function of record in mirror maker hits NPE when the message value is null.
[ https://issues.apache.org/jira/browse/KAFKA-1896?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14290383#comment-14290383 ] Guozhang Wang commented on KAFKA-1896: -- Thanks for the patch, +1 and committed to trunk. Record size function of record in mirror maker hits NPE when the message value is null. --- Key: KAFKA-1896 URL: https://issues.apache.org/jira/browse/KAFKA-1896 Project: Kafka Issue Type: Bug Reporter: Jiangjie Qin Assignee: Jiangjie Qin Fix For: 0.8.3 Attachments: KAFKA-1896.patch The byte-bounded queue should not assume the message value is not null. For compacted topics, the tombstone message will have a null value, which leads to an NPE. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-1896) Record size function of record in mirror maker hits NPE when the message value is null.
[ https://issues.apache.org/jira/browse/KAFKA-1896?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang updated KAFKA-1896: - Fix Version/s: 0.8.3 Record size function of record in mirror maker hits NPE when the message value is null. --- Key: KAFKA-1896 URL: https://issues.apache.org/jira/browse/KAFKA-1896 Project: Kafka Issue Type: Bug Reporter: Jiangjie Qin Assignee: Jiangjie Qin Fix For: 0.8.3 Attachments: KAFKA-1896.patch The byte-bounded queue should not assume the message value is not null. For compacted topics, the tombstone message will have a null value, which leads to an NPE. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Jenkins build is back to normal : Kafka-trunk #379
See https://builds.apache.org/job/Kafka-trunk/379/changes
Re: Review Request 30231: Patch for KAFKA-1896
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/30231/#review69525 --- Ship it! Ship It! - Guozhang Wang On Jan. 23, 2015, 10:30 p.m., Jiangjie Qin wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/30231/ --- (Updated Jan. 23, 2015, 10:30 p.m.) Review request for kafka. Bugs: KAFKA-1896 https://issues.apache.org/jira/browse/KAFKA-1896 Repository: kafka Description --- patch for KAFKA-1896 Diffs - core/src/main/scala/kafka/tools/MirrorMaker.scala 3ee84ed9cbde5d882cdc7f630b0deca7dd00fc73 Diff: https://reviews.apache.org/r/30231/diff/ Testing --- Thanks, Jiangjie Qin
Re: Review Request 30158: Patch for KAFKA-1835
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/30158/#review69515 --- clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java https://reviews.apache.org/r/30158/#comment114212 I would: - for-loop to call Metadata.add(topic); this way we add all topics to Metadata - call Metadata#requestUpdate() to trigger the Sender thread to request an update for all listed topics clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java https://reviews.apache.org/r/30158/#comment114215 I would not use the initialized flag, as long as we fix KafkaProducer#waitOnMetadata to allow value 0 for non-blocking. - Steven Wu On Jan. 22, 2015, 7:04 a.m., Paul Pearcy wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/30158/ --- (Updated Jan. 22, 2015, 7:04 a.m.) Review request for kafka. Bugs: KAFKA-1835 https://issues.apache.org/jira/browse/KAFKA-1835 Repository: kafka Description --- KAFKA-1835 - New producer updates to make blocking behavior explicit Diffs - clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java fc71710dd5997576d3841a1c3b0f7e19a8c9698e clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java 8b3e565edd1ae04d8d34bd9f1a41e9fa8c880a75 core/src/test/scala/integration/kafka/api/ProducerBlockingTest.scala PRE-CREATION core/src/test/scala/unit/kafka/utils/TestUtils.scala ac15d34425795d5be20c51b01fa1108bdcd66583 Diff: https://reviews.apache.org/r/30158/diff/ Testing --- Thanks, Paul Pearcy
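The flow Steven suggests (add every listed topic first, then trigger one metadata update for the Sender thread) can be modeled in isolation. This is a hedged sketch: the stub `Metadata` class below is invented for illustration and only mimics the `add`/`requestUpdate` names from the review comment, not the real `org.apache.kafka.clients.Metadata` implementation.

```java
import java.util.*;

// Minimal stand-in modeling the review suggestion: register all topics
// first, then issue a single requestUpdate() for the Sender to act on.
class Metadata {
    private final Set<String> topics = new HashSet<>();
    private boolean needUpdate = false;

    public synchronized void add(String topic) { topics.add(topic); }
    public synchronized void requestUpdate() { needUpdate = true; }
    public synchronized boolean updateRequested() { return needUpdate; }
    public synchronized int topicCount() { return topics.size(); }
}

public class MetadataWarmup {
    public static void main(String[] args) {
        Metadata metadata = new Metadata();
        // 1. for-loop to add all listed topics to Metadata.
        for (String topic : Arrays.asList("t1", "t2", "t3")) {
            metadata.add(topic);
        }
        // 2. One requestUpdate() so the Sender fetches metadata for all of them.
        metadata.requestUpdate();
        System.out.println(metadata.topicCount() + " " + metadata.updateRequested());
    }
}
```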
Re: [DISCUSS] KIP-5 - Broker Configuration Management
Cool. Yeah sorry to nag about these KIPs, and I hope it doesn't come across the wrong way. But the hope I really have for these is that they are complete enough that even highly involved users can see and understand the change, motivation, etc. I think that will do a ton to help extend the community. -Jay On Thu, Jan 22, 2015 at 10:22 PM, Joe Stein joe.st...@stealth.ly wrote: There are still some to-dos in https://reviews.apache.org/r/29513/diff/ to switch to ConfigDef (https://reviews.apache.org/r/30126/diff/) once that is in. We can get more written up on it, will do. On Fri, Jan 23, 2015 at 12:05 AM, Jay Kreps jay.kr...@gmail.com wrote: Hey Joe, Can you fill in this KIP? The purpose of these KIPs is to give a full overview of the feature, how it will work, be implemented, the considerations involved, etc. There is only like one sentence on this which isn't enough for anyone to know what you are thinking. Moving off of configs to something that I'm guessing would be Zookeeper-based (?) is a massive change so we really need to describe this in a way that can be widely circulated. I actually think this would be a good idea. But there are a ton of advantages to good old-fashioned text files in terms of config management and change control. And trying to support both may or may not be better. -Jay On Wed, Jan 21, 2015 at 10:34 PM, Joe Stein joe.st...@stealth.ly wrote: Created a KIP https://cwiki.apache.org/confluence/display/KAFKA/KIP-5+-+Broker+Configuration+Management JIRA https://issues.apache.org/jira/browse/KAFKA-1786 /*** Joe Stein Founder, Principal Consultant Big Data Open Source Security LLC http://www.stealth.ly Twitter: @allthingshadoop http://www.twitter.com/allthingshadoop /
Re: [KIP-DISCUSSION] Mirror Maker Enhancement
I think that before we start making more changes to Mirror Maker there should be unit tests in place for it. Currently Mirror Maker is broken on trunk (there is a patch to fix it) because of a recent change. That is only going to get more unwieldy as more change happens. On Wed, Jan 21, 2015 at 8:29 PM, Jiangjie Qin j...@linkedin.com.invalid wrote: Hi Jay, Thanks for the comments. Please see inline responses. Jiangjie (Becket) Qin On 1/21/15, 1:33 PM, Jay Kreps jay.kr...@gmail.com wrote: Hey guys, A couple questions/comments: 1. The callback and user-controlled commit offset functionality is already in the new consumer which we are working on in parallel. If we accelerated that work it might help concentrate efforts. I admit this might take slightly longer in calendar time but could still probably get done this quarter. Have you guys considered that approach? Yes, I totally agree that ideally we should put efforts on the new consumer. The main reason for still working on the old consumer is that we expect it would still be used at LinkedIn for quite a while before the new consumer could be fully rolled out. And we have recently been suffering a lot from a mirror maker data loss issue. So our current plan is making the necessary changes to make the current mirror maker stable in production. Then we can test and roll out the new consumer gradually without getting burnt. 2. I think partitioning on the hash of the topic partition is not a very good idea because that will make the case of going from a cluster with fewer partitions to one with more partitions not work. I think an intuitive way to do this would be the following: a. Default behavior: Just do what the producer does. I.e. if you specify a key use it for partitioning, if not just partition in a round-robin fashion. b. Add a --preserve-partition option that will explicitly inherit the partition from the source irrespective of whether there is a key or which partition that key would hash to. Sorry that I did not explain this clearly enough.
The hash of the topic partition is only used to decide which mirror maker data channel queue the consumer thread should put messages into. It only tries to make sure that messages from the same partition are sent by the same producer thread to guarantee the sending order. This is not at all related to which partition the messages end up in on the target cluster. That is still decided by the producer. 3. You don't actually give the ConsumerRebalanceListener interface. What is that going to look like? Good point! I should have put it in the wiki. I just added it. 4. What is MirrorMakerRecord? I think ideally the MirrorMakerMessageHandler interface would take a ConsumerRecord as input and return a ProducerRecord, right? That would allow you to transform the key, value, partition, or destination topic... MirrorMakerRecord is introduced in KAFKA-1650, which is exactly the same as ConsumerRecord in KAFKA-1760. private[kafka] class MirrorMakerRecord (val sourceTopic: String, val sourcePartition: Int, val sourceOffset: Long, val key: Array[Byte], val value: Array[Byte]) { def size = value.length + {if (key == null) 0 else key.length} } However, because the source partition and offset are needed in the producer thread for consumer offsets bookkeeping, the record returned by MirrorMakerMessageHandler needs to contain that information. Therefore ProducerRecord does not work here. We could probably let the message handler take ConsumerRecord for both input and output. 5. Have you guys thought about what the implementation will look like in terms of threading architecture etc with the new consumer? That will be soon so even if we aren't starting with that let's make sure we can get rid of a lot of the current mirror maker accidental complexity in terms of threads and queues when we move to that. I haven't thought about it thoroughly. The quick idea is that after migration to the new consumer, it is probably better to use a single consumer thread.
If multithreading is needed, decoupling consumption and processing might be used. MirrorMaker definitely needs to be changed after the new consumer gets checked in. I'll document the changes and can submit follow-up patches after the new consumer is available. -Jay On Tue, Jan 20, 2015 at 4:31 PM, Jiangjie Qin j...@linkedin.com.invalid wrote: Hi Kafka Devs, We are working on Kafka Mirror Maker enhancement. A KIP is posted to document and discuss the following: 1. KAFKA-1650: No data loss mirror maker change 2. KAFKA-1839: To allow partition-aware mirroring. 3. KAFKA-1840: To allow message filtering/format conversion Feedback is welcome. Please let us know if you have any questions or concerns. Thanks. Jiangjie (Becket) Qin
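The data channel selection described in this thread (hash of the source topic and partition choosing an internal queue, so one producer thread handles each source partition and ordering is preserved) can be sketched as follows. All names here are hypothetical illustrations, not the actual MirrorMaker code, and this only picks the internal queue; the target partition is still chosen by the producer:

```java
import java.util.Objects;

// Illustrative sketch: messages from the same source TopicAndPartition
// always hash to the same queue index, so a single producer thread sends
// them and per-partition send order is preserved.
public class ChannelSelector {
    static int selectQueue(String topic, int partition, int numQueues) {
        int h = Objects.hash(topic, partition);
        // Normalize to a non-negative index even for negative hash values.
        return ((h % numQueues) + numQueues) % numQueues;
    }

    public static void main(String[] args) {
        // Same source partition -> same queue on every call.
        System.out.println(selectQueue("events", 3, 4) == selectQueue("events", 3, 4));
    }
}
```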
[jira] [Commented] (KAFKA-1461) Replica fetcher thread does not implement any back-off behavior
[ https://issues.apache.org/jira/browse/KAFKA-1461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14290398#comment-14290398 ] Guozhang Wang commented on KAFKA-1461: -- [~sriharsha] Sorry for the late reply. This fix looks good to me overall, except that we cannot add partitions back only in the handlePartitionsWithErrors() call, since it will only be triggered when the next error happens. We can probably move this piece of code to processPartitionData(). Another way to do this could be: 1. Make the partitionMap in AbstractFetcherThread a map from TopicAndPartition to OffsetAndState, where OffsetAndState contains the Offset (Long) and the State (active, inactive-with-delay). For simplicity we can just use Int here, where active would be 0 and inactive would be the delay time. 2. Add another function called delayPartitions in AbstractFetcherThread, which sets the State to inactive with the delay time. 3. In AbstractFetcherThread's doWork(), only include partitions with State 0 in the fetch request, and also update the state values for the non-zero partitions. Replica fetcher thread does not implement any back-off behavior --- Key: KAFKA-1461 URL: https://issues.apache.org/jira/browse/KAFKA-1461 Project: Kafka Issue Type: Improvement Components: replication Affects Versions: 0.8.1.1 Reporter: Sam Meder Assignee: Sriharsha Chintalapani Labels: newbie++ Fix For: 0.8.3 The current replica fetcher thread will retry in a tight loop if any error occurs during the fetch call. For example, we've seen cases where the fetch continuously throws a connection refused exception, leading to several replica fetcher threads that spin in a pretty tight loop. To a much lesser degree this is also an issue in the consumer fetcher thread, although the fact that erroring partitions are removed so a leader can be re-discovered helps some. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
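The three-step back-off scheme Guozhang outlines above could look roughly like the following model. All class and method names here are assumptions for illustration (and in Java rather than the Scala of AbstractFetcherThread); only the idea is taken from the comment: each partition maps to an offset plus a delay, and only partitions whose delay has expired are included in the next fetch.

```java
import java.util.*;

// Illustrative model of per-partition fetch back-off (hypothetical names).
public class FetcherBackoff {
    static class OffsetAndState {
        long offset;
        long resumeAtMs; // 0 means active; otherwise inactive until this time
        OffsetAndState(long offset) { this.offset = offset; }
    }

    private final Map<String, OffsetAndState> partitionMap = new HashMap<>();

    void addPartition(String tp, long offset) {
        partitionMap.put(tp, new OffsetAndState(offset));
    }

    // Step 2: mark a partition inactive for delayMs after an error.
    void delayPartition(String tp, long nowMs, long delayMs) {
        partitionMap.get(tp).resumeAtMs = nowMs + delayMs;
    }

    // Step 3: only partitions whose delay has expired make it into the fetch.
    List<String> partitionsToFetch(long nowMs) {
        List<String> active = new ArrayList<>();
        for (Map.Entry<String, OffsetAndState> e : partitionMap.entrySet()) {
            OffsetAndState s = e.getValue();
            if (nowMs >= s.resumeAtMs) {
                s.resumeAtMs = 0; // back to active
                active.add(e.getKey());
            }
        }
        return active;
    }

    public static void main(String[] args) {
        FetcherBackoff f = new FetcherBackoff();
        f.addPartition("t-0", 0L);
        f.addPartition("t-1", 0L);
        f.delayPartition("t-1", 1000L, 500L);       // t-1 backs off until 1500
        System.out.println(f.partitionsToFetch(1000L)); // only t-0
        System.out.println(f.partitionsToFetch(1500L)); // t-1 resumes
    }
}
```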
[jira] [Updated] (KAFKA-1786) implement a global configuration feature for brokers
[ https://issues.apache.org/jira/browse/KAFKA-1786?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Andrii Biletskyi updated KAFKA-1786: Attachment: KAFKA_1786.patch Patch KAFKA_1786.patch is based on the KAFKA-1845 patch (KafkaConfig should use ConfigDef) implement a global configuration feature for brokers Key: KAFKA-1786 URL: https://issues.apache.org/jira/browse/KAFKA-1786 Project: Kafka Issue Type: Sub-task Reporter: Joe Stein Assignee: Andrii Biletskyi Fix For: 0.8.3 Attachments: KAFKA_1786.patch Global-level configurations (much like topic-level) for brokers are managed by humans and automation systems through server.properties. Some configurations make sense to use as defaults (like it is now) or to override from a central location (zookeeper for now). We can modify this through the new CLI tool so that every broker can have the exact same setting. Some configurations we should allow to be overridden from server.properties (like port), but for others we should use the global store as the source of truth (e.g. auto topic enable, fetch replica message size, etc). Since most configurations, I believe, are going to fall into this category, we should keep the list of server.properties entries that can override the global config in the code in a list which we can manage... for everything else the global takes precedence. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
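The precedence rule in the description above can be modeled concretely. This is a hypothetical sketch, not the KAFKA-1786 implementation: the whitelist keys and all names below are invented for illustration. A broker-local property overrides the global config only when its key is on a managed override list; otherwise the global value wins.

```java
import java.util.*;

// Sketch of global-vs-local config resolution (hypothetical names/keys).
public class ConfigResolver {
    // Keys that server.properties is allowed to override locally.
    private static final Set<String> LOCAL_OVERRIDABLE =
        new HashSet<>(Arrays.asList("port", "broker.id"));

    static Map<String, String> resolve(Map<String, String> global, Map<String, String> local) {
        Map<String, String> effective = new HashMap<>(global);
        for (Map.Entry<String, String> e : local.entrySet()) {
            if (LOCAL_OVERRIDABLE.contains(e.getKey())) {
                effective.put(e.getKey(), e.getValue()); // local wins
            } // else: global store is the source of truth
        }
        return effective;
    }

    public static void main(String[] args) {
        Map<String, String> global = new HashMap<>();
        global.put("auto.create.topics.enable", "false");
        global.put("port", "9092");
        Map<String, String> local = new HashMap<>();
        local.put("port", "9192");                      // whitelisted: wins
        local.put("auto.create.topics.enable", "true"); // not whitelisted: ignored
        Map<String, String> eff = resolve(global, local);
        System.out.println(eff.get("port") + " " + eff.get("auto.create.topics.enable"));
    }
}
```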
[jira] [Comment Edited] (KAFKA-1786) implement a global configuration feature for brokers
[ https://issues.apache.org/jira/browse/KAFKA-1786?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14262248#comment-14262248 ] Andrii Biletskyi edited comment on KAFKA-1786 at 1/23/15 12:05 PM: --- Created reviewboard https://reviews.apache.org/r/29513/diff/ against branch origin/trunk UPD: discarded for now. Uploaded a patch KAFKA_1786.patch based on KAFKA-1845 (since it's a separate big patch) was (Author: abiletskyi): Created reviewboard https://reviews.apache.org/r/29513/diff/ against branch origin/trunk implement a global configuration feature for brokers Key: KAFKA-1786 URL: https://issues.apache.org/jira/browse/KAFKA-1786 Project: Kafka Issue Type: Sub-task Reporter: Joe Stein Assignee: Andrii Biletskyi Fix For: 0.8.3 Attachments: KAFKA_1786.patch Global-level configurations (much like topic-level) for brokers are managed by humans and automation systems through server.properties. Some configurations make sense to use as defaults (like it is now) or to override from a central location (zookeeper for now). We can modify this through the new CLI tool so that every broker can have the exact same setting. Some configurations we should allow to be overridden from server.properties (like port), but for others we should use the global store as the source of truth (e.g. auto topic enable, fetch replica message size, etc). Since most configurations, I believe, are going to fall into this category, we should keep the list of server.properties entries that can override the global config in the code in a list which we can manage... for everything else the global takes precedence. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1853) Unsuccessful suffix rename of expired LogSegment can leak open files and also leave the LogSegment in an invalid state
[ https://issues.apache.org/jira/browse/KAFKA-1853?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14290442#comment-14290442 ] jaikiran pai commented on KAFKA-1853: - Updated reviewboard https://reviews.apache.org/r/29755/diff/ against branch origin/trunk Unsuccessful suffix rename of expired LogSegment can leak open files and also leave the LogSegment in an invalid state -- Key: KAFKA-1853 URL: https://issues.apache.org/jira/browse/KAFKA-1853 Project: Kafka Issue Type: Bug Components: core Affects Versions: 0.8.1.1 Reporter: jaikiran pai Fix For: 0.8.3 Attachments: KAFKA-1853_2015-01-20_22:04:29.patch, KAFKA-1853_2015-01-24_10:48:08.patch As noted in this discussion in the user mailing list http://mail-archives.apache.org/mod_mbox/kafka-users/201501.mbox/%3C54AE3661.8080007%40gmail.com%3E an unsuccessful attempt at renaming the underlying files of a LogSegment can lead to file leaks and also leave the LogSegment in an invalid state. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-1853) Unsuccessful suffix rename of expired LogSegment can leak open files and also leave the LogSegment in an invalid state
[ https://issues.apache.org/jira/browse/KAFKA-1853?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] jaikiran pai updated KAFKA-1853: Attachment: KAFKA-1853_2015-01-24_10:48:08.patch Unsuccessful suffix rename of expired LogSegment can leak open files and also leave the LogSegment in an invalid state -- Key: KAFKA-1853 URL: https://issues.apache.org/jira/browse/KAFKA-1853 Project: Kafka Issue Type: Bug Components: core Affects Versions: 0.8.1.1 Reporter: jaikiran pai Fix For: 0.8.3 Attachments: KAFKA-1853_2015-01-20_22:04:29.patch, KAFKA-1853_2015-01-24_10:48:08.patch As noted in this discussion in the user mailing list http://mail-archives.apache.org/mod_mbox/kafka-users/201501.mbox/%3C54AE3661.8080007%40gmail.com%3E an unsuccessful attempt at renaming the underlying files of a LogSegment can lead to file leaks and also leave the LogSegment in an invalid state. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: Review Request 29755: Patch for KAFKA-1853
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/29755/ --- (Updated Jan. 24, 2015, 5:18 a.m.) Review request for kafka. Bugs: KAFKA-1853 https://issues.apache.org/jira/browse/KAFKA-1853 Repository: kafka Description --- KAFKA-1853 Prevent leaking open file resources when renaming of the LogSegment fails Diffs (updated) - core/src/main/scala/kafka/log/FileMessageSet.scala b2652ddbe2f857028d5980e29a008b2c614694a3 core/src/main/scala/kafka/log/Log.scala 846023bb98d0fa0603016466360c97071ac935ea core/src/main/scala/kafka/log/OffsetIndex.scala 1c4c7bd89e19ea942cf1d01eafe502129e97f535 Diff: https://reviews.apache.org/r/29755/diff/ Testing --- Have run the existing tests in LogManagerTest (which includes a test for cleaning of expired LogSegments) and those have passed with this change. I did give a thought of trying to replicate a failed rename scenario and then to ensure that we don't leak resources anymore, but that's not straightforward to do in the tests, so haven't added any new tests. Thanks, Jaikiran Pai
Re: Review Request 29755: Patch for KAFKA-1853
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/29755/#review69529 --- core/src/main/scala/kafka/log/Log.scala https://reviews.apache.org/r/29755/#comment114225 Seems like in the case of the exception we actually forcefully delete, but then after deleting schedule another one. Should the scheduler action be inside the try block so it only occurs if the rename succeeds? - Jay Kreps On Jan. 24, 2015, 5:18 a.m., Jaikiran Pai wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/29755/ --- (Updated Jan. 24, 2015, 5:18 a.m.) Review request for kafka. Bugs: KAFKA-1853 https://issues.apache.org/jira/browse/KAFKA-1853 Repository: kafka Description --- KAFKA-1853 Prevent leaking open file resources when renaming of the LogSegment fails Diffs - core/src/main/scala/kafka/log/FileMessageSet.scala b2652ddbe2f857028d5980e29a008b2c614694a3 core/src/main/scala/kafka/log/Log.scala 846023bb98d0fa0603016466360c97071ac935ea core/src/main/scala/kafka/log/OffsetIndex.scala 1c4c7bd89e19ea942cf1d01eafe502129e97f535 Diff: https://reviews.apache.org/r/29755/diff/ Testing --- Have run the existing tests in LogManagerTest (which includes a test for cleaning of expired LogSegments) and those have passed with this change. I did give a thought of trying to replicate a failed rename scenario and then to ensure that we don't leak resources anymore, but that's not straightforward to do in the tests, so haven't added any new tests. Thanks, Jaikiran Pai
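Jay's suggestion above can be illustrated with a small model. All names here are hypothetical, not the actual Log.scala code: the point is that the deletion is scheduled only after the rename succeeds, while a failed rename falls back to an immediate forceful delete without also registering a second scheduled deletion.

```java
import java.util.concurrent.*;

// Sketch of "schedule the delayed delete only if the rename succeeded".
public class SegmentCleanup {
    // Stub for a suffix-rename step; pretend names containing "bad" fail.
    static boolean renameToDeleted(String segment) {
        return !segment.contains("bad");
    }

    static String expireSegment(String segment, ScheduledExecutorService scheduler) {
        try {
            if (!renameToDeleted(segment)) {
                throw new RuntimeException("rename failed for " + segment);
            }
            // Scheduling happens only after a successful rename.
            scheduler.schedule(() -> { /* delete the renamed file */ }, 60, TimeUnit.SECONDS);
            return "scheduled";
        } catch (RuntimeException e) {
            // Fall back to an immediate, forceful delete; no second
            // scheduled deletion is registered for this segment.
            return "force-deleted";
        }
    }

    public static void main(String[] args) {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        System.out.println(expireSegment("segment-1", scheduler));   // rename ok
        System.out.println(expireSegment("segment-bad", scheduler)); // rename fails
        scheduler.shutdownNow();
    }
}
```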
Re: Review Request 29755: Patch for KAFKA-1853
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/29755/ --- (Updated Jan. 24, 2015, 5:51 a.m.) Review request for kafka. Bugs: KAFKA-1853 https://issues.apache.org/jira/browse/KAFKA-1853 Repository: kafka Description --- KAFKA-1853 Prevent leaking open file resources when renaming of the LogSegment fails Diffs (updated) - core/src/main/scala/kafka/log/FileMessageSet.scala b2652ddbe2f857028d5980e29a008b2c614694a3 core/src/main/scala/kafka/log/Log.scala 846023bb98d0fa0603016466360c97071ac935ea core/src/main/scala/kafka/log/OffsetIndex.scala 1c4c7bd89e19ea942cf1d01eafe502129e97f535 Diff: https://reviews.apache.org/r/29755/diff/ Testing --- Have run the existing tests in LogManagerTest (which includes a test for cleaning of expired LogSegments) and those have passed with this change. I did give a thought of trying to replicate a failed rename scenario and then to ensure that we don't leak resources anymore, but that's not straightforward to do in the tests, so haven't added any new tests. Thanks, Jaikiran Pai
Re: Review Request 30078: Patch for KAFKA-1885
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/30078/#review69532 --- Ship it! Ship It! - Manikumar Reddy O On Jan. 24, 2015, 5:13 a.m., Jaikiran Pai wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/30078/ --- (Updated Jan. 24, 2015, 5:13 a.m.) Review request for kafka. Bugs: KAFKA-1885 https://issues.apache.org/jira/browse/KAFKA-1885 Repository: kafka Description --- KAFKA-1885 Upgrade junit dependency in core to 4.6 version to allow running individual test methods via gradle command line Diffs - build.gradle 1cbab29ce83e20dae0561b51eed6fdb86d522f28 Diff: https://reviews.apache.org/r/30078/diff/ Testing --- Tested that existing support to run an entire individual test case from the command line works as advertised:

```
./gradlew -Dtest.single=ProducerFailureHandlingTest core:test
kafka.api.test.ProducerFailureHandlingTest testTooLargeRecordWithAckZero PASSED
kafka.api.test.ProducerFailureHandlingTest testTooLargeRecordWithAckOne PASSED
kafka.api.test.ProducerFailureHandlingTest testNonExistentTopic PASSED
kafka.api.test.ProducerFailureHandlingTest testWrongBrokerList PASSED
kafka.api.test.ProducerFailureHandlingTest testNoResponse PASSED
kafka.api.test.ProducerFailureHandlingTest testInvalidPartition PASSED
kafka.api.test.ProducerFailureHandlingTest testSendAfterClosed PASSED
kafka.api.test.ProducerFailureHandlingTest testBrokerFailure PASSED
kafka.api.test.ProducerFailureHandlingTest testCannotSendToInternalTopic PASSED
kafka.api.test.ProducerFailureHandlingTest testNotEnoughReplicas PASSED
kafka.api.test.ProducerFailureHandlingTest testNotEnoughReplicasAfterBrokerShutdown PASSED
BUILD SUCCESSFUL
```

Also tested that with this change it is now possible to run individual test methods as follows:

```
./gradlew clients:test --tests org.apache.kafka.clients.producer.MetadataTest.testMetadataUpdateWaitTime
org.apache.kafka.clients.producer.MetadataTest testMetadataUpdateWaitTime PASSED
```

```
./gradlew core:test --tests kafka.api.test.ProducerFailureHandlingTest.testCannotSendToInternalTopic
kafka.api.test.ProducerFailureHandlingTest testCannotSendToInternalTopic PASSED
```

Thanks, Jaikiran Pai
[jira] [Created] (KAFKA-1898) compatibility testing framework
Joe Stein created KAFKA-1898: Summary: compatibility testing framework Key: KAFKA-1898 URL: https://issues.apache.org/jira/browse/KAFKA-1898 Project: Kafka Issue Type: Bug Reporter: Joe Stein Fix For: 0.8.3 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: [KIP-DISCUSSION] Mirror Maker Enhancement
QQ: If we ever use a different technique for the data channel selection than for the producer partitioning won't that break ordering? How can we ensure these things stay in sync? With respect to the new consumer--I really do want to encourage people to think through how MM will work with the new consumer. I mean this isn't very far off, maybe a few months if we hustle? I could imagine us getting this mm fix done maybe sooner, maybe in a month? So I guess this buys us an extra month before we rip it out and throw it away? Maybe two? This bug has been there for a while, though, right? Is it worth it? Probably it is, but it still kind of sucks to have the duplicate effort. So anyhow let's definitely think about how things will work with the new consumer. I think we can probably just have N threads, each thread has a producer and consumer and is internally single threaded. Any reason this wouldn't work? -Jay On Wed, Jan 21, 2015 at 5:29 PM, Jiangjie Qin j...@linkedin.com.invalid wrote: Hi Jay, Thanks for the comments. Please see inline responses. Jiangjie (Becket) Qin On 1/21/15, 1:33 PM, Jay Kreps jay.kr...@gmail.com wrote: Hey guys, A couple questions/comments: 1. The callback and user-controlled commit offset functionality is already in the new consumer which we are working on in parallel. If we accelerated that work it might help concentrate efforts. I admit this might take slightly longer in calendar time but could still probably get done this quarter. Have you guys considered that approach? Yes, I totally agree that ideally we should put our efforts into the new consumer. The main reason for still working on the old consumer is that we expect it would still be used at LinkedIn for quite a while before the new consumer could be fully rolled out. And we have recently been suffering a lot from the mirror maker data loss issue. So our current plan is to make the necessary changes to make the current mirror maker stable in production. 
Then we can test and roll out the new consumer gradually without getting burnt. 2. I think partitioning on the hash of the topic partition is not a very good idea because that will make the case of going from a cluster with fewer partitions to one with more partitions not work. I think an intuitive way to do this would be the following: a. Default behavior: Just do what the producer does. I.e. if you specify a key use it for partitioning, if not just partition in a round-robin fashion. b. Add a --preserve-partition option that will explicitly inherit the partition from the source irrespective of whether there is a key or which partition that key would hash to. Sorry that I did not explain this clearly enough. The hash of the topic partition is only used to decide which mirror maker data channel queue the consumer thread should put the message into. It only tries to make sure the messages from the same partition are sent by the same producer thread, to guarantee the sending order. This is not at all related to which partition in the target cluster the messages end up in. That is still decided by the producer. 3. You don't actually give the ConsumerRebalanceListener interface. What is that going to look like? Good point! I should have put it in the wiki. I just added it. 4. What is MirrorMakerRecord? I think ideally the MirrorMakerMessageHandler interface would take a ConsumerRecord as input and return a ProducerRecord, right? That would allow you to transform the key, value, partition, or destination topic... MirrorMakerRecord is introduced in KAFKA-1650, which is exactly the same as ConsumerRecord in KAFKA-1760. 
private[kafka] class MirrorMakerRecord (val sourceTopic: String, val sourcePartition: Int, val sourceOffset: Long, val key: Array[Byte], val value: Array[Byte]) { def size = value.length + {if (key == null) 0 else key.length} } However, because the source partition and offset are needed in the producer thread for consumer offset bookkeeping, the record returned by MirrorMakerMessageHandler needs to contain that information. Therefore ProducerRecord does not work here. We could probably let the message handler take ConsumerRecord for both input and output. 5. Have you guys thought about what the implementation will look like in terms of threading architecture etc. with the new consumer? That will be soon so even if we aren't starting with that let's make sure we can get rid of a lot of the current mirror maker accidental complexity in terms of threads and queues when we move to that. I haven't thought about it thoroughly. The quick idea is that after migration to the new consumer, it is probably better to use a single consumer thread. If multithreading is needed, decoupling consumption and processing might be used. MirrorMaker definitely needs to be changed after the new consumer gets checked in. I'll document the changes and can submit follow-up patches after the new consumer is available. -Jay On Tue, Jan 20, 2015 at 4:31 PM, Jiangjie Qin
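The data-channel selection Becket describes — hashing the source topic partition only to choose a queue, so that all messages of one partition flow through the same producer thread and keep their order — can be sketched roughly as follows. This is an illustrative sketch with made-up names, not the actual mirror maker code:

```java
import java.util.Objects;

// Illustrative sketch (hypothetical names, not the mirror maker source):
// the consumer thread hashes the *source* topic-partition to pick a data
// channel queue. One producer thread drains each queue, so messages from a
// given source partition are always sent by the same producer thread and
// their order is preserved. Which partition they land on in the target
// cluster is still decided later by the producer's own partitioner.
public class DataChannelSelection {
    static int selectQueue(String sourceTopic, int sourcePartition, int numQueues) {
        int hash = Objects.hash(sourceTopic, sourcePartition);
        // mask off the sign bit so the index is never negative
        return (hash & 0x7fffffff) % numQueues;
    }

    public static void main(String[] args) {
        int queues = 4;
        int q1 = selectQueue("t1", 0, queues);
        int q2 = selectQueue("t1", 0, queues);
        // the same source partition always maps to the same queue
        System.out.println(q1 == q2);
    }
}
```

The point of the mod-by-queue-count is only thread affinity; it says nothing about target partitioning, which is why it does not break the fewer-to-more-partitions case Jay raised.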
Re: [kafka-clients] Re: [VOTE] 0.8.2.0 Candidate 2 (with the correct links)
I don't think so--see if you buy my explanation. We previously defaulted to the byte array serializer and it was a source of unending frustration and confusion. Since it wasn't a required config people just went along plugging in whatever objects they had, and thinking that changing the parametric types would somehow help. Then they would get a class cast exception and assume our stuff was somehow busted, not realizing we had helpfully configured a type different from what they were passing in under the covers. So I think it is actually good for people to think: how am I serializing my data, and getting that exception will make them ask that question, right? -Jay On Fri, Jan 23, 2015 at 9:06 PM, Joe Stein joe.st...@stealth.ly wrote: Should value.serializer in the new java producer be defaulted to Array[Byte]? I was working on testing some upgrade paths and got this ! return exception in callback when buffer cannot accept message ConfigException: Missing required configuration value.serializer which has no default value. (ConfigDef.java:124) org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:124) org.apache.kafka.common.config.AbstractConfig.init(AbstractConfig.java:48) org.apache.kafka.clients.producer.ProducerConfig.init(ProducerConfig.java:235) org.apache.kafka.clients.producer.KafkaProducer.init(KafkaProducer.java:129) ly.stealth.testing.BaseSpec$class.createNewKafkaProducer(BaseSpec.scala:42) ly.stealth.testing.KafkaSpec.createNewKafkaProducer(KafkaSpec.scala:36) ly.stealth.testing.KafkaSpec$$anonfun$3$$anonfun$apply$37.apply(KafkaSpec.scala:175) ly.stealth.testing.KafkaSpec$$anonfun$3$$anonfun$apply$37.apply(KafkaSpec.scala:170) On Fri, Jan 23, 2015 at 5:55 PM, Jun Rao j...@confluent.io wrote: This is a reminder that the deadline for the vote is this Monday, Jan 26, 7pm PT. Thanks, Jun On Wed, Jan 21, 2015 at 8:28 AM, Jun Rao j...@confluent.io wrote: This is the second candidate for release of Apache Kafka 0.8.2.0. 
There have been some changes since the 0.8.2 beta release, especially in the new java producer api and jmx mbean names. It would be great if people can test this out thoroughly. Release Notes for the 0.8.2.0 release https://people.apache.org/~junrao/kafka-0.8.2.0-candidate2/RELEASE_NOTES.html *** Please download, test and vote by Monday, Jan 26th, 7pm PT Kafka's KEYS file containing PGP keys we use to sign the release: http://kafka.apache.org/KEYS in addition to the md5, sha1 and sha2 (SHA256) checksum. * Release artifacts to be voted upon (source and binary): https://people.apache.org/~junrao/kafka-0.8.2.0-candidate2/ * Maven artifacts to be voted upon prior to release: https://repository.apache.org/content/groups/staging/ * scala-doc https://people.apache.org/~junrao/kafka-0.8.2.0-candidate2/scaladoc/ * java-doc https://people.apache.org/~junrao/kafka-0.8.2.0-candidate2/javadoc/ * The tag to be voted upon (off the 0.8.2 branch) is the 0.8.2.0 tag https://git-wip-us.apache.org/repos/asf?p=kafka.git;a=tag;h=058d58adef2ab2787e49d8efeefd61bb3d32f99c (commit 0b312a6b9f0833d38eec434bfff4c647c1814564) /*** Thanks, Jun
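The fail-fast behavior Jay defends — refusing to construct the client when a required config such as value.serializer has no default — boils down to a check like the one below. This is a simplified sketch of the idea behind the ConfigDef error in Joe's stack trace, not the actual Kafka code:

```java
import java.util.HashMap;
import java.util.Map;

// Simplified sketch (not the real ConfigDef): a key marked required (no
// default value) causes a hard failure at parse time, instead of silently
// falling back to a byte-array serializer the caller may not expect.
public class RequiredConfigCheck {
    static final Object NO_DEFAULT = new Object();

    static Object parse(Map<String, Object> props, String key, Object defaultValue) {
        if (props.containsKey(key)) return props.get(key);
        if (defaultValue == NO_DEFAULT)
            throw new IllegalArgumentException(
                "Missing required configuration \"" + key + "\" which has no default value.");
        return defaultValue;
    }

    public static void main(String[] args) {
        Map<String, Object> props = new HashMap<>();
        props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        // present: returns the configured value
        System.out.println(parse(props, "key.serializer", NO_DEFAULT));
        try {
            // absent and required: fails fast at construction time
            parse(props, "value.serializer", NO_DEFAULT);
        } catch (IllegalArgumentException e) {
            System.out.println("failed fast: " + e.getMessage());
        }
    }
}
```

The design trade-off is exactly the one in the thread: a loud error at startup versus a confusing ClassCastException deep inside send().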
[jira] [Updated] (KAFKA-1885) Allow test methods in core to be individually run from outside of the IDE
[ https://issues.apache.org/jira/browse/KAFKA-1885?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] jaikiran pai updated KAFKA-1885: Attachment: KAFKA-1885_2015-01-24_10:42:46.patch Allow test methods in core to be individually run from outside of the IDE --- Key: KAFKA-1885 URL: https://issues.apache.org/jira/browse/KAFKA-1885 Project: Kafka Issue Type: Improvement Components: system tests Reporter: jaikiran pai Assignee: jaikiran pai Attachments: KAFKA-1885.patch, KAFKA-1885_2015-01-24_10:42:46.patch Gradle in combination with Java plugin allows test filtering which lets users run select test classes or even select test methods from the command line. See Test filtering section here http://www.gradle.org/docs/2.0/userguide/java_plugin.html#sec:java_test which has examples of the commands. Currently we have this working in the clients and I can run something like: {code} ./gradlew clients:test --tests org.apache.kafka.clients.producer.MetadataTest.testMetadataUpdateWaitTime {code} and that command then only runs that specific test method (testMetadataUpdateWaitTime) from the MetadataTest class. {code} To honour the JVM settings for this build a new JVM will be forked. Please consider using the daemon: http://gradle.org/docs/2.0/userguide/gradle_daemon.html. Building project 'core' with Scala version 2.10.4 :clients:compileJava UP-TO-DATE :clients:processResources UP-TO-DATE :clients:classes UP-TO-DATE :clients:compileTestJava UP-TO-DATE :clients:processTestResources UP-TO-DATE :clients:testClasses UP-TO-DATE :clients:test org.apache.kafka.clients.producer.MetadataTest testMetadataUpdateWaitTime PASSED BUILD SUCCESSFUL Total time: 12.714 secs {code} I've found this useful when I need to do some quick tests and also reproduce issues that aren't noticed sometimes if the whole test class is run. This currently only works for the clients and not for core --because the core doesn't have the Java plugin applied to it in the gradle build--. 
I've a patch which does that (and one other related thing) which then allowed me to run individual test methods even for the core tests. I will create a review request for it. Edit: I was wrong about the java plugin not being applied to core. It is indeed already applied, but my attempt to get test methods running individually for core was failing for a different reason related to the JUnit version dependency. I'll be addressing that in the patch and uploading for review. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: Review Request 30078: Patch for KAFKA-1885
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/30078/ --- (Updated Jan. 24, 2015, 5:13 a.m.) Review request for kafka. Bugs: KAFKA-1885 https://issues.apache.org/jira/browse/KAFKA-1885 Repository: kafka Description --- KAFKA-1885 Upgrade junit dependency in core to 4.6 version to allow running individual test methods via gradle command line Diffs (updated) - build.gradle 1cbab29ce83e20dae0561b51eed6fdb86d522f28 Diff: https://reviews.apache.org/r/30078/diff/ Testing --- Tested that existing support to run an entire individual test case from the command line works as advertised: ```./gradlew -Dtest.single=ProducerFailureHandlingTest core:test kafka.api.test.ProducerFailureHandlingTest testTooLargeRecordWithAckZero PASSED kafka.api.test.ProducerFailureHandlingTest testTooLargeRecordWithAckOne PASSED kafka.api.test.ProducerFailureHandlingTest testNonExistentTopic PASSED kafka.api.test.ProducerFailureHandlingTest testWrongBrokerList PASSED kafka.api.test.ProducerFailureHandlingTest testNoResponse PASSED kafka.api.test.ProducerFailureHandlingTest testInvalidPartition PASSED kafka.api.test.ProducerFailureHandlingTest testSendAfterClosed PASSED kafka.api.test.ProducerFailureHandlingTest testBrokerFailure PASSED kafka.api.test.ProducerFailureHandlingTest testCannotSendToInternalTopic PASSED kafka.api.test.ProducerFailureHandlingTest testNotEnoughReplicas PASSED kafka.api.test.ProducerFailureHandlingTest testNotEnoughReplicasAfterBrokerShutdown PASSED BUILD SUCCESSFUL ``` Also tested that with this change it is now possible to run individual test methods as follows: ``` ./gradlew clients:test --tests org.apache.kafka.clients.producer.MetadataTest.testMetadataUpdateWaitTime org.apache.kafka.clients.producer.MetadataTest testMetadataUpdateWaitTime PASSED ``` ``` ./gradlew core:test --tests kafka.api.test.ProducerFailureHandlingTest.testCannotSendToInternalTopic kafka.api.test.ProducerFailureHandlingTest 
testCannotSendToInternalTopic PASSED ``` Thanks, Jaikiran Pai
[jira] [Commented] (KAFKA-1869) Openning some random ports while running kafka service
[ https://issues.apache.org/jira/browse/KAFKA-1869?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14290445#comment-14290445 ] jaikiran pai commented on KAFKA-1869: - [~huqian] As [~omkreddy] noted, that ephemeral port that's being used for the JMX communication is more of an implementation detail of the JRMP protocol. It appears to be a well-known problem and there seem to be some workarounds for it (like the use of a Java agent), but I think that's something that you'll have to write up as a custom application specific thing. On a different note, I think if you have questions related to this, it might be a good idea to open a discussion in the Kafka user mailing list, details of which are here http://kafka.apache.org/contact.html Openning some random ports while running kafka service --- Key: KAFKA-1869 URL: https://issues.apache.org/jira/browse/KAFKA-1869 Project: Kafka Issue Type: Bug Environment: kafka_2.9.2-0.8.1.1 Reporter: QianHu Assignee: Manikumar Reddy Fix For: 0.8.2 while running kafka service, four random ports have been opened. In which, and 9092 are set by myself, but 28538 and 16650 are opened randomly. Can you help me understand why these random ports are opened, and how can we give them constant values? Thank you very much. [work@02 kafka]$ jps 8400 Jps 727 Kafka [work@02 kafka]$ netstat -tpln|grep 727 (Not all processes could be identified, non-owned process info will not be shown, you would have to be root to see it all.) tcp 0 0 0.0.0.0:0.0.0.0:* LISTEN 727/./bin/../jdk1.7 tcp 0 0 0.0.0.0:28538 0.0.0.0:* LISTEN 727/./bin/../jdk1.7 tcp 0 0 0.0.0.0:9092 0.0.0.0:* LISTEN 727/./bin/../jdk1.7 tcp 0 0 0.0.0.0:16650 0.0.0.0:* LISTEN 727/./bin/../jdk1.7 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
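For reference, the workaround usually suggested for this JRMP ephemeral-port behavior (a sketch, not something from this thread's patches, and the rmi.port property assumes a reasonably recent JDK 7 or later) is to pin both the JMX registry port and the RMI server port via KAFKA_JMX_OPTS, which kafka-run-class.sh passes to the JVM:

```shell
# Illustrative workaround (assumes a JDK that honors
# com.sun.management.jmxremote.rmi.port): pin both JMX-related ports to a
# fixed value so the JVM does not open an extra ephemeral port for JRMP.
export KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote \
  -Dcom.sun.management.jmxremote.port=9999 \
  -Dcom.sun.management.jmxremote.rmi.port=9999 \
  -Dcom.sun.management.jmxremote.authenticate=false \
  -Dcom.sun.management.jmxremote.ssl=false"
echo "$KAFKA_JMX_OPTS"
```

With both properties set to the same value, only that one extra port needs to be opened in a firewall.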
Cannot stop Kafka server if zookeeper is shutdown first
I was just playing around with the RC2 of 0.8.2 and noticed that if I shut down ZooKeeper first I can't shut down the Kafka server at all, since it goes into never-ending attempts to reconnect with ZooKeeper. I had to kill the Kafka process to stop it. I tried it against trunk too and there too I see the same issue. Should I file a JIRA for this and see if I can come up with a patch? FWIW, here are the unending (and IMO too frequent) attempts at trying to reconnect. I've a thread dump too which shows that the other thread, which is trying to complete a controlled shutdown of Kafka, is blocked forever waiting for ZooKeeper to be up. I can attach it to the JIRA. [2015-01-24 10:15:46,278] WARN Session 0x14b1a413680 for server null, unexpected error, closing socket connection and attempting reconnect (org.apache.zookeeper.ClientCnxn) java.net.ConnectException: Connection refused at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:739) at org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:361) at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1081) [2015-01-24 10:15:47,437] INFO Opening socket connection to server localhost/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error) (org.apache.zookeeper.ClientCnxn) [2015-01-24 10:15:47,438] WARN Session 0x14b1a413680 for server null, unexpected error, closing socket connection and attempting reconnect (org.apache.zookeeper.ClientCnxn) java.net.ConnectException: Connection refused at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:739) at org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:361) at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1081) [2015-01-24 10:15:49,056] INFO Opening socket connection to server localhost/127.0.0.1:2181. 
Will not attempt to authenticate using SASL (unknown error) (org.apache.zookeeper.ClientCnxn) [2015-01-24 10:15:49,057] WARN Session 0x14b1a413680 for server null, unexpected error, closing socket connection and attempting reconnect (org.apache.zookeeper.ClientCnxn) java.net.ConnectException: Connection refused at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:739) at org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:361) at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1081) [2015-01-24 10:15:50,801] INFO Opening socket connection to server localhost/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error) (org.apache.zookeeper.ClientCnxn) [2015-01-24 10:15:50,802] WARN Session 0x14b1a413680 for server null, unexpected error, closing socket connection and attempting reconnect (org.apache.zookeeper.ClientCnxn) java.net.ConnectException: Connection refused at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:739) at org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:361) at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1081) -Jaikiran
Re: Review Request 29755: Patch for KAFKA-1853
On Jan. 24, 2015, 5:43 a.m., Jay Kreps wrote: core/src/main/scala/kafka/log/Log.scala, line 751 https://reviews.apache.org/r/29755/diff/4/?file=833256#file833256line751 Seems like in the case of the exception we actually forcefully delete, but then after deleting schedule another one. Should the scheduler action be inside the try block so it only occurs if the rename succeeds? Sorry, I missed a return statement in the catch block. Fixed it now and updated the patch. Thanks Jay! - Jaikiran --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/29755/#review69529 --- On Jan. 24, 2015, 5:51 a.m., Jaikiran Pai wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/29755/ --- (Updated Jan. 24, 2015, 5:51 a.m.) Review request for kafka. Bugs: KAFKA-1853 https://issues.apache.org/jira/browse/KAFKA-1853 Repository: kafka Description --- KAFKA-1853 Prevent leaking open file resources when renaming of the LogSegment fails Diffs - core/src/main/scala/kafka/log/FileMessageSet.scala b2652ddbe2f857028d5980e29a008b2c614694a3 core/src/main/scala/kafka/log/Log.scala 846023bb98d0fa0603016466360c97071ac935ea core/src/main/scala/kafka/log/OffsetIndex.scala 1c4c7bd89e19ea942cf1d01eafe502129e97f535 Diff: https://reviews.apache.org/r/29755/diff/ Testing --- Have run the existing tests in LogManagerTest (which includes a test for cleaning of expired LogSegments) and those have passed with this change. I did give a thought of trying to replicate a failed rename scenario and then to ensure that we don't leak resources anymore, but that's not straightforward to do in the tests, so haven't added any new tests. Thanks, Jaikiran Pai
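The control flow under discussion — scheduling the background delete only after the rename has succeeded, and returning early from the catch block after the forceful delete — can be sketched like this. These are hypothetical names for illustration, not the actual Log.scala code:

```java
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the fix being reviewed (not Log.scala itself):
// the scheduler action lives inside the try block so it only runs when the
// rename succeeds; on failure we delete forcefully and return immediately.
public class RenameThenSchedule {
    // stand-in for Kafka's background delete scheduler
    static final List<File> scheduledDeletes = new ArrayList<>();

    static void asyncDeleteSegment(File segment) {
        File renamed = new File(segment.getPath() + ".deleted");
        try {
            if (!segment.renameTo(renamed))
                throw new IOException("rename of " + segment + " failed");
            // reached only when the rename succeeded
            scheduledDeletes.add(renamed);
        } catch (IOException e) {
            segment.delete(); // forceful immediate delete on rename failure
            return;           // the missing `return` the review caught
        }
    }

    public static void main(String[] args) throws IOException {
        File f = File.createTempFile("segment", ".log");
        asyncDeleteSegment(f);
        System.out.println(scheduledDeletes.size());
    }
}
```

Without the early return, a failed rename would both delete the file immediately and fall through to schedule a second, spurious delete.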
[jira] [Commented] (KAFKA-1853) Unsuccessful suffix rename of expired LogSegment can leak open files and also leave the LogSegment in an invalid state
[ https://issues.apache.org/jira/browse/KAFKA-1853?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14290454#comment-14290454 ] jaikiran pai commented on KAFKA-1853: - Updated reviewboard https://reviews.apache.org/r/29755/diff/ against branch origin/trunk Unsuccessful suffix rename of expired LogSegment can leak open files and also leave the LogSegment in an invalid state -- Key: KAFKA-1853 URL: https://issues.apache.org/jira/browse/KAFKA-1853 Project: Kafka Issue Type: Bug Components: core Affects Versions: 0.8.1.1 Reporter: jaikiran pai Fix For: 0.8.3 Attachments: KAFKA-1853_2015-01-20_22:04:29.patch, KAFKA-1853_2015-01-24_10:48:08.patch, KAFKA-1853_2015-01-24_11:21:07.patch As noted in this discussion in the user mailing list http://mail-archives.apache.org/mod_mbox/kafka-users/201501.mbox/%3C54AE3661.8080007%40gmail.com%3E an unsuccessful attempt at renaming the underlying files of a LogSegment can lead to file leaks and also leave the LogSegment in an invalid state. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: Review Request 30078: Patch for KAFKA-1885
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/30078/#review69533 --- Ship it! Non-Binding +1 - Manikumar Reddy O On Jan. 24, 2015, 5:13 a.m., Jaikiran Pai wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/30078/ --- (Updated Jan. 24, 2015, 5:13 a.m.) Review request for kafka. Bugs: KAFKA-1885 https://issues.apache.org/jira/browse/KAFKA-1885 Repository: kafka Description --- KAFKA-1885 Upgrade junit dependency in core to 4.6 version to allow running individual test methods via gradle command line Diffs - build.gradle 1cbab29ce83e20dae0561b51eed6fdb86d522f28 Diff: https://reviews.apache.org/r/30078/diff/ Testing --- Tested that existing support to run an entire individual test case from the command line works as advertised: ```./gradlew -Dtest.single=ProducerFailureHandlingTest core:test kafka.api.test.ProducerFailureHandlingTest testTooLargeRecordWithAckZero PASSED kafka.api.test.ProducerFailureHandlingTest testTooLargeRecordWithAckOne PASSED kafka.api.test.ProducerFailureHandlingTest testNonExistentTopic PASSED kafka.api.test.ProducerFailureHandlingTest testWrongBrokerList PASSED kafka.api.test.ProducerFailureHandlingTest testNoResponse PASSED kafka.api.test.ProducerFailureHandlingTest testInvalidPartition PASSED kafka.api.test.ProducerFailureHandlingTest testSendAfterClosed PASSED kafka.api.test.ProducerFailureHandlingTest testBrokerFailure PASSED kafka.api.test.ProducerFailureHandlingTest testCannotSendToInternalTopic PASSED kafka.api.test.ProducerFailureHandlingTest testNotEnoughReplicas PASSED kafka.api.test.ProducerFailureHandlingTest testNotEnoughReplicasAfterBrokerShutdown PASSED BUILD SUCCESSFUL ``` Also tested that with this change it is now possible to run individual test methods as follows: ``` ./gradlew clients:test --tests org.apache.kafka.clients.producer.MetadataTest.testMetadataUpdateWaitTime org.apache.kafka.clients.producer.MetadataTest 
testMetadataUpdateWaitTime PASSED ``` ``` ./gradlew core:test --tests kafka.api.test.ProducerFailureHandlingTest.testCannotSendToInternalTopic kafka.api.test.ProducerFailureHandlingTest testCannotSendToInternalTopic PASSED ``` Thanks, Jaikiran Pai
Re: Review Request 27799: New consumer
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/27799/#review69355 --- clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java https://reviews.apache.org/r/27799/#comment114034 This should probably just return the ConnectionState since that's all it's used for. In fact, NodeConnectionState could be made a private nested class since ClusterConnectionStates is the only class that uses it. clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java https://reviews.apache.org/r/27799/#comment114036 METADATA_MAX_AGE_CONFIG/DOC are still duplicated in the ProducerConfig and ConsumerConfig clients/src/main/java/org/apache/kafka/clients/NetworkClient.java https://reviews.apache.org/r/27799/#comment114037 Parameter now is not used, should be removed. clients/src/main/java/org/apache/kafka/clients/NetworkClient.java https://reviews.apache.org/r/27799/#comment114038 I think these methods need to have timeouts on them. They get called via synchronized methods in KafkaConsumer and KafkaConsumer.close() is also synchronized, so an attempt to shut down the consumer could be blocked indefinitely by a call to completeAll(). clients/src/main/java/org/apache/kafka/clients/NetworkClient.java https://reviews.apache.org/r/27799/#comment114039 These are just for debugging, but there's a return statement earlier where the selected node won't be logged. clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java https://reviews.apache.org/r/27799/#comment114040 Using ConcatenatedIterable here too would require less copying -- just build up a list of iterators rather than copying all the entries into the ArrayList clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java https://reviews.apache.org/r/27799/#comment114043 Does this give the right metric names? Looks different from the one in KafkaProducer (which doesn't have a trailing dot). 
clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java https://reviews.apache.org/r/27799/#comment114047 Don't need to call time.milliseconds twice here clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java https://reviews.apache.org/r/27799/#comment114050 Won't this cause busy looping during network failures (and maybe due to other errors returned in the OffsetCommitResponse)? clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java https://reviews.apache.org/r/27799/#comment114055 Timeout? Need to eventually allow things like close() requests to process. clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java https://reviews.apache.org/r/27799/#comment114063 A bunch of redundant time.milliseconds() calls in this method. A couple are necessary with the loops, but a lot could be removed. clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java https://reviews.apache.org/r/27799/#comment114062 I'm confused what's going on here -- why Integer.MIN_VALUE? And how does this end up working with the rest of the code since this result node is used for consumerCoordinator and other code compares node IDs? core/src/main/scala/kafka/tools/ConsumerPerformance.scala https://reviews.apache.org/r/27799/#comment114069 Lots of unused import cleanup here. - Ewen Cheslack-Postava On Jan. 23, 2015, 4:22 a.m., Jay Kreps wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/27799/ --- (Updated Jan. 23, 2015, 4:22 a.m.) Review request for kafka. Bugs: KAFKA-1760 https://issues.apache.org/jira/browse/KAFKA-1760 Repository: kafka Description --- New consumer. 
Diffs - build.gradle 1cbab29ce83e20dae0561b51eed6fdb86d522f28 clients/src/main/java/org/apache/kafka/clients/ClientRequest.java d32c319d8ee4c46dad309ea54b136ea9798e2fd7 clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java 8aece7e81a804b177a6f2c12e2dc6c89c1613262 clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/ConnectionState.java ab7e3220f9b76b92ef981d695299656f041ad5ed clients/src/main/java/org/apache/kafka/clients/KafkaClient.java 397695568d3fd8e835d8f923a89b3b00c96d0ead clients/src/main/java/org/apache/kafka/clients/NetworkClient.java 6746275d0b2596cd6ff7ce464a3a8225ad75ef00 clients/src/main/java/org/apache/kafka/clients/RequestCompletionHandler.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/CommitType.java PRE-CREATION
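The ConcatenatedIterable suggestion in the review above — exposing several per-partition lists through one Iterable built from their iterators, rather than copying every entry into a new ArrayList — might look like the following sketch (illustrative names, not the actual clients code):

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

// Illustrative sketch of a "concatenated iterable": chain several underlying
// collections into one iteration without copying their elements.
public class ConcatenatedIterableSketch {
    static class ConcatenatedIterable<T> implements Iterable<T> {
        private final List<? extends Iterable<T>> parts;
        ConcatenatedIterable(List<? extends Iterable<T>> parts) { this.parts = parts; }
        public Iterator<T> iterator() {
            return new Iterator<T>() {
                private final Iterator<? extends Iterable<T>> outer = parts.iterator();
                private Iterator<T> inner = Collections.emptyIterator();
                public boolean hasNext() {
                    // advance to the next non-empty underlying iterator
                    while (!inner.hasNext() && outer.hasNext())
                        inner = outer.next().iterator();
                    return inner.hasNext();
                }
                public T next() {
                    if (!hasNext()) throw new NoSuchElementException();
                    return inner.next();
                }
                public void remove() { throw new UnsupportedOperationException(); }
            };
        }
    }

    public static void main(String[] args) {
        List<List<Integer>> parts = Arrays.asList(Arrays.asList(1, 2), Arrays.asList(3));
        int sum = 0;
        for (int x : new ConcatenatedIterable<Integer>(parts)) sum += x;
        System.out.println(sum); // 1 + 2 + 3
    }
}
```

The iterators are created lazily per underlying list, so the cost is O(1) extra space regardless of how many records the per-partition lists hold.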
[jira] [Updated] (KAFKA-1476) Get a list of consumer groups
[ https://issues.apache.org/jira/browse/KAFKA-1476?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Onur Karaman updated KAFKA-1476: Attachment: sample-kafka-consumer-groups-sh-output-1-23-2015.txt Get a list of consumer groups - Key: KAFKA-1476 URL: https://issues.apache.org/jira/browse/KAFKA-1476 Project: Kafka Issue Type: Wish Components: tools Affects Versions: 0.8.1.1 Reporter: Ryan Williams Assignee: Balaji Seshadri Labels: newbie Fix For: 0.9.0 Attachments: ConsumerCommand.scala, KAFKA-1476-LIST-GROUPS.patch, KAFKA-1476-RENAME.patch, KAFKA-1476-REVIEW-COMMENTS.patch, KAFKA-1476.patch, KAFKA-1476.patch, KAFKA-1476.patch, KAFKA-1476.patch, KAFKA-1476_2014-11-10_11:58:26.patch, KAFKA-1476_2014-11-10_12:04:01.patch, KAFKA-1476_2014-11-10_12:06:35.patch, KAFKA-1476_2014-12-05_12:00:12.patch, KAFKA-1476_2015-01-12_16:22:26.patch, KAFKA-1476_2015-01-12_16:31:20.patch, KAFKA-1476_2015-01-13_10:36:18.patch, KAFKA-1476_2015-01-15_14:30:04.patch, KAFKA-1476_2015-01-22_02:32:52.patch, sample-kafka-consumer-groups-sh-output-1-23-2015.txt, sample-kafka-consumer-groups-sh-output.txt It would be useful to have a way to get a list of consumer groups currently active via some tool/script that ships with kafka. This would be helpful so that the system tools can be explored more easily. For example, when running the ConsumerOffsetChecker, it requires a group option bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --topic test --group ? But, when just getting started with kafka, using the console producer and consumer, it is not clear what value to use for the group option. If a list of consumer groups could be listed, then it would be clear what value to use. Background: http://mail-archives.apache.org/mod_mbox/kafka-users/201405.mbox/%3cCAOq_b1w=slze5jrnakxvak0gu9ctdkpazak1g4dygvqzbsg...@mail.gmail.com%3e -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: Review Request 29831: Patch for KAFKA-1476
On Jan. 23, 2015, 2:22 a.m., Neha Narkhede wrote: Onur, do you have an updated version of the console output from this tool? Hi Neha. I just uploaded it now. - Onur --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/29831/#review69332 --- On Jan. 22, 2015, 10:32 a.m., Onur Karaman wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/29831/ --- (Updated Jan. 22, 2015, 10:32 a.m.) Review request for kafka. Bugs: KAFKA-1476 https://issues.apache.org/jira/browse/KAFKA-1476 Repository: kafka Description --- Merged in work for KAFKA-1476 and sub-task KAFKA-1826 Diffs - bin/kafka-consumer-groups.sh PRE-CREATION core/src/main/scala/kafka/admin/AdminUtils.scala 28b12c7b89a56c113b665fbde1b95f873f8624a3 core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala PRE-CREATION core/src/main/scala/kafka/utils/ZkUtils.scala c14bd455b6642f5e6eb254670bef9f57ae41d6cb core/src/test/scala/unit/kafka/admin/DeleteConsumerGroupTest.scala PRE-CREATION core/src/test/scala/unit/kafka/utils/TestUtils.scala ac15d34425795d5be20c51b01fa1108bdcd66583 Diff: https://reviews.apache.org/r/29831/diff/ Testing --- Thanks, Onur Karaman
[jira] [Updated] (KAFKA-1786) implement a global configuration feature for brokers
[ https://issues.apache.org/jira/browse/KAFKA-1786?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Andrii Biletskyi updated KAFKA-1786: Attachment: (was: KAFKA-1786.patch) implement a global configuration feature for brokers Key: KAFKA-1786 URL: https://issues.apache.org/jira/browse/KAFKA-1786 Project: Kafka Issue Type: Sub-task Reporter: Joe Stein Assignee: Andrii Biletskyi Fix For: 0.8.3 Global-level configurations (much like topic-level ones) for brokers are managed by humans and automation systems through server.properties. Some configurations make sense to keep at their defaults (as now) or to override from a central location (ZooKeeper for now). We can modify these through the new CLI tool so that every broker gets exactly the same setting. Some configurations we should allow to be overridden from server.properties (like port), but for others we should use the global store as the source of truth (e.g. auto topic enable, fetch replica message size, etc.). Since most configurations will, I believe, fall into the latter category, we should keep in the code a managed list of the server.properties keys that may override the global config; for everything else the global value takes precedence. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
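The precedence rule proposed above (the global, ZooKeeper-backed config wins, except for a whitelisted set of server.properties keys that may override it locally) can be sketched in a few lines. The key names and the whitelist contents here are illustrative, not Kafka's actual configuration names or its eventual whitelist:

```python
# Sketch of the proposed precedence: the global (ZooKeeper-backed) config is
# the source of truth, except for keys explicitly allowed to be overridden
# locally via server.properties. Key names are hypothetical.

LOCAL_OVERRIDABLE = {"port", "host.name", "log.dirs"}  # hypothetical whitelist

def effective_config(global_config, local_config):
    """Merge global and local config, letting local win only where allowed."""
    merged = dict(global_config)
    for key, value in local_config.items():
        # Local wins only for whitelisted keys, or keys the global store
        # doesn't know about at all.
        if key in LOCAL_OVERRIDABLE or key not in global_config:
            merged[key] = value
    return merged

print(effective_config(
    {"auto.create.topics.enable": "true", "port": "9092"},
    {"auto.create.topics.enable": "false", "port": "9192"},
))
# {'auto.create.topics.enable': 'true', 'port': '9192'}
```

Note how the local attempt to flip auto.create.topics.enable is ignored (global is authoritative) while the local port override is honored, matching the split the ticket describes.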
[jira] [Commented] (KAFKA-1646) Improve consumer read performance for Windows
[ https://issues.apache.org/jira/browse/KAFKA-1646?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14289447#comment-14289447 ] Sriharsha Chintalapani commented on KAFKA-1646: --- [~jkreps] [~junrao] [~nehanarkhede] pinging for a review. Thanks. Improve consumer read performance for Windows - Key: KAFKA-1646 URL: https://issues.apache.org/jira/browse/KAFKA-1646 Project: Kafka Issue Type: Improvement Components: log Affects Versions: 0.8.1.1 Environment: Windows Reporter: xueqiang wang Labels: newbie, patch Attachments: Improve consumer read performance for Windows.patch, KAFKA-1646-truncate-off-trailing-zeros-on-broker-restart-if-bro.patch, KAFKA-1646_20141216_163008.patch This patch is for the Windows platform only. On Windows, if more than one replica is writing to disk, the segment log files will not be laid out contiguously on disk, and consumer read performance drops greatly. This fix preallocates disk space when rolling a new segment, which improves consumer read performance on the NTFS file system. It doesn't affect file allocation on other filesystems, since it only adds statements like 'if(Os.iswindow)' or methods used only on Windows. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
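The core preallocation idea — grow the segment file to its full size up front so the filesystem can reserve contiguous space, instead of extending it append by append — can be sketched very compactly. Kafka's actual implementation lives in the Scala log layer and is gated on an OS check; this is only the underlying file-system trick, with a made-up segment name and size:

```python
import os
import tempfile

# Sketch: preallocate a new log segment to its maximum size up front so the
# filesystem (NTFS in particular) can reserve contiguous space, instead of
# growing the file write by write. Segment name and size are illustrative.

def preallocate_segment(path, segment_bytes):
    """Create/extend the file at `path` to `segment_bytes` and return its size."""
    with open(path, "wb") as f:
        f.truncate(segment_bytes)  # truncate() past EOF extends the file
    return os.path.getsize(path)

path = os.path.join(tempfile.mkdtemp(), "00000000000000000000.log")
print(preallocate_segment(path, 1024 * 1024))
# 1048576
```

On most filesystems the extension is cheap (a sparse region of zeros); the win the patch targets is NTFS choosing a contiguous extent when the final size is known at creation time.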
Re: [DISCUSS] KIPs
Wouldn't it make sense to move away from these rich binary broker descriptors ({ host, port, proto }) (which require protocol churning on change), and simply use URIs instead? E.g.:

kafka://host[:port]/ -- cleartext proto on standard port 9092
kafkas://host[:port] -- SSL-enveloped proto on standard port 9093
kafkas://user:pass@host[:port]/ -- SSL-enveloped, with user authentication
kafkafuturetech://.../#opts -- six months from now

Trailing #fragment_ids could be used to hint the client about protocol versions, supported authentication mechanisms, etc. This also makes error reporting more meaningful on the client; e.g. compare "Unsupported protocol 19 on broker foo:1234" to "Unsupported protocol kafkafuturetech on broker foo:1234". A positive side effect would be more generalized topic addressing in clients:

kafkacat kafka://bootstrap/mytopic/3?offset=end -- tail partition 3 of mytopic

Just an idea, Magnus 2015-01-23 5:43 GMT+01:00 Jun Rao j...@confluent.io: Reviewed the latest patch in KAFKA-1809 :). Thanks, Jun On Thu, Jan 22, 2015 at 12:38 PM, Gwen Shapira gshap...@cloudera.com wrote: Thanks for validating our ideas. Updated the KIP with the workflow. Now if you can nudge Jun to review the latest patch... ;) On Thu, Jan 22, 2015 at 11:44 AM, Jay Kreps j...@confluent.io wrote: Oh yeah I think that is better, I hadn't thought of that approach! Any way you could describe the usage in the KIP, just for completeness? -Jay On Thu, Jan 22, 2015 at 10:23 AM, Gwen Shapira gshap...@cloudera.com wrote: I think what you described was the original design, so no wonder you are confused :) Following suggestions from Jun, I changed it a bit. The current model is: - Clients (producers and consumers) need to know about the broker ports in advance. They don't need to know about all brokers, but they need to know at least one host:port pair that speaks the protocol they want to use.
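Magnus's URI proposal can already be exercised with a standard URL parser. A sketch follows; note that the kafka://kafkas:// schemes and the 9092/9093 default ports are taken from the proposal in this thread and are not an implemented Kafka feature:

```python
from urllib.parse import urlparse

# Sketch of the proposed URI-based broker addressing. The schemes and their
# default ports come from the mailing-list proposal, not from Kafka itself.
DEFAULT_PORTS = {"kafka": 9092, "kafkas": 9093}

def parse_broker_uri(uri):
    """Split a proposed broker URI into protocol, host, port, and user."""
    u = urlparse(uri)
    return {
        "protocol": u.scheme,
        "host": u.hostname,
        "port": u.port or DEFAULT_PORTS.get(u.scheme),  # scheme implies port
        "user": u.username,  # only set for the authenticated variants
    }

print(parse_broker_uri("kafkas://user:pass@broker1/"))
# {'protocol': 'kafkas', 'host': 'broker1', 'port': 9093, 'user': 'user'}
```

This is part of the appeal of the idea: the scheme carries the protocol (and a sensible default port), so no extra binary field has to churn when a new transport is added.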
The change is that all host:port pairs in broker.list must be of the same protocol and match the security.protocol configuration parameter. - The client uses the security.protocol configuration parameter to open a connection to one of the brokers and sends the good old MetadataRequest. The broker knows which port it got the connection on, therefore it knows which security protocol is expected (it needs to use the same protocol to accept the connection and respond), and therefore it can send a response that contains only the host:port pairs that are relevant to that protocol. - From the client side the MetadataResponse did not change - it contains a list of brokerId,host,port that the client can connect to. The fact that all those broker endpoints were chosen out of a larger collection to match the right protocol is irrelevant to the client. I really like the new design since it preserves a lot of the same configurations and APIs. Thoughts? Gwen On Thu, Jan 22, 2015 at 9:19 AM, Jay Kreps jay.kr...@gmail.com wrote: I think I am still confused. In addition to the UpdateMetadataRequest, don't we have to change the MetadataResponse so that it's possible for clients to discover the new ports? Or is that a second phase? I was imagining it worked by basically allowing the brokers to advertise multiple ports, one per security type, and then in the client you configure a protocol which will implicitly choose the port from the options returned to you in the metadata... Likewise, in the ConsumerMetadataResponse we are currently giving back full broker information. I think we would have two options here: either change the broker information included in that response to match the MetadataResponse, or else remove the broker information entirely and just return the node id (since in order to use that request you would already have to have the cluster metadata). The second option may be cleaner since it means we won't have to continue evolving those two in lockstep...
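The filtering step Gwen describes — the broker answers a MetadataRequest with only the endpoints matching the security protocol of the port the request arrived on — can be sketched as a simple lookup. The broker/endpoint data below is made up for illustration; the real request and response types live in Kafka's protocol layer:

```python
# Sketch: each broker advertises one endpoint per security protocol; the
# metadata response returns only the endpoints matching the protocol of the
# port the client connected on. Broker data here is illustrative.

brokers = {
    1: {"PLAINTEXT": ("broker1", 9092), "SSL": ("broker1", 9093)},
    2: {"PLAINTEXT": ("broker2", 9092), "SSL": ("broker2", 9093)},
}

def metadata_response(brokers, protocol):
    """Endpoints visible to a client that connected via `protocol`."""
    return {
        broker_id: endpoints[protocol]
        for broker_id, endpoints in brokers.items()
        if protocol in endpoints
    }

print(metadata_response(brokers, "SSL"))
# {1: ('broker1', 9093), 2: ('broker2', 9093)}
```

Because the filtering happens broker-side, the client-visible MetadataResponse shape (brokerId, host, port) is unchanged, which is exactly why the design preserves the existing client APIs.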
-Jay On Wed, Jan 21, 2015 at 6:19 PM, Gwen Shapira gshap...@cloudera.com wrote: Good point :) I added the specifics of the new UpdateMetadataRequest, which is the only protocol bump in this change. Highlighted the broker and producer/consumer configuration changes, added some example values, and added the new ZooKeeper JSON. Hope this makes things clearer. On Wed, Jan 21, 2015 at 2:19 PM, Jay Kreps jay.kr...@gmail.com wrote: Hey Gwen, Could we get the actual changes in that KIP? I.e. changes to the MetadataRequest, changes to the UpdateMetadataRequest, new configs and what their valid values will be, etc. The KIP kind of says that those things will change but doesn't say what they will change to... -Jay On Mon, Jan 19, 2015 at