Re: Review Request 33065: Patch for KAFKA-1928
On May 1, 2015, 9:25 p.m., Jay Kreps wrote: core/src/main/scala/kafka/network/SocketServer.scala, line 382 https://reviews.apache.org/r/33065/diff/3/?file=946988#file946988line382 Beautiful! So much deleted code! Haha :) Yeah, I love how minimal SocketServer became. - Gwen --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/33065/#review82281 --- On May 1, 2015, 10:45 p.m., Gwen Shapira wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/33065/ --- (Updated May 1, 2015, 10:45 p.m.) Review request for kafka. Bugs: KAFKA-1928 https://issues.apache.org/jira/browse/KAFKA-1928 Repository: kafka Description --- first pass on replacing Send implement maxSize and improved docs Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into KAFKA-1928-v2 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into KAFKA-1928-v2 Conflicts: core/src/main/scala/kafka/network/RequestChannel.scala moved selector out of abstract thread mid-way through putting selector in SocketServer Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into KAFKA-1928-v2 Also, SocketServer is now using Selector. Stil a bit messy - but all tests pass. Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into KAFKA-1928-v2 renamed requestKey to connectionId to reflect new use and changed type from Any to String Following Jun's comments - moved MultiSend to client. 
Cleaned up destinations as well Diffs - clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java da76cc257b4cfe3c4bce7120a1f14c7f31ef8587 clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java 936487b16e7ac566f8bdcd39a7240ceb619fd30e clients/src/main/java/org/apache/kafka/clients/KafkaClient.java 1311f85847b022efec8cb05c450bb18231db6979 clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b7ae595f2cc46e5dfe728bc3ce6082e9cd0b6d36 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java e55ab11df4db0b0084f841a74cbcf819caf780d5 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java ef9dd5238fbc771496029866ece1d85db6d7b7a5 clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b2db91ca14bbd17fef5ce85839679144fff3f689 clients/src/main/java/org/apache/kafka/common/network/ByteBufferReceive.java 129ae827bccbd982ad93d56e46c6f5c46f147fe0 clients/src/main/java/org/apache/kafka/common/network/ByteBufferSend.java c8213e156ec9c9af49ee09f5238492318516aaa3 clients/src/main/java/org/apache/kafka/common/network/MultiSend.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java fc0d168324aaebb97065b0aafbd547a1994d76a7 clients/src/main/java/org/apache/kafka/common/network/NetworkSend.java 68327cd3a734fd429966d3e2016a2488dbbb19e5 clients/src/main/java/org/apache/kafka/common/network/Receive.java 4e33078c1eec834bd74aabcb5fc69f18c9d6d52a clients/src/main/java/org/apache/kafka/common/network/Selectable.java b5f8d83e89f9026dc0853e5f92c00b2d7f043e22 clients/src/main/java/org/apache/kafka/common/network/Selector.java 57de0585e5e9a53eb9dcd99cac1ab3eb2086a302 clients/src/main/java/org/apache/kafka/common/network/Send.java 5d321a09e470166a1c33639cf0cab26a3bce98ec clients/src/main/java/org/apache/kafka/common/requests/RequestSend.java 27cbf390c7f148ffa8c5abc154c72cbf0829715c 
clients/src/main/java/org/apache/kafka/common/requests/ResponseSend.java PRE-CREATION clients/src/test/java/org/apache/kafka/clients/MockClient.java 5e3fab13e3c02eb351558ec973b949b3d1196085 clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java 8b278892883e63899b53e15efb9d8c926131e858 clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java d5b306b026e788b4e5479f3419805aa49ae889f3 clients/src/test/java/org/apache/kafka/test/MockSelector.java ea89b06a4c9e5bb351201299cd3037f5226f0e6c core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala 1c3b3802ac221d570e7610458e50518b4499e7ed core/src/main/scala/kafka/api/ConsumerMetadataRequest.scala a3b1b78adb760eaeb029466b54f335a29caf3b0f core/src/main/scala/kafka/api/ControlledShutdownRequest.scala fe81635c864cec03ca1d4681c9c47c3fc4f975ee core/src/main/scala/kafka/api/FetchRequest.scala b038c15186c0cbcc65b59479324052498361b717 core/src/main/scala/kafka/api/FetchResponse.scala 75aaf57fb76ec01660d93701a57ae953d877d81c
[jira] [Updated] (KAFKA-2123) Make new consumer offset commit API use callback + future
[ https://issues.apache.org/jira/browse/KAFKA-2123?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ewen Cheslack-Postava updated KAFKA-2123: - Attachment: KAFKA-2123_2015-05-01_19:33:19.patch Make new consumer offset commit API use callback + future - Key: KAFKA-2123 URL: https://issues.apache.org/jira/browse/KAFKA-2123 Project: Kafka Issue Type: Improvement Components: clients, consumer Reporter: Ewen Cheslack-Postava Assignee: Ewen Cheslack-Postava Fix For: 0.8.3 Attachments: KAFKA-2123.patch, KAFKA-2123_2015-04-30_11:23:05.patch, KAFKA-2123_2015-05-01_19:33:19.patch The current version of the offset commit API in the new consumer is void commit(offsets, commit type) where the commit type is either sync or async. This means you need to use sync if you ever want confirmation that the commit succeeded. Some applications will want to use asynchronous offset commit, but be able to tell when the commit completes. This is basically the same problem that had to be fixed going from old consumer -> new consumer and I'd suggest the same fix using a callback + future combination. The new API would be Future<Void> commit(Map<TopicPartition, Long> offsets, ConsumerCommitCallback callback); where ConsumerCommitCallback contains a single method: public void onCompletion(Exception exception); We can provide shorthand variants of commit() for eliding the different arguments. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
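The API shape proposed in the issue above can be sketched in Java as follows. This is a minimal, hypothetical illustration, not the actual consumer code: TopicPartition is replaced by a local stand-in so the snippet is self-contained, and the commit completes immediately just to show the Future + callback contract.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;

// Hypothetical stand-in for org.apache.kafka.common.TopicPartition,
// only here to keep the sketch self-contained.
record TopicPartition(String topic, int partition) {}

// The single-method callback interface described in the proposal.
interface ConsumerCommitCallback {
    void onCompletion(Exception exception); // null exception means success
}

class CommitApiSketch {
    // Shape of the proposed API: the Future resolves when the commit
    // completes, and the callback (if non-null) fires with the outcome.
    Future<Void> commit(Map<TopicPartition, Long> offsets, ConsumerCommitCallback callback) {
        CompletableFuture<Void> result = new CompletableFuture<>();
        // In a real consumer this would complete asynchronously via the
        // coordinator; completing it inline here just shows the contract.
        result.complete(null);
        if (callback != null) callback.onCompletion(null);
        return result;
    }
}
```

A fire-and-forget caller passes a null callback; a caller that needs confirmation can either block on the returned Future (sync behavior) or react in the callback (async behavior), which is exactly the flexibility the old void commit(offsets, commit type) signature lacked.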
Re: Review Request 33196: Patch for KAFKA-2123
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/33196/ --- (Updated May 2, 2015, 2:33 a.m.) Review request for kafka. Bugs: KAFKA-2123 https://issues.apache.org/jira/browse/KAFKA-2123 Repository: kafka Description (updated) --- KAFKA-2123: Add queuing of offset commit requests. KAFKA-2123: Add scheduler for delayed tasks in new consumer, add backoff for commit retries, and simplify auto commit by using delayed tasks. Diffs (updated) - clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java 8f587bc0705b65b3ef37c86e0c25bb43ab8803de clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerCommitCallback.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java bdff518b732105823058e6182f445248b45dc388 clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java d301be4709f7b112e1f3a39f3c04cfa65f00fa60 clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java f50da825756938c193d7f07bee953e000e2627d9 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java e55ab11df4db0b0084f841a74cbcf819caf780d5 clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedTask.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedTaskScheduler.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/errors/ConsumerCoordinatorNotAvailableException.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/errors/NotCoordinatorForConsumerException.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/errors/OffsetLoadInProgressException.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/protocol/Errors.java 36aa412404ff1458c7bef0feecaaa8bc45bed9c7 clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java b06c4a73e2b4e9472cd772c8bc32bf4a29f431bb 
clients/src/test/java/org/apache/kafka/clients/consumer/internals/DelayedTaskSchedulerTest.java PRE-CREATION core/src/test/scala/integration/kafka/api/ConsumerTest.scala ffbdf5dc106e2a59563768280074696c76491337 Diff: https://reviews.apache.org/r/33196/diff/ Testing --- Thanks, Ewen Cheslack-Postava
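The delayed-task machinery added in this patch (DelayedTaskScheduler, PRE-CREATION above) can be pictured as a priority queue ordered by execution time. The sketch below is a simplified, hypothetical version of that idea, not the actual class: the caller drives it from its event loop, and the returned "time until next task" is what bounds the poll timeout so a pending commit or auto-commit cannot be starved by a long poll.

```java
import java.util.Comparator;
import java.util.PriorityQueue;

// Simplified sketch: tasks are ordered by scheduled time; runDue() runs
// everything that is due and reports how long until the next task, which
// the caller can use to bound its poll() timeout.
class SketchScheduler {
    private record Entry(long at, Runnable task) {}

    private final PriorityQueue<Entry> queue =
        new PriorityQueue<>(Comparator.comparingLong(Entry::at));

    void schedule(long at, Runnable task) {
        queue.add(new Entry(at, task));
    }

    // Run all tasks due at or before `now`; return ms until the next
    // scheduled task, or Long.MAX_VALUE if nothing is pending.
    long runDue(long now) {
        while (!queue.isEmpty() && queue.peek().at() <= now)
            queue.poll().task().run();
        return queue.isEmpty() ? Long.MAX_VALUE : queue.peek().at() - now;
    }
}
```

The design point is centralization: retried commits with backoff and periodic auto-commits all become entries in one queue, so there is a single place that answers "how long may I block in poll()?" instead of each feature computing its own timeout.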
[jira] [Commented] (KAFKA-2121) prevent potential resource leak in KafkaProducer and KafkaConsumer
[ https://issues.apache.org/jira/browse/KAFKA-2121?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14524185#comment-14524185 ] Steven Zhen Wu commented on KAFKA-2121: --- Updated reviewboard https://reviews.apache.org/r/33760/diff/ against branch apache/trunk prevent potential resource leak in KafkaProducer and KafkaConsumer -- Key: KAFKA-2121 URL: https://issues.apache.org/jira/browse/KAFKA-2121 Project: Kafka Issue Type: Bug Components: producer Affects Versions: 0.8.2.0 Reporter: Steven Zhen Wu Assignee: Steven Zhen Wu Fix For: 0.8.3 Attachments: KAFKA-2121.patch, KAFKA-2121.patch, KAFKA-2121.patch, KAFKA-2121_2015-04-16_09:55:14.patch, KAFKA-2121_2015-04-16_10:43:55.patch, KAFKA-2121_2015-04-18_20:09:20.patch, KAFKA-2121_2015-04-19_20:08:45.patch, KAFKA-2121_2015-04-19_20:30:18.patch, KAFKA-2121_2015-04-20_09:06:09.patch, KAFKA-2121_2015-04-20_09:51:51.patch, KAFKA-2121_2015-04-20_09:52:46.patch, KAFKA-2121_2015-04-20_09:57:49.patch, KAFKA-2121_2015-04-20_22:48:31.patch, KAFKA-2121_2015-05-01_15:42:30.patch On Mon, Apr 13, 2015 at 7:17 PM, Guozhang Wang wangg...@gmail.com wrote: It is a valid problem and we should correct it as soon as possible, I'm with Ewen regarding the solution. On Mon, Apr 13, 2015 at 5:05 PM, Ewen Cheslack-Postava e...@confluent.io wrote: Steven, Looks like there is even more that could potentially be leaked -- since key and value serializers are created and configured at the end, even the IO thread allocated by the producer could leak. Given that, I think 1 isn't a great option since, as you said, it doesn't really address the underlying issue. 3 strikes me as bad from a user experience perspective. It's true we might want to introduce additional constructors to make testing easier, but the more components I need to allocate myself and inject into the producer's constructor, the worse the default experience is. 
And since you would have to inject the dependencies to get correct, non-leaking behavior, it will always be more code than previously (and a backwards incompatible change). Additionally, the code creating the producer would have to be more complicated since it would have to deal with the cleanup carefully whereas it previously just had to deal with the exception. Besides, for testing specifically, you can avoid exposing more constructors just for testing by using something like PowerMock that lets you mock private methods. That requires a bit of code reorganization, but doesn't affect the public interface at all. So my take is that a variant of 2 is probably best. I'd probably do two things. First, make close() safe to call even if some fields haven't been initialized, which presumably just means checking for null fields. (You might also want to figure out if all the methods close() calls are idempotent and decide whether some fields should be marked non-final and cleared to null when close() is called). Second, add the try/catch as you suggested, but just use close(). -Ewen On Mon, Apr 13, 2015 at 3:53 PM, Steven Wu stevenz...@gmail.com wrote: Here is the resource leak problem that we have encountered when 0.8.2 java KafkaProducer failed in constructor. here is the code snippet of KafkaProducer to illustrate the problem. --- public KafkaProducer(ProducerConfig config, Serializer<K> keySerializer, Serializer<V> valueSerializer) { // create metrics reporter via reflection List<MetricsReporter> reporters = config.getConfiguredInstances(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG, MetricsReporter.class); // validate bootstrap servers List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)); } --- let's say MyMetricsReporter creates a thread in constructor. if hostname validation threw an exception, constructor won't call the close method of MyMetricsReporter to clean up the resource. 
as a result, we created a thread leak issue. this becomes worse when we try to auto recovery (i.e. keep creating KafkaProducer again -> failing again -> more thread leaks). there are multiple options of fixing this. 1) just move the hostname validation to the beginning. but this only fixes one symptom. it didn't fix the fundamental problem. what if some other lines throw an exception. 2) use try-catch. in the catch section, try to call close methods for any non-null objects constructed so far. 3) explicitly declare the dependency in the constructor. this way, when KafkaProducer threw an exception, I can call close method of metrics reporters for releasing resources.
Re: Review Request 33065: Patch for KAFKA-1928
On April 18, 2015, 12:49 a.m., Jun Rao wrote: clients/src/main/java/org/apache/kafka/common/network/ByteBufferSend.java, line 55 https://reviews.apache.org/r/33065/diff/1/?file=922619#file922619line55 Do you know why the return is changed from int to long? writeTo() and remaining() should agree on type, since remaining is calculated from sendSize minus a sum of writeTo() calls. We can either change writeTo() to int or remaining() to long. I believe either will work, and I chose to change remaining(). On April 18, 2015, 12:49 a.m., Jun Rao wrote: core/src/main/scala/kafka/api/FetchResponse.scala, lines 66-72 https://reviews.apache.org/r/33065/diff/1/?file=922627#file922627line66 I am wondering if we need both completed() and remaining() in Send. It seems that one of the two is enough for our usage. Also, not sure how useful reify() is. Currently, it's not used anywhere. reification is a very central concept in Kafka's The Trial :) I agree that reify() and remaining() can be removed without any pain. Since it is a public API, do we need to deprecate them first? On April 18, 2015, 12:49 a.m., Jun Rao wrote: core/src/main/scala/kafka/api/FetchResponse.scala, line 70 https://reviews.apache.org/r/33065/diff/1/?file=922627#file922627line70 This probably should be def reify() {}? errr... why? is it a coding standard issue? I think the above is correct code for reify as a function that returns null (which is what Send API specified if we don't have a buffer to return). Anyway, this is irrelevant if we are removing reify. - Gwen --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/33065/#review80557 --- On May 1, 2015, 12:48 a.m., Gwen Shapira wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/33065/ --- (Updated May 1, 2015, 12:48 a.m.) Review request for kafka. 
Bugs: KAFKA-1928 https://issues.apache.org/jira/browse/KAFKA-1928 Repository: kafka Description --- first pass on replacing Send implement maxSize and improved docs Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into KAFKA-1928-v2 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into KAFKA-1928-v2 Conflicts: core/src/main/scala/kafka/network/RequestChannel.scala moved selector out of abstract thread mid-way through putting selector in SocketServer Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into KAFKA-1928-v2 Also, SocketServer is now using Selector. Stil a bit messy - but all tests pass. Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into KAFKA-1928-v2 renamed requestKey to connectionId to reflect new use and changed type from Any to String Diffs - clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java da76cc257b4cfe3c4bce7120a1f14c7f31ef8587 clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java 936487b16e7ac566f8bdcd39a7240ceb619fd30e clients/src/main/java/org/apache/kafka/clients/KafkaClient.java 1311f85847b022efec8cb05c450bb18231db6979 clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b7ae595f2cc46e5dfe728bc3ce6082e9cd0b6d36 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java e55ab11df4db0b0084f841a74cbcf819caf780d5 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java ef9dd5238fbc771496029866ece1d85db6d7b7a5 clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b2db91ca14bbd17fef5ce85839679144fff3f689 clients/src/main/java/org/apache/kafka/common/network/ByteBufferReceive.java 129ae827bccbd982ad93d56e46c6f5c46f147fe0 clients/src/main/java/org/apache/kafka/common/network/ByteBufferSend.java c8213e156ec9c9af49ee09f5238492318516aaa3 clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java fc0d168324aaebb97065b0aafbd547a1994d76a7 
clients/src/main/java/org/apache/kafka/common/network/NetworkSend.java 68327cd3a734fd429966d3e2016a2488dbbb19e5 clients/src/main/java/org/apache/kafka/common/network/Receive.java 4e33078c1eec834bd74aabcb5fc69f18c9d6d52a clients/src/main/java/org/apache/kafka/common/network/Selectable.java b5f8d83e89f9026dc0853e5f92c00b2d7f043e22 clients/src/main/java/org/apache/kafka/common/network/Selector.java 57de0585e5e9a53eb9dcd99cac1ab3eb2086a302 clients/src/main/java/org/apache/kafka/common/network/Send.java 5d321a09e470166a1c33639cf0cab26a3bce98ec
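Jun's int-vs-long question in the review above comes down to writeTo() and remaining() agreeing on a type, since remaining is the total send size minus the running sum of writeTo() return values. A minimal sketch of that invariant (a hypothetical class, not Kafka's actual Send interface), with both methods returning long as in the fix Gwen chose:

```java
import java.nio.ByteBuffer;

// Sketch: remaining() is derived from the total size minus the running
// sum of writeTo() results, so the two methods must share a type (long
// here). Mixing int and long would invite silent narrowing bugs.
class SketchSend {
    private final ByteBuffer buffer;
    private final long size;
    private long written = 0;

    SketchSend(ByteBuffer buffer) {
        this.buffer = buffer;
        this.size = buffer.remaining();
    }

    // Copy as much of the payload as fits into dest; return bytes written.
    long writeTo(ByteBuffer dest) {
        int start = dest.position();
        dest.put(buffer);
        long n = dest.position() - start;
        written += n;
        return n;
    }

    long remaining() { return size - written; }

    boolean completed() { return remaining() <= 0; }
}
```

Note how completed() is just remaining() <= 0 here, which is the same observation made in the review that one of the two accessors would be enough for current usage.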
[jira] [Updated] (KAFKA-2121) prevent potential resource leak in KafkaProducer and KafkaConsumer
[ https://issues.apache.org/jira/browse/KAFKA-2121?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Steven Zhen Wu updated KAFKA-2121: -- Attachment: KAFKA-2121_2015-05-01_15:42:30.patch prevent potential resource leak in KafkaProducer and KafkaConsumer -- Key: KAFKA-2121 URL: https://issues.apache.org/jira/browse/KAFKA-2121 Project: Kafka Issue Type: Bug Components: producer Affects Versions: 0.8.2.0 Reporter: Steven Zhen Wu Assignee: Steven Zhen Wu Fix For: 0.8.3 Attachments: KAFKA-2121.patch, KAFKA-2121.patch, KAFKA-2121.patch, KAFKA-2121_2015-04-16_09:55:14.patch, KAFKA-2121_2015-04-16_10:43:55.patch, KAFKA-2121_2015-04-18_20:09:20.patch, KAFKA-2121_2015-04-19_20:08:45.patch, KAFKA-2121_2015-04-19_20:30:18.patch, KAFKA-2121_2015-04-20_09:06:09.patch, KAFKA-2121_2015-04-20_09:51:51.patch, KAFKA-2121_2015-04-20_09:52:46.patch, KAFKA-2121_2015-04-20_09:57:49.patch, KAFKA-2121_2015-04-20_22:48:31.patch, KAFKA-2121_2015-05-01_15:42:30.patch On Mon, Apr 13, 2015 at 7:17 PM, Guozhang Wang wangg...@gmail.com wrote: It is a valid problem and we should correct it as soon as possible, I'm with Ewen regarding the solution. On Mon, Apr 13, 2015 at 5:05 PM, Ewen Cheslack-Postava e...@confluent.io wrote: Steven, Looks like there is even more that could potentially be leaked -- since key and value serializers are created and configured at the end, even the IO thread allocated by the producer could leak. Given that, I think 1 isn't a great option since, as you said, it doesn't really address the underlying issue. 3 strikes me as bad from a user experience perspective. It's true we might want to introduce additional constructors to make testing easier, but the more components I need to allocate myself and inject into the producer's constructor, the worse the default experience is. And since you would have to inject the dependencies to get correct, non-leaking behavior, it will always be more code than previously (and a backwards incompatible change). 
Additionally, the code creating the producer would have to be more complicated since it would have to deal with the cleanup carefully whereas it previously just had to deal with the exception. Besides, for testing specifically, you can avoid exposing more constructors just for testing by using something like PowerMock that lets you mock private methods. That requires a bit of code reorganization, but doesn't affect the public interface at all. So my take is that a variant of 2 is probably best. I'd probably do two things. First, make close() safe to call even if some fields haven't been initialized, which presumably just means checking for null fields. (You might also want to figure out if all the methods close() calls are idempotent and decide whether some fields should be marked non-final and cleared to null when close() is called). Second, add the try/catch as you suggested, but just use close(). -Ewen On Mon, Apr 13, 2015 at 3:53 PM, Steven Wu stevenz...@gmail.com wrote: Here is the resource leak problem that we have encountered when 0.8.2 java KafkaProducer failed in constructor. here is the code snippet of KafkaProducer to illustrate the problem. --- public KafkaProducer(ProducerConfig config, Serializer<K> keySerializer, Serializer<V> valueSerializer) { // create metrics reporter via reflection List<MetricsReporter> reporters = config.getConfiguredInstances(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG, MetricsReporter.class); // validate bootstrap servers List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)); } --- let's say MyMetricsReporter creates a thread in constructor. if hostname validation threw an exception, constructor won't call the close method of MyMetricsReporter to clean up the resource. as a result, we created a thread leak issue. this becomes worse when we try to auto recovery (i.e. keep creating KafkaProducer again -> failing again -> more thread leaks). 
there are multiple options of fixing this. 1) just move the hostname validation to the beginning. but this only fixes one symptom. it didn't fix the fundamental problem. what if some other lines throw an exception. 2) use try-catch. in the catch section, try to call close methods for any non-null objects constructed so far. 3) explicitly declare the dependency in the constructor. this way, when KafkaProducer threw an exception, I can call close method of metrics reporters for releasing resources.
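Ewen's recommendation (option 2 plus a null-safe close()) can be sketched like this. All names are illustrative, not the real KafkaProducer; the point is the try/catch around construction and the null checks that make close() safe to call on a partially built object.

```java
// Illustrative sketch of the fix: wrap construction in try/catch and
// release whatever was already allocated via a null-safe close().
class LeakSafeProducer implements AutoCloseable {
    private Thread ioThread;
    private AutoCloseable metricsReporter;

    LeakSafeProducer(boolean failMidway) {
        try {
            metricsReporter = () -> { /* release reporter resources */ };
            ioThread = new Thread(() -> { /* I/O loop would live here */ });
            ioThread.start();
            // Stand-in for a later step (e.g. bootstrap server validation)
            // that can throw after resources were already allocated.
            if (failMidway)
                throw new IllegalArgumentException("bad bootstrap servers");
        } catch (RuntimeException e) {
            close();  // clean up partially constructed state
            throw e;  // rethrow so the caller still sees the failure
        }
    }

    // Safe to call even if construction failed partway: every field is
    // null-checked before use, so there is nothing to leak.
    @Override
    public void close() {
        if (ioThread != null) ioThread.interrupt();
        if (metricsReporter != null) {
            try { metricsReporter.close(); } catch (Exception ignored) { }
        }
    }
}
```

With this shape, the "auto recovery" loop described above (construct, fail, retry) no longer accumulates threads, because every failed construction tears down what it allocated before rethrowing.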
Re: Review Request 33760: Patch for KAFKA-2121
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/33760/#review82292 --- Ship it! Ship It! - Guozhang Wang On May 1, 2015, 10:42 p.m., Steven Wu wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/33760/ --- (Updated May 1, 2015, 10:42 p.m.) Review request for kafka. Bugs: KAFKA-2121 https://issues.apache.org/jira/browse/KAFKA-2121 Repository: kafka Description --- override java.io.Closeable#close method in Serializer and Deserializer interfaces without throwing checked IOException. this is to avoid breaking the source compatibility. add a test for checking Serializer is closed during KafkaProducer#close missing copyright header in previous checkin removed throws Exception for test methods Diffs - clients/src/main/java/org/apache/kafka/common/serialization/Deserializer.java 9a57579f87cb19cb6affe6d157ff8446c23e3551 clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java c44054038066f0d0829d05f082b2ee42b34cded7 clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java eea2c28450736d1668c68828f77a49470a82c3d0 clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java 49f1427bcbe43c773920a25aa69a71d0329296b7 clients/src/test/java/org/apache/kafka/test/MockMetricsReporter.java 6f948f240c906029a0f972bf770f288f390ea714 clients/src/test/java/org/apache/kafka/test/MockSerializer.java PRE-CREATION Diff: https://reviews.apache.org/r/33760/diff/ Testing --- Thanks, Steven Wu
Re: Review Request 33760: Patch for KAFKA-2121
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/33760/ --- (Updated May 1, 2015, 10:42 p.m.) Review request for kafka. Bugs: KAFKA-2121 https://issues.apache.org/jira/browse/KAFKA-2121 Repository: kafka Description (updated) --- override java.io.Closeable#close method in Serializer and Deserializer interfaces without throwing checked IOException. this is to avoid breaking the source compatibility. add a test for checking Serializer is closed during KafkaProducer#close missing copyright header in previous checkin removed throws Exception for test methods Diffs (updated) - clients/src/main/java/org/apache/kafka/common/serialization/Deserializer.java 9a57579f87cb19cb6affe6d157ff8446c23e3551 clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java c44054038066f0d0829d05f082b2ee42b34cded7 clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java eea2c28450736d1668c68828f77a49470a82c3d0 clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java 49f1427bcbe43c773920a25aa69a71d0329296b7 clients/src/test/java/org/apache/kafka/test/MockMetricsReporter.java 6f948f240c906029a0f972bf770f288f390ea714 clients/src/test/java/org/apache/kafka/test/MockSerializer.java PRE-CREATION Diff: https://reviews.apache.org/r/33760/diff/ Testing --- Thanks, Steven Wu
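The core trick in this patch, redeclaring Closeable's close() without the checked IOException, relies on Java allowing an override to narrow the throws clause. A sketch with a hypothetical serializer interface (not the actual org.apache.kafka.common.serialization.Serializer):

```java
import java.io.Closeable;

// Java permits an override to declare fewer checked exceptions than the
// method it overrides, so close() can drop the "throws IOException" from
// Closeable. Callers of this subtype then need no try/catch, while
// existing code that treats it as a plain Closeable keeps compiling,
// which is the source-compatibility point made in the description.
interface SketchSerializer<T> extends Closeable {
    byte[] serialize(String topic, T data);

    @Override
    void close(); // no "throws IOException"
}
```

Implementations that genuinely can fail on close must wrap the failure in an unchecked exception, which is the trade-off this pattern accepts in exchange for a friendlier call site.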
[jira] [Updated] (KAFKA-1928) Move kafka.network over to using the network classes in org.apache.kafka.common.network
[ https://issues.apache.org/jira/browse/KAFKA-1928?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gwen Shapira updated KAFKA-1928: Attachment: KAFKA-1928_2015-05-01_15:45:24.patch Move kafka.network over to using the network classes in org.apache.kafka.common.network --- Key: KAFKA-1928 URL: https://issues.apache.org/jira/browse/KAFKA-1928 Project: Kafka Issue Type: Sub-task Components: security Reporter: Jay Kreps Assignee: Gwen Shapira Attachments: KAFKA-1928.patch, KAFKA-1928_2015-04-28_00:09:40.patch, KAFKA-1928_2015-04-30_17:48:33.patch, KAFKA-1928_2015-05-01_15:45:24.patch As part of the common package we introduced a bunch of network related code and abstractions. We should look into replacing a lot of what is in kafka.network with this code. Duplicate classes include things like Receive, Send, etc. It is likely possible to also refactor the SocketServer to make use of Selector which should significantly simplify its code. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1928) Move kafka.network over to using the network classes in org.apache.kafka.common.network
[ https://issues.apache.org/jira/browse/KAFKA-1928?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14524195#comment-14524195 ] Gwen Shapira commented on KAFKA-1928: - Updated reviewboard https://reviews.apache.org/r/33065/diff/ against branch trunk Move kafka.network over to using the network classes in org.apache.kafka.common.network --- Key: KAFKA-1928 URL: https://issues.apache.org/jira/browse/KAFKA-1928 Project: Kafka Issue Type: Sub-task Components: security Reporter: Jay Kreps Assignee: Gwen Shapira Attachments: KAFKA-1928.patch, KAFKA-1928_2015-04-28_00:09:40.patch, KAFKA-1928_2015-04-30_17:48:33.patch, KAFKA-1928_2015-05-01_15:45:24.patch As part of the common package we introduced a bunch of network related code and abstractions. We should look into replacing a lot of what is in kafka.network with this code. Duplicate classes include things like Receive, Send, etc. It is likely possible to also refactor the SocketServer to make use of Selector which should significantly simplify its code. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: Review Request 33065: Patch for KAFKA-1928
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/33065/ --- (Updated May 1, 2015, 10:45 p.m.) Review request for kafka. Bugs: KAFKA-1928 https://issues.apache.org/jira/browse/KAFKA-1928 Repository: kafka Description (updated) --- first pass on replacing Send implement maxSize and improved docs Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into KAFKA-1928-v2 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into KAFKA-1928-v2 Conflicts: core/src/main/scala/kafka/network/RequestChannel.scala moved selector out of abstract thread mid-way through putting selector in SocketServer Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into KAFKA-1928-v2 Also, SocketServer is now using Selector. Stil a bit messy - but all tests pass. Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into KAFKA-1928-v2 renamed requestKey to connectionId to reflect new use and changed type from Any to String Following Jun's comments - moved MultiSend to client. 
Cleaned up destinations as well Diffs (updated) - clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java da76cc257b4cfe3c4bce7120a1f14c7f31ef8587 clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java 936487b16e7ac566f8bdcd39a7240ceb619fd30e clients/src/main/java/org/apache/kafka/clients/KafkaClient.java 1311f85847b022efec8cb05c450bb18231db6979 clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b7ae595f2cc46e5dfe728bc3ce6082e9cd0b6d36 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java e55ab11df4db0b0084f841a74cbcf819caf780d5 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java ef9dd5238fbc771496029866ece1d85db6d7b7a5 clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b2db91ca14bbd17fef5ce85839679144fff3f689 clients/src/main/java/org/apache/kafka/common/network/ByteBufferReceive.java 129ae827bccbd982ad93d56e46c6f5c46f147fe0 clients/src/main/java/org/apache/kafka/common/network/ByteBufferSend.java c8213e156ec9c9af49ee09f5238492318516aaa3 clients/src/main/java/org/apache/kafka/common/network/MultiSend.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java fc0d168324aaebb97065b0aafbd547a1994d76a7 clients/src/main/java/org/apache/kafka/common/network/NetworkSend.java 68327cd3a734fd429966d3e2016a2488dbbb19e5 clients/src/main/java/org/apache/kafka/common/network/Receive.java 4e33078c1eec834bd74aabcb5fc69f18c9d6d52a clients/src/main/java/org/apache/kafka/common/network/Selectable.java b5f8d83e89f9026dc0853e5f92c00b2d7f043e22 clients/src/main/java/org/apache/kafka/common/network/Selector.java 57de0585e5e9a53eb9dcd99cac1ab3eb2086a302 clients/src/main/java/org/apache/kafka/common/network/Send.java 5d321a09e470166a1c33639cf0cab26a3bce98ec clients/src/main/java/org/apache/kafka/common/requests/RequestSend.java 27cbf390c7f148ffa8c5abc154c72cbf0829715c 
clients/src/main/java/org/apache/kafka/common/requests/ResponseSend.java PRE-CREATION clients/src/test/java/org/apache/kafka/clients/MockClient.java 5e3fab13e3c02eb351558ec973b949b3d1196085 clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java 8b278892883e63899b53e15efb9d8c926131e858 clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java d5b306b026e788b4e5479f3419805aa49ae889f3 clients/src/test/java/org/apache/kafka/test/MockSelector.java ea89b06a4c9e5bb351201299cd3037f5226f0e6c core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala 1c3b3802ac221d570e7610458e50518b4499e7ed core/src/main/scala/kafka/api/ConsumerMetadataRequest.scala a3b1b78adb760eaeb029466b54f335a29caf3b0f core/src/main/scala/kafka/api/ControlledShutdownRequest.scala fe81635c864cec03ca1d4681c9c47c3fc4f975ee core/src/main/scala/kafka/api/FetchRequest.scala b038c15186c0cbcc65b59479324052498361b717 core/src/main/scala/kafka/api/FetchResponse.scala 75aaf57fb76ec01660d93701a57ae953d877d81c core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala 431190ab94afc4acfc14348a1fc720e17c071cea core/src/main/scala/kafka/api/OffsetCommitRequest.scala cf8e6acc426aef6eb19d862bf6a108a5fc37907a core/src/main/scala/kafka/api/OffsetFetchRequest.scala 67811a752a470bf9bdbc8c5419e8d6e20a006169 core/src/main/scala/kafka/api/OffsetRequest.scala 3d483bc7518ad76f9548772522751afb4d046b78 core/src/main/scala/kafka/api/ProducerRequest.scala 570b2da1d865086f9830aa919a49063abbbe574d core/src/main/scala/kafka/api/StopReplicaRequest.scala 5e14987c990fe561c01dac2909f5ed21a506e038 core/src/main/scala/kafka/api/TopicMetadataRequest.scala 363bae01752318f3849242b97a6619747697c1d9
[jira] [Commented] (KAFKA-2123) Make new consumer offset commit API use callback + future
[ https://issues.apache.org/jira/browse/KAFKA-2123?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14524491#comment-14524491 ] Ewen Cheslack-Postava commented on KAFKA-2123: -- Updated to add backoff back in upon retries, which was removed when I unified the sync and async processing. I ended up generalizing this a bit to provide for scheduling tasks that need to be delayed for some time. With the generalization, I could also move the automatic background offset commits to this system as well. This also fixed a bug in the existing implementation: a long poll could prevent an automatic offset commit from running, since we only checked before the poll whether a commit was needed and didn't account for the need to stop polling and commit offsets if the commit came due during the poll() period. [~jkreps] and [~junrao] might want to take a look -- the producer code has gotten complicated because we have so many timeouts to coordinate, so computing the right poll timeouts has to take into account a bunch of variables. I think a centralized scheduler might help keep this complexity in check. Make new consumer offset commit API use callback + future - Key: KAFKA-2123 URL: https://issues.apache.org/jira/browse/KAFKA-2123 Project: Kafka Issue Type: Improvement Components: clients, consumer Reporter: Ewen Cheslack-Postava Assignee: Ewen Cheslack-Postava Fix For: 0.8.3 Attachments: KAFKA-2123.patch, KAFKA-2123_2015-04-30_11:23:05.patch, KAFKA-2123_2015-05-01_19:33:19.patch The current version of the offset commit API in the new consumer is void commit(offsets, commit type) where the commit type is either sync or async. This means you need to use sync if you ever want confirmation that the commit succeeded. Some applications will want to use asynchronous offset commit, but be able to tell when the commit completes. 
This is basically the same problem that had to be fixed going from the old consumer to the new consumer, and I'd suggest the same fix: a callback + future combination. The new API would be Future<Void> commit(Map<TopicPartition, Long> offsets, ConsumerCommitCallback callback); where ConsumerCommitCallback contains a single method: public void onCompletion(Exception exception); We can provide shorthand variants of commit() for eliding the different arguments. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
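The proposed signature above can be sketched as follows. This is a hedged illustration only: the class name CommitApiSketch, the use of CompletableFuture, and the immediate in-line completion are assumptions made for the sketch, and Map<String, Long> stands in for the real Map<TopicPartition, Long>; it is not the actual consumer code.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;

public class CommitApiSketch {
    // Single-method callback, as described in the proposal: exception is
    // null on success, non-null on failure.
    public interface ConsumerCommitCallback {
        void onCompletion(Exception exception);
    }

    // The proposed shape: an async commit that both invokes a callback and
    // completes a Future, so callers can use whichever style suits them.
    public static Future<Void> commit(Map<String, Long> offsets,
                                      ConsumerCommitCallback callback) {
        CompletableFuture<Void> result = new CompletableFuture<>();
        try {
            // A real implementation would send an OffsetCommitRequest here
            // and complete the future when the response arrives; this
            // sketch succeeds immediately.
            if (callback != null)
                callback.onCompletion(null);
            result.complete(null);
        } catch (Exception e) {
            if (callback != null)
                callback.onCompletion(e);
            result.completeExceptionally(e);
        }
        return result;
    }
}
```

A caller that wants sync behavior just blocks on the returned future; a purely async caller passes a callback and ignores the future.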
[GitHub] kafka pull request: Patch for KAFKA-2055: ConsumerBounceTest.testS...
GitHub user lvfangmin opened a pull request: https://github.com/apache/kafka/pull/60 Patch for KAFKA-2055: ConsumerBounceTest.testSeekAndCommitWithBrokerFail... ...ures transient failure You can merge this pull request into a Git repository by running: $ git pull https://github.com/lvfangmin/kafka KAFKA-2055 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/kafka/pull/60.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #60 commit 5e174e3feabe03c14a6dca451fdf0967285069ff Author: lvfangmin lvfang...@gmail.com Date: 2015-04-27T14:40:35Z Patch for KAFKA-2055: ConsumerBounceTest.testSeekAndCommitWithBrokerFailures transient failure --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (KAFKA-2055) ConsumerBounceTest.testSeekAndCommitWithBrokerFailures transient failure
[ https://issues.apache.org/jira/browse/KAFKA-2055?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14523248#comment-14523248 ] ASF GitHub Bot commented on KAFKA-2055: --- GitHub user lvfangmin opened a pull request: https://github.com/apache/kafka/pull/60 Patch for KAFKA-2055: ConsumerBounceTest.testSeekAndCommitWithBrokerFail... ...ures transient failure You can merge this pull request into a Git repository by running: $ git pull https://github.com/lvfangmin/kafka KAFKA-2055 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/kafka/pull/60.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #60 commit 5e174e3feabe03c14a6dca451fdf0967285069ff Author: lvfangmin lvfang...@gmail.com Date: 2015-04-27T14:40:35Z Patch for KAFKA-2055: ConsumerBounceTest.testSeekAndCommitWithBrokerFailures transient failure ConsumerBounceTest.testSeekAndCommitWithBrokerFailures transient failure Key: KAFKA-2055 URL: https://issues.apache.org/jira/browse/KAFKA-2055 Project: Kafka Issue Type: Sub-task Reporter: Guozhang Wang Assignee: Fangmin Lv Labels: newbie Attachments: KAFKA-2055.patch {code} kafka.api.ConsumerBounceTest > testSeekAndCommitWithBrokerFailures FAILED java.lang.AssertionError: expected:<1000> but was:<976> at org.junit.Assert.fail(Assert.java:92) at org.junit.Assert.failNotEquals(Assert.java:689) at org.junit.Assert.assertEquals(Assert.java:127) at org.junit.Assert.assertEquals(Assert.java:514) at org.junit.Assert.assertEquals(Assert.java:498) at kafka.api.ConsumerBounceTest.seekAndCommitWithBrokerFailures(ConsumerBounceTest.scala:117) at kafka.api.ConsumerBounceTest.testSeekAndCommitWithBrokerFailures(ConsumerBounceTest.scala:98) kafka.api.ConsumerBounceTest > testSeekAndCommitWithBrokerFailures FAILED java.lang.AssertionError: expected:<1000> but was:<913> at org.junit.Assert.fail(Assert.java:92) at 
org.junit.Assert.failNotEquals(Assert.java:689) at org.junit.Assert.assertEquals(Assert.java:127) at org.junit.Assert.assertEquals(Assert.java:514) at org.junit.Assert.assertEquals(Assert.java:498) at kafka.api.ConsumerBounceTest.seekAndCommitWithBrokerFailures(ConsumerBounceTest.scala:117) at kafka.api.ConsumerBounceTest.testSeekAndCommitWithBrokerFailures(ConsumerBounceTest.scala:98) {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: [DISCUSS] Using GitHub Pull Requests for contributions and code review
+1. Thanks Parth On 5/1/15, 12:38 AM, Ewen Cheslack-Postava e...@confluent.io wrote: Also +1. There are some drawbacks to using Github for reviews, e.g. lots of emails for each review because they don't let you publish your entire review in one go like RB does, but it drastically lowers the barrier to contributing for most developers. Also, if you haven't tried it, hub https://hub.github.com/ makes it really easy to check out and test PRs. One thing I noticed is that when you try to generate a PR it defaults to the 0.8.2 branch. Can we fix that up to be trunk by default? That's the most common use case; version branches are really only useful when a release is being prepared. Do changes to the Github repo require tickets to the Apache Infra team or is this something committers have control over? On a related note, which perhaps should be discussed on another thread: The CI setup is a related issue that we might want to rethink. Apache also supports Travis CI, and now pays for dedicated build slaves: https://blogs.apache.org/infra/entry/apache_gains_additional_travis_ci I think the setup should be pretty easy since building + running tests is just a gradle command. Having tests run automatically on PRs (and promptly!) makes it a lot easier to confidently commit a change, especially if the build merges with trunk first. I started looking at this when I was trying to sort out Confluent's build test infrastructure. See https://travis-ci.org/ewencp/kafka/builds/60802386 for an example; the patch is about 10 lines in a .travis.yml file, and the failure in that example seems to be unrelated to the Travis config. I think the basic config I created is all we'd need. Unfortunately, I couldn't easily tell what the delay on builds is, i.e. whether it would be an improvement over the delays with the current Jenkins setup. 
But having them run on PR creation/update means the results will usually be ready by the time a reviewer gets to looking at the PR and would be reported in the PR so the state is easy to evaluate. (I'm also having trouble telling exactly how the two ASF Jenkins builders differ since they both seem to poll trunk, so I can't be certain whether Travis would be able to completely replace the current setup. That said, it should be telling that I have never paid attention to Jenkins output at all since it seems so far removed from any of my actions as a contributor.) As another alternative, Confluent would also be happy to provide build/test infrastructure to help out. AMPLab does this for Spark, so it seems to be acceptable to ASF. We already have PR builders and trunk/master builders set up for other projects, so it wouldn't be hard to setup the same for Kafka with the right access to the repos. Based on the current frequency of builds (https://builds.apache.org/view/All/job/Kafka-trunk/ and https://builds.apache.org/view/All/job/KafkaPreCommit/) I think it'll be easy for even our current infrastructure to keep up. On Thu, Apr 30, 2015 at 9:41 PM, Jaikiran Pai jai.forums2...@gmail.com wrote: I think this will help a lot in contributions. Some of my local changes that I want to contribute back have been pending because I sometimes switch machines and I then have to go through setting up the Ruby/python and other stuff for the current review process. Using just github is going to help in quickly submitting the changes. -Jaikiran On Thursday 30 April 2015 06:42 PM, Ismael Juma wrote: Hi all, Kafka currently uses a combination of Review Board and JIRA for contributions and code review. In my opinion, this makes contribution and code review a bit harder than it has to be. 
I think the approach used by Spark would improve the current situation: Generally, Spark uses JIRA to track logical issues, including bugs and improvements, and uses Github pull requests to manage the review and merge of specific code changes. That is, JIRAs are used to describe what should be fixed or changed, and high-level approaches, and pull requests describe how to implement that change in the project's source code. For example, major design decisions are discussed in JIRA.[1] It's worth reading the wiki page for all the details, but I will summarise the suggested workflow for code changes:

1. Fork the Github repository at http://github.com/apache/kafka (if you haven't already)
2. git checkout -b kafka-XXX
3. Make one or more commits (smaller commits can be easier to review, and Review Board makes that hard)
4. git push origin kafka-XXX
5. Create a PR against upstream/trunk (this will update JIRA automatically[2] and it will send an email to the dev mailing list too)
6. A CI build will be triggered[3]
7. The review process happens on GitHub (it's quite handy to be able to comment at both commit and PR level, unlike Review Board)
8. Once all feedback has been addressed and the build is green, a variant of the `merge_spark_pr.py`[4] script is used
Review Request 33760: Patch for KAFKA-2121
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/33760/ --- Review request for kafka. Bugs: KAFKA-2121 https://issues.apache.org/jira/browse/KAFKA-2121 Repository: kafka Description --- override the java.io.Closeable#close method in the Serializer and Deserializer interfaces without throwing the checked IOException; this is to avoid breaking source compatibility. add a test checking that the Serializer is closed during KafkaProducer#close Diffs - clients/src/main/java/org/apache/kafka/common/serialization/Deserializer.java 9a57579f87cb19cb6affe6d157ff8446c23e3551 clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java c44054038066f0d0829d05f082b2ee42b34cded7 clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java eea2c28450736d1668c68828f77a49470a82c3d0 clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java 49f1427bcbe43c773920a25aa69a71d0329296b7 clients/src/test/java/org/apache/kafka/test/MockMetricsReporter.java 6f948f240c906029a0f972bf770f288f390ea714 clients/src/test/java/org/apache/kafka/test/MockSerializer.java PRE-CREATION Diff: https://reviews.apache.org/r/33760/diff/ Testing --- Thanks, Steven Wu
RE: [DISCUSS] KIP-21 Configuration Management
Hey Gwen, Thanks for the feedback. As Joel said, these client configs do not introduce a producer/consumer zk dependency. It is configuration that is needed by the broker. From your comments, I gather that you are more worried about managing broker internal configs via Zookeeper since we already have a file. So why have two mechanisms? Given that we already manage topic specific configuration in ZK, it seems a good fit to at least have client configs there since these config parameters aren't really driven through a file anyway. It also maintains consistency. Even for broker configs, it seems very consistent to have all the overridden configs in one place which is easy to view and change. As you mentioned, users shouldn't ever have to fiddle with Zookeeper directly; our tooling should provide the ability to view and modify configs on a per-broker basis. I do like your suggestion of reloading config files but I'm not sure this works easily for everyone. For example, often these per-host overrides in config files are managed by hostname, but what we really want are broker-level overrides, which means they should ideally be tied to a broker-id, which is a Kafka detail. In addition, sometimes these configs pushed to individual hosts aren't the properties files themselves, but rather some company-specific stuff that also contains the Kafka configs. I guess the point I'm trying to make is that in many cases people may not be able to reload configs directly from a file without doing some additional work. As for propagating configuration changes, perhaps I can clarify that section a bit more. We can also do a pass over all the configs in KafkaConfig and have a list of properties that can be converted slowly. Thanks, Aditya From: Joel Koshy [jjkosh...@gmail.com] Sent: Thursday, April 30, 2015 5:14 PM To: dev@kafka.apache.org Subject: Re: [DISCUSS] KIP-21 Configuration Management 1. I have deep concerns about managing configuration in ZooKeeper. 
First, Producers and Consumers shouldn't depend on ZK at all, this seems to add back a dependency we are trying to get away from. The KIP probably needs to be clarified here - I don't think Aditya was referring to client (producer/consumer) configs. These are global client-id-specific configs that need to be managed centrally. (Specifically, quota overrides on a per-client basis).
[jira] [Updated] (KAFKA-2121) prevent potential resource leak in KafkaProducer and KafkaConsumer
[ https://issues.apache.org/jira/browse/KAFKA-2121?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Steven Zhen Wu updated KAFKA-2121: -- Attachment: KAFKA-2121.patch prevent potential resource leak in KafkaProducer and KafkaConsumer -- Key: KAFKA-2121 URL: https://issues.apache.org/jira/browse/KAFKA-2121 Project: Kafka Issue Type: Bug Components: producer Affects Versions: 0.8.2.0 Reporter: Steven Zhen Wu Assignee: Steven Zhen Wu Fix For: 0.8.3 Attachments: KAFKA-2121.patch, KAFKA-2121.patch, KAFKA-2121.patch, KAFKA-2121_2015-04-16_09:55:14.patch, KAFKA-2121_2015-04-16_10:43:55.patch, KAFKA-2121_2015-04-18_20:09:20.patch, KAFKA-2121_2015-04-19_20:08:45.patch, KAFKA-2121_2015-04-19_20:30:18.patch, KAFKA-2121_2015-04-20_09:06:09.patch, KAFKA-2121_2015-04-20_09:51:51.patch, KAFKA-2121_2015-04-20_09:52:46.patch, KAFKA-2121_2015-04-20_09:57:49.patch, KAFKA-2121_2015-04-20_22:48:31.patch On Mon, Apr 13, 2015 at 7:17 PM, Guozhang Wang wangg...@gmail.com wrote: It is a valid problem and we should correct it as soon as possible, I'm with Ewen regarding the solution. On Mon, Apr 13, 2015 at 5:05 PM, Ewen Cheslack-Postava e...@confluent.io wrote: Steven, Looks like there is even more that could potentially be leaked -- since key and value serializers are created and configured at the end, even the IO thread allocated by the producer could leak. Given that, I think 1 isn't a great option since, as you said, it doesn't really address the underlying issue. 3 strikes me as bad from a user experience perspective. It's true we might want to introduce additional constructors to make testing easier, but the more components I need to allocate myself and inject into the producer's constructor, the worse the default experience is. And since you would have to inject the dependencies to get correct, non-leaking behavior, it will always be more code than previously (and a backwards incompatible change). 
Additionally, the code creating the producer would have to be more complicated, since it would have to deal with the cleanup carefully whereas it previously just had to deal with the exception. Besides, for testing specifically, you can avoid exposing more constructors just for testing by using something like PowerMock that lets you mock private methods. That requires a bit of code reorganization, but doesn't affect the public interface at all. So my take is that a variant of 2 is probably best. I'd probably do two things. First, make close() safe to call even if some fields haven't been initialized, which presumably just means checking for null fields. (You might also want to figure out if all the methods close() calls are idempotent and decide whether some fields should be marked non-final and cleared to null when close() is called). Second, add the try/catch as you suggested, but just use close(). -Ewen On Mon, Apr 13, 2015 at 3:53 PM, Steven Wu stevenz...@gmail.com wrote: Here is the resource leak problem that we have encountered when the 0.8.2 java KafkaProducer failed in its constructor. here is the code snippet of KafkaProducer to illustrate the problem. --- public KafkaProducer(ProducerConfig config, Serializer<K> keySerializer, Serializer<V> valueSerializer) { // create metrics reporters via reflection List<MetricsReporter> reporters = config.getConfiguredInstances(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG, MetricsReporter.class); // validate bootstrap servers List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)); } --- let's say MyMetricsReporter creates a thread in its constructor. if hostname validation threw an exception, the constructor won't call the close method of MyMetricsReporter to clean up the resource. as a result, we created a thread leak issue. this becomes worse when we try to auto-recover (i.e. keep creating the KafkaProducer again -> failing again -> more thread leaks). 
there are multiple options for fixing this. 1) just move the hostname validation to the beginning. but this only fixes one symptom; it doesn't fix the fundamental problem. what if some other line throws an exception? 2) use try-catch. in the catch section, try to call the close methods of any non-null objects constructed so far. 3) explicitly declare the dependency in the constructor. this way, when KafkaProducer throws an exception, I can call the close method of the metrics reporters to release resources. KafkaProducer(..., List<MetricsReporter> reporters) we
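Option 2 above (a try/catch in the constructor that closes whatever was built so far) can be sketched like this. The class and field names below are illustrative, not the actual KafkaProducer internals; the Reporter stands in for a MetricsReporter that owns a resource such as a thread.

```java
import java.io.Closeable;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

public class SafeInit {
    // Stand-in for a metrics reporter that owns a resource (e.g. a thread).
    static class Reporter implements Closeable {
        final AtomicBoolean closed = new AtomicBoolean(false);
        @Override public void close() { closed.set(true); }
    }

    // Exposed for the example only, so we can observe cleanup.
    static Reporter lastReporter;

    private final List<Closeable> resources = new ArrayList<>();

    public SafeInit(boolean failLate) {
        try {
            Reporter metrics = new Reporter();  // may start threads
            lastReporter = metrics;
            resources.add(metrics);
            // Stand-in for a later step (e.g. bootstrap-server validation)
            // that can throw after resources already exist.
            if (failLate)
                throw new IllegalArgumentException("bad bootstrap servers");
        } catch (RuntimeException e) {
            close();  // release everything constructed so far, then rethrow
            throw e;
        }
    }

    // Safe to call even if construction only partially completed, because
    // it only touches what was actually added to the list.
    public void close() {
        for (Closeable c : resources) {
            try { c.close(); } catch (Exception ignored) { }
        }
    }
}
```

This mirrors Ewen's two suggestions: close() is null/partial-safe, and the constructor's catch block simply delegates to it.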
[jira] [Commented] (KAFKA-2121) prevent potential resource leak in KafkaProducer and KafkaConsumer
[ https://issues.apache.org/jira/browse/KAFKA-2121?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14523538#comment-14523538 ] Steven Zhen Wu commented on KAFKA-2121: --- Created reviewboard https://reviews.apache.org/r/33760/diff/ against branch apache/trunk prevent potential resource leak in KafkaProducer and KafkaConsumer -- Key: KAFKA-2121 URL: https://issues.apache.org/jira/browse/KAFKA-2121 Project: Kafka Issue Type: Bug Components: producer Affects Versions: 0.8.2.0 Reporter: Steven Zhen Wu Assignee: Steven Zhen Wu Fix For: 0.8.3 Attachments: KAFKA-2121.patch, KAFKA-2121.patch, KAFKA-2121.patch, KAFKA-2121_2015-04-16_09:55:14.patch, KAFKA-2121_2015-04-16_10:43:55.patch, KAFKA-2121_2015-04-18_20:09:20.patch, KAFKA-2121_2015-04-19_20:08:45.patch, KAFKA-2121_2015-04-19_20:30:18.patch, KAFKA-2121_2015-04-20_09:06:09.patch, KAFKA-2121_2015-04-20_09:51:51.patch, KAFKA-2121_2015-04-20_09:52:46.patch, KAFKA-2121_2015-04-20_09:57:49.patch, KAFKA-2121_2015-04-20_22:48:31.patch On Mon, Apr 13, 2015 at 7:17 PM, Guozhang Wang wangg...@gmail.com wrote: It is a valid problem and we should correct it as soon as possible, I'm with Ewen regarding the solution. On Mon, Apr 13, 2015 at 5:05 PM, Ewen Cheslack-Postava e...@confluent.io wrote: Steven, Looks like there is even more that could potentially be leaked -- since key and value serializers are created and configured at the end, even the IO thread allocated by the producer could leak. Given that, I think 1 isn't a great option since, as you said, it doesn't really address the underlying issue. 3 strikes me as bad from a user experience perspective. It's true we might want to introduce additional constructors to make testing easier, but the more components I need to allocate myself and inject into the producer's constructor, the worse the default experience is. 
And since you would have to inject the dependencies to get correct, non-leaking behavior, it will always be more code than previously (and a backwards incompatible change). Additionally, the code creating the producer would have to be more complicated, since it would have to deal with the cleanup carefully whereas it previously just had to deal with the exception. Besides, for testing specifically, you can avoid exposing more constructors just for testing by using something like PowerMock that lets you mock private methods. That requires a bit of code reorganization, but doesn't affect the public interface at all. So my take is that a variant of 2 is probably best. I'd probably do two things. First, make close() safe to call even if some fields haven't been initialized, which presumably just means checking for null fields. (You might also want to figure out if all the methods close() calls are idempotent and decide whether some fields should be marked non-final and cleared to null when close() is called). Second, add the try/catch as you suggested, but just use close(). -Ewen On Mon, Apr 13, 2015 at 3:53 PM, Steven Wu stevenz...@gmail.com wrote: Here is the resource leak problem that we have encountered when the 0.8.2 java KafkaProducer failed in its constructor. here is the code snippet of KafkaProducer to illustrate the problem. --- public KafkaProducer(ProducerConfig config, Serializer<K> keySerializer, Serializer<V> valueSerializer) { // create metrics reporters via reflection List<MetricsReporter> reporters = config.getConfiguredInstances(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG, MetricsReporter.class); // validate bootstrap servers List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)); } --- let's say MyMetricsReporter creates a thread in its constructor. if hostname validation threw an exception, the constructor won't call the close method of MyMetricsReporter to clean up the resource. 
as a result, we created a thread leak issue. this becomes worse when we try to auto-recover (i.e. keep creating the KafkaProducer again -> failing again -> more thread leaks). there are multiple options for fixing this. 1) just move the hostname validation to the beginning. but this only fixes one symptom; it doesn't fix the fundamental problem. what if some other line throws an exception? 2) use try-catch. in the catch section, try to call the close methods of any non-null objects constructed so far. 3) explicitly declare the dependency in the constructor. this way, when KafkaProducer throws an exception, I can call the close method
Re: Review Request 33065: Patch for KAFKA-1928
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/33065/#review82281 --- I did a high level pass. On the whole I think this is awesome! I agree we need to rationalize the metrics: now that we have a bunch of instrumentation at the selector level, we need a lot less at the socket server level. clients/src/main/java/org/apache/kafka/common/network/Selector.java https://reviews.apache.org/r/33065/#comment133003 Does this need to be public? Instead could we have maxIdleTime be a configurable param and have this call happen from within select? We didn't have this functionality in the client before, but actually the client also needs connection LRU, so I think doing it on both sides is good. core/src/main/scala/kafka/network/SocketServer.scala https://reviews.apache.org/r/33065/#comment133004 Beautiful! So much deleted code! - Jay Kreps On May 1, 2015, 12:48 a.m., Gwen Shapira wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/33065/ --- (Updated May 1, 2015, 12:48 a.m.) Review request for kafka. Bugs: KAFKA-1928 https://issues.apache.org/jira/browse/KAFKA-1928 Repository: kafka Description --- first pass on replacing Send implement maxSize and improved docs Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into KAFKA-1928-v2 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into KAFKA-1928-v2 Conflicts: core/src/main/scala/kafka/network/RequestChannel.scala moved selector out of abstract thread mid-way through putting selector in SocketServer Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into KAFKA-1928-v2 Also, SocketServer is now using Selector. Still a bit messy - but all tests pass. 
Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into KAFKA-1928-v2 renamed requestKey to connectionId to reflect new use and changed type from Any to String Diffs - clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java da76cc257b4cfe3c4bce7120a1f14c7f31ef8587 clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java 936487b16e7ac566f8bdcd39a7240ceb619fd30e clients/src/main/java/org/apache/kafka/clients/KafkaClient.java 1311f85847b022efec8cb05c450bb18231db6979 clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b7ae595f2cc46e5dfe728bc3ce6082e9cd0b6d36 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java e55ab11df4db0b0084f841a74cbcf819caf780d5 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java ef9dd5238fbc771496029866ece1d85db6d7b7a5 clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b2db91ca14bbd17fef5ce85839679144fff3f689 clients/src/main/java/org/apache/kafka/common/network/ByteBufferReceive.java 129ae827bccbd982ad93d56e46c6f5c46f147fe0 clients/src/main/java/org/apache/kafka/common/network/ByteBufferSend.java c8213e156ec9c9af49ee09f5238492318516aaa3 clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java fc0d168324aaebb97065b0aafbd547a1994d76a7 clients/src/main/java/org/apache/kafka/common/network/NetworkSend.java 68327cd3a734fd429966d3e2016a2488dbbb19e5 clients/src/main/java/org/apache/kafka/common/network/Receive.java 4e33078c1eec834bd74aabcb5fc69f18c9d6d52a clients/src/main/java/org/apache/kafka/common/network/Selectable.java b5f8d83e89f9026dc0853e5f92c00b2d7f043e22 clients/src/main/java/org/apache/kafka/common/network/Selector.java 57de0585e5e9a53eb9dcd99cac1ab3eb2086a302 clients/src/main/java/org/apache/kafka/common/network/Send.java 5d321a09e470166a1c33639cf0cab26a3bce98ec clients/src/main/java/org/apache/kafka/common/requests/RequestSend.java 27cbf390c7f148ffa8c5abc154c72cbf0829715c 
clients/src/main/java/org/apache/kafka/common/requests/ResponseSend.java PRE-CREATION clients/src/test/java/org/apache/kafka/clients/MockClient.java 5e3fab13e3c02eb351558ec973b949b3d1196085 clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java 8b278892883e63899b53e15efb9d8c926131e858 clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java d5b306b026e788b4e5479f3419805aa49ae889f3 clients/src/test/java/org/apache/kafka/test/MockSelector.java ea89b06a4c9e5bb351201299cd3037f5226f0e6c core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala 1c3b3802ac221d570e7610458e50518b4499e7ed core/src/main/scala/kafka/api/ConsumerMetadataRequest.scala a3b1b78adb760eaeb029466b54f335a29caf3b0f
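Jay's suggestion in the review above (make maxIdleTime a configurable param and enforce connection LRU from within select) could look roughly like the sketch below. This is an assumption-laden illustration, not the actual Selector code: IdleExpiry, touch, and maybeCloseOldest are invented names, and an access-ordered LinkedHashMap is used so the least recently active connection is always first in iteration order.

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

public class IdleExpiry {
    private final long maxIdleMs;
    // Access-ordered map: iteration visits least-recently-touched first.
    private final LinkedHashMap<String, Long> lruConnections =
            new LinkedHashMap<>(16, 0.75f, true);

    public IdleExpiry(long maxIdleMs) {
        this.maxIdleMs = maxIdleMs;
    }

    // Record activity on a connection (called on every read/write).
    public void touch(String connectionId, long nowMs) {
        lruConnections.put(connectionId, nowMs);
    }

    // Called from inside each select/poll cycle rather than by the user.
    // Returns how many idle connections were expired.
    public int maybeCloseOldest(long nowMs) {
        int closed = 0;
        Iterator<Map.Entry<String, Long>> it =
                lruConnections.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (nowMs - entry.getValue() < maxIdleMs)
                break; // oldest entry is still fresh; the rest are fresher
            it.remove(); // a real Selector would also close the channel here
            closed++;
        }
        return closed;
    }
}
```

Because the map is access-ordered, the scan can stop at the first non-expired entry, so each poll pays only for connections it actually evicts.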
Re: Review Request 33760: Patch for KAFKA-2121
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/33760/#review82280 --- Pointed out two minor issues, but otherwise LGTM. clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java https://reviews.apache.org/r/33760/#comment133001 Both of these test methods no longer need 'throws Exception'. clients/src/test/java/org/apache/kafka/test/MockSerializer.java https://reviews.apache.org/r/33760/#comment133000 Checkstyle is complaining about missing the copyright header. - Ewen Cheslack-Postava On May 1, 2015, 6 p.m., Steven Wu wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/33760/ --- (Updated May 1, 2015, 6 p.m.) Review request for kafka. Bugs: KAFKA-2121 https://issues.apache.org/jira/browse/KAFKA-2121 Repository: kafka Description --- override the java.io.Closeable#close method in the Serializer and Deserializer interfaces without throwing the checked IOException; this is to avoid breaking source compatibility. add a test checking that the Serializer is closed during KafkaProducer#close Diffs - clients/src/main/java/org/apache/kafka/common/serialization/Deserializer.java 9a57579f87cb19cb6affe6d157ff8446c23e3551 clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java c44054038066f0d0829d05f082b2ee42b34cded7 clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java eea2c28450736d1668c68828f77a49470a82c3d0 clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java 49f1427bcbe43c773920a25aa69a71d0329296b7 clients/src/test/java/org/apache/kafka/test/MockMetricsReporter.java 6f948f240c906029a0f972bf770f288f390ea714 clients/src/test/java/org/apache/kafka/test/MockSerializer.java PRE-CREATION Diff: https://reviews.apache.org/r/33760/diff/ Testing --- Thanks, Steven Wu
Re: [DISCUSS] KIP-21 Configuration Management
Hi Aditya, thanks for the write-up and for focusing on this piece. Agreed, we need a way to make broker changes dynamically without rolling restarts. I think, though, that if every broker acts on change notifications itself, it is going to limit which configs can be dynamic: we could never deliver a stop-the-world configuration change, because it would happen across the entire cluster, on every broker, at the same time. Can maybe just the controller get the notification? And we provide a layer for brokers to work with the controller to do the config change operations at its discretion (so it can stop things if it needs to): the controller gets the notification, sends an AdminChangeNotification to brokers [X .. N], and then the brokers can do their thing, even sending a response for heartbeating while the change takes the few milliseconds it needs, or crashing. We need to go through both scenarios. I am worried we put this change in like this and it works for quotas and maybe a few other things, but nothing else gets dynamic and we don't get far enough toward almost no more rolling restarts. ~ Joe Stein - - - - - - - - - - - - - - - - - http://www.stealth.ly - - - - - - - - - - - - - - - - - On Thu, Apr 30, 2015 at 8:14 PM, Joel Koshy jjkosh...@gmail.com wrote: 1. I have deep concerns about managing configuration in ZooKeeper. First, Producers and Consumers shouldn't depend on ZK at all, this seems to add back a dependency we are trying to get away from. The KIP probably needs to be clarified here - I don't think Aditya was referring to client (producer/consumer) configs. These are global client-id-specific configs that need to be managed centrally. (Specifically, quota overrides on a per-client basis).
Re: [DISCUSSION] java.io.Closeable in KAFKA-2121
If you use KafkaProducer as a Closeable, you still need to catch the exception when calling close(), right? So the behavior differs depending on whether you use it as a Producer or as a Closeable? Thanks, Jun

On Thu, Apr 30, 2015 at 6:26 PM, Jay Kreps j...@confluent.io wrote:

Hey Jun, I think the Closeable interface is what we've used elsewhere and what the rest of the Java world uses. I don't think it is too hard for us to add the override in our interface--implementors of the interface don't need to do it. -Jay

On Thu, Apr 30, 2015 at 4:02 PM, Jun Rao j...@confluent.io wrote:

That makes sense. Then, would it be better to have a KafkaClosable interface that doesn't throw the exception? This way, we don't need to override close() in every implementing class. Thanks, Jun

On Wed, Apr 29, 2015 at 10:36 AM, Steven Wu stevenz...@gmail.com wrote:

Jun, we still get the benefit of extending Closeable: e.g. Utils.closeQuietly() can take a FooSerializer as an argument, so we avoid duplicating boilerplate code.

class FooSerializer implements Serializer<Foo> {
    @Override
    public void close() {
        // may throw an unchecked RuntimeException
    }
}

final class Utils {
    public static void closeQuietly(Closeable c, String name, AtomicReference<Throwable> firstException) {
        if (c != null) {
            try {
                c.close();
            } catch (Throwable t) {
                firstException.compareAndSet(null, t);
                log.error("Failed to close " + name, t);
            }
        }
    }
}

On Wed, Apr 29, 2015 at 6:51 AM, Jun Rao j...@confluent.io wrote:

If you do this, the code is no longer simple, which defeats the benefit of extending Closeable. We can define our own Closeable that doesn't throw exceptions, but it may be confusing. So, it seems the original code is probably better. Thanks, Jun

On Tue, Apr 28, 2015 at 3:11 PM, Steven Wu stevenz...@gmail.com wrote:

Sorry for the previous empty msg. Jay's idea should work: basically, we override the close() method in the Serializer interface.
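The closeQuietly pattern Steven describes can be exercised with a self-contained sketch. This is an illustration, not Kafka's actual Utils class: the logger call is replaced with System.err so the snippet compiles standalone, and the lambdas stand in for real serializers.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicReference;

// Standalone sketch of the closeQuietly pattern: close several resources,
// swallow failures, but remember the first one for later inspection.
public class CloseQuietlyDemo {
    static void closeQuietly(Closeable c, String name, AtomicReference<Throwable> firstException) {
        if (c != null) {
            try {
                c.close();
            } catch (Throwable t) {
                // Keep only the first failure; log-and-continue for the rest.
                firstException.compareAndSet(null, t);
                System.err.println("Failed to close " + name + ": " + t);
            }
        }
    }

    public static void main(String[] args) {
        AtomicReference<Throwable> first = new AtomicReference<>();
        Closeable ok  = () -> { };                                    // closes cleanly
        Closeable bad = () -> { throw new IOException("disk gone"); };
        closeQuietly(ok, "ok", first);
        closeQuietly(bad, "bad", first);
        closeQuietly(null, "absent", first);                          // null is tolerated
        // Only the first failure is retained; nothing propagates to the caller.
        System.out.println("first failure: " + first.get().getMessage());
    }
}
```

This is why extending Closeable matters: anything implementing it, including a serializer, can be passed straight to such a helper.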
public interface Serializer<T> extends Closeable {
    @Override
    public void close();
}

On Tue, Apr 28, 2015 at 1:10 PM, Steven Wu stevenz...@gmail.com wrote:

On Tue, Apr 28, 2015 at 1:03 PM, Ewen Cheslack-Postava e...@confluent.io wrote:

Good point Jay. More specifically, since we were already implementing without the checked exception, we'd need to override close() in the Serializer and Deserializer interfaces and omit the throws clause. That definitely makes them source compatible. Not sure about binary compatibility; I couldn't find a quick answer, but I think it's probably still compatible. -Ewen

On Tue, Apr 28, 2015 at 12:30 PM, Jay Kreps jay.kr...@gmail.com wrote:

Hey guys, you can implement Closeable without the checked exception. Having close() methods throw checked exceptions isn't very useful unless there is a way for the caller to recover. In this case there really isn't, right? -Jay

On Mon, Apr 27, 2015 at 5:51 PM, Guozhang Wang wangg...@gmail.com wrote:

Folks, in a recent commit I made regarding KAFKA-2121, there is an omitted API change which makes Serializer / Deserializer extend Closeable, whose close() call can throw IOException by declaration. Hence a scenario like:

Serializer<T> keySerializer = ...
Serializer<T> valueSerializer = ...
KafkaProducer producer = new KafkaProducer(config, keySerializer, valueSerializer)
// ...
keySerializer.close()
valueSerializer.close()

will now need to catch IOException. Wanted to bring this up for people's attention, and to get your opinion on whether we should revert this change. -- Guozhang

-- Thanks, Ewen
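The interface fragment above can be expanded into a compilable sketch showing that the narrowed override removes the need for try/catch at call sites while the type still satisfies Closeable. The Serializer re-declaration here is a stripped-down stand-in for demonstration, not Kafka's real interface.

```java
import java.io.Closeable;

// Stand-in for Kafka's Serializer, re-declared minimally for the demo.
interface Serializer<T> extends Closeable {
    byte[] serialize(String topic, T data);

    // Overriding close() without the 'throws IOException' clause is a legal
    // narrowing of the inherited contract, so callers of Serializer.close()
    // need no try/catch, yet the type is still assignable to Closeable.
    @Override
    void close();
}

public class SerializerCloseDemo {
    public static void main(String[] args) {
        Serializer<String> s = new Serializer<String>() {
            public byte[] serialize(String topic, String data) { return data.getBytes(); }
            public void close() { System.out.println("closed"); }
        };
        s.close();          // compiles with no checked exception to handle
        Closeable c = s;    // still usable wherever a Closeable is expected
    }
}
```

This is the source-compatibility trick under discussion: implementors who already wrote `public void close()` without a throws clause keep compiling unchanged.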
Re: [VOTE] KIP-11- Authorization design for kafka security
Suresh, we typically wrap up the voting of a KIP in a few days. However, given that this KIP is quite critical and there seem to be new questions, perhaps we can spend a bit more time to have people's concerns addressed and then resume the voting. Joe, do you still have concerns given the previous replies? Thanks, Jun

On Thu, Apr 30, 2015 at 7:54 PM, Suresh Srinivas sur...@hortonworks.com wrote:

It is a strange choice to return "does not exist" when the condition is actually "not authorized". I have a hard time understanding why that is better for security. Perhaps in the DB world this is expected and changes may be necessary to comply with such behavior, but that should not guide what we do in Kafka. This is a voting thread for an important feature. Security is the number one feature our users are asking for. Can't minor things like this be done in follow-up jiras? Should the focus be brought back to voting? Btw, since I am new to the Kafka community, is there a period by which a voting thread needs to wrap up? Other projects generally follow 3 or 7 days. Regards, Suresh

Sent from phone

From: Gwen Shapira gshap...@cloudera.com
Sent: Thursday, April 30, 2015 5:32 PM
Subject: Re: [VOTE] KIP-11- Authorization design for kafka security
To: dev@kafka.apache.org

On Thu, Apr 30, 2015 at 4:39 PM, Parth Brahmbhatt pbrahmbh...@hortonworks.com wrote:

Hi Joe, let me clarify on authZException: the caller gets a 403 regardless of the existence of the topic; even if the topic does not exist, you always get 403. This falls under the case where we do not find any ACLs for a resource, and as per our last decision, by default we are going to deny the request.

The reason I'm digging into this is that in Hive we had to fix the existing behavior after financial customers objected loudly to getting "insufficient privileges" when a real database would return "table does not exist".
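Parth's "403 regardless of existence" rule boils down to returning the identical error for a missing topic and an unauthorized one, so probing cannot reveal which topic names exist. A hypothetical sketch of that decision (none of these names are Kafka's; the real check lives in the authorizer and KafkaApis):

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch of the deny-by-default rule under discussion.
public class AuthSketch {
    enum Result { OK, UNKNOWN_TOPIC, AUTHORIZATION_FAILED }

    // Without a READ ACL the answer is AUTHORIZATION_FAILED, even for topics
    // that do not exist, so the two cases look identical to the caller and
    // error responses leak nothing about which topics exist.
    static Result fetch(String topic, Set<String> existingTopics, Set<String> readAcl) {
        if (!readAcl.contains(topic))
            return Result.AUTHORIZATION_FAILED;
        return existingTopics.contains(topic) ? Result.OK : Result.UNKNOWN_TOPIC;
    }

    public static void main(String[] args) {
        Set<String> existing = new HashSet<>(Arrays.asList("payments"));
        Set<String> noAcl = new HashSet<>();
        // An existing-but-forbidden topic and a missing topic answer alike:
        System.out.println(fetch("payments", existing, noAcl));
        System.out.println(fetch("ghost", existing, noAcl));
    }
}
```

Gwen's counter-case is the opposite convention: return "does not exist" for the unauthorized path, which some financial auditors expect; either way, the two code paths must be deliberately collapsed into one response.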
I completely agree that having to handle two separate error conditions (TopicNotExist if the user doesn't have READ, unless the user has CREATE, in which case he can see all topics and can get Unauthorized) adds complexity and will not be fun to debug. However, when implementing security, a lot of what we do is about making customers pass security audits, and I suspect the "can't know that tables even exist" test is a thing. We share pretty much the same financial customers and they seem to have the same concerns. Perhaps you can double-check whether you also have this requirement?

(And again, sorry for not seeing this earlier and holding up the vote on what seems like a minor point. I just don't want to punt for later on something where we already have an idea of what customers expect.) Gwen

The configurations are listed explicitly here https://cwiki.apache.org/confluence/display/KAFKA/KIP-11+-+Authorization+Interface#KIP-11-AuthorizationInterface-Changestoexistingclasses under KafkaConfig. We may add an optional config to allow the authorizer to read arbitrary property files incrementally, but that does not need to be part of this same KIP.

The statement "If we can't audit the access then what good is controlling the access?" seems extreme, because we still get to control the access, which IMHO is a huge win. The default authorizer implementation right now logs every allowed/denied access (see here https://github.com/Parth-Brahmbhatt/kafka/blob/KAFKA-1688-impl/core/src/main/scala/kafka/security/auth/SimpleAclAthorizer.scala) in debug mode. Anybody who needs auditing could create a log4j appender to allow debug access to this class and send the log output to some audit file. Auditing is still a separate piece; we could either add an auditor interface that wraps the authorizer or the other way around, so authorizer and auditor can be two separate implementations.
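The audit-via-log4j suggestion could be wired up with a configuration fragment along these lines. The file path, appender name, and the authorizer's fully qualified class name are assumptions for illustration; check them against the actual authorizer package before use.

```properties
# Hypothetical log4j.properties fragment: route the authorizer's DEBUG
# allow/deny lines to a dedicated audit file (class name assumed).
log4j.appender.authorizerAppender=org.apache.log4j.DailyRollingFileAppender
log4j.appender.authorizerAppender.File=/var/log/kafka/kafka-authorizer.log
log4j.appender.authorizerAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.authorizerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n

# DEBUG level captures every allowed/denied decision; additivity=false keeps
# the audit lines out of the main server log.
log4j.logger.kafka.security.auth.SimpleAclAuthorizer=DEBUG, authorizerAppender
log4j.additivity.kafka.security.auth.SimpleAclAuthorizer=false
```

This keeps auditing as the separate, pluggable piece the thread describes, without blocking the authorizer interface itself.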
I would love to start a new KIP and jira to discuss auditing approaches in more detail, but I don't see the need to hold up the authorization work for that. I don't agree that "this design seems too specific", given we already have three implementations (default, Ranger, Sentry) that can be supported with the current design. The authorization happens as part of handle() and it is the first action; see here https://github.com/Parth-Brahmbhatt/kafka/blob/KAFKA-1688-impl/core/src/main/scala/kafka/server/KafkaApis.scala#L103 for one example. Thanks, Parth

On 4/30/15, 4:24 PM, Suresh Srinivas sur...@hortonworks.com wrote:

Joe, thanks for the clarification. Regarding audits, sorry, I might be misunderstanding your email. Kafka does not currently support audits, and I think audits should be considered as a separate effort. Here are the reasons:
- Audit, whether authorization is available or not, should record operations to determine what