Re: Review Request 33065: Patch for KAFKA-1928

2015-05-01 Thread Gwen Shapira


 On May 1, 2015, 9:25 p.m., Jay Kreps wrote:
  core/src/main/scala/kafka/network/SocketServer.scala, line 382
  https://reviews.apache.org/r/33065/diff/3/?file=946988#file946988line382
 
  Beautiful! So much deleted code!

Haha :) Yeah, I love how minimal SocketServer became.


- Gwen


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/33065/#review82281
---


On May 1, 2015, 10:45 p.m., Gwen Shapira wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/33065/
 ---
 
 (Updated May 1, 2015, 10:45 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1928
 https://issues.apache.org/jira/browse/KAFKA-1928
 
 
 Repository: kafka
 
 
 Description
 ---
 
 first pass on replacing Send
 
 
 implement maxSize and improved docs
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 Conflicts:
   core/src/main/scala/kafka/network/RequestChannel.scala
 
 moved selector out of abstract thread
 
 
 mid-way through putting selector in SocketServer
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 Also, SocketServer is now using Selector. Still a bit messy - but all tests 
 pass.
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 
 renamed requestKey to connectionId to reflect new use and changed type from 
 Any to String
 
 
 Following Jun's comments - moved MultiSend to client. Cleaned up destinations 
 as well
 
 
 Diffs
 -
 
   clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java 
 da76cc257b4cfe3c4bce7120a1f14c7f31ef8587 
   clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java 
 936487b16e7ac566f8bdcd39a7240ceb619fd30e 
   clients/src/main/java/org/apache/kafka/clients/KafkaClient.java 
 1311f85847b022efec8cb05c450bb18231db6979 
   clients/src/main/java/org/apache/kafka/clients/NetworkClient.java 
 b7ae595f2cc46e5dfe728bc3ce6082e9cd0b6d36 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
  e55ab11df4db0b0084f841a74cbcf819caf780d5 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
  ef9dd5238fbc771496029866ece1d85db6d7b7a5 
   
 clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 
 b2db91ca14bbd17fef5ce85839679144fff3f689 
   
 clients/src/main/java/org/apache/kafka/common/network/ByteBufferReceive.java 
 129ae827bccbd982ad93d56e46c6f5c46f147fe0 
   clients/src/main/java/org/apache/kafka/common/network/ByteBufferSend.java 
 c8213e156ec9c9af49ee09f5238492318516aaa3 
   clients/src/main/java/org/apache/kafka/common/network/MultiSend.java 
 PRE-CREATION 
   clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java 
 fc0d168324aaebb97065b0aafbd547a1994d76a7 
   clients/src/main/java/org/apache/kafka/common/network/NetworkSend.java 
 68327cd3a734fd429966d3e2016a2488dbbb19e5 
   clients/src/main/java/org/apache/kafka/common/network/Receive.java 
 4e33078c1eec834bd74aabcb5fc69f18c9d6d52a 
   clients/src/main/java/org/apache/kafka/common/network/Selectable.java 
 b5f8d83e89f9026dc0853e5f92c00b2d7f043e22 
   clients/src/main/java/org/apache/kafka/common/network/Selector.java 
 57de0585e5e9a53eb9dcd99cac1ab3eb2086a302 
   clients/src/main/java/org/apache/kafka/common/network/Send.java 
 5d321a09e470166a1c33639cf0cab26a3bce98ec 
   clients/src/main/java/org/apache/kafka/common/requests/RequestSend.java 
 27cbf390c7f148ffa8c5abc154c72cbf0829715c 
   clients/src/main/java/org/apache/kafka/common/requests/ResponseSend.java 
 PRE-CREATION 
   clients/src/test/java/org/apache/kafka/clients/MockClient.java 
 5e3fab13e3c02eb351558ec973b949b3d1196085 
   clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java 
 8b278892883e63899b53e15efb9d8c926131e858 
   clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java 
 d5b306b026e788b4e5479f3419805aa49ae889f3 
   clients/src/test/java/org/apache/kafka/test/MockSelector.java 
 ea89b06a4c9e5bb351201299cd3037f5226f0e6c 
   core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala 
 1c3b3802ac221d570e7610458e50518b4499e7ed 
   core/src/main/scala/kafka/api/ConsumerMetadataRequest.scala 
 a3b1b78adb760eaeb029466b54f335a29caf3b0f 
   core/src/main/scala/kafka/api/ControlledShutdownRequest.scala 
 fe81635c864cec03ca1d4681c9c47c3fc4f975ee 
   core/src/main/scala/kafka/api/FetchRequest.scala 
 b038c15186c0cbcc65b59479324052498361b717 
   core/src/main/scala/kafka/api/FetchResponse.scala 
 75aaf57fb76ec01660d93701a57ae953d877d81c 
   

[jira] [Updated] (KAFKA-2123) Make new consumer offset commit API use callback + future

2015-05-01 Thread Ewen Cheslack-Postava (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2123?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ewen Cheslack-Postava updated KAFKA-2123:
-
Attachment: KAFKA-2123_2015-05-01_19:33:19.patch

 Make new consumer offset commit API use callback + future
 -

 Key: KAFKA-2123
 URL: https://issues.apache.org/jira/browse/KAFKA-2123
 Project: Kafka
  Issue Type: Improvement
  Components: clients, consumer
Reporter: Ewen Cheslack-Postava
Assignee: Ewen Cheslack-Postava
 Fix For: 0.8.3

 Attachments: KAFKA-2123.patch, KAFKA-2123_2015-04-30_11:23:05.patch, 
 KAFKA-2123_2015-05-01_19:33:19.patch


 The current version of the offset commit API in the new consumer is
 void commit(offsets, commit type)
 where the commit type is either sync or async. This means you need to use 
 sync if you ever want confirmation that the commit succeeded. Some 
 applications will want to use asynchronous offset commit, but be able to tell 
 when the commit completes.
 This is basically the same problem that had to be fixed going from the old 
 consumer -> new consumer, and I'd suggest the same fix using a callback + 
 future combination. The new API would be
 Future<Void> commit(Map<TopicPartition, Long> offsets, ConsumerCommitCallback 
 callback);
 where ConsumerCommitCallback contains a single method:
 public void onCompletion(Exception exception);
 We can provide shorthand variants of commit() for eliding the different 
 arguments.
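
 For illustration, a minimal sketch of how a caller might use the proposed API
 (the consumer variable and imports are assumed; names follow the signatures above):
{code}
// Assumes: KafkaConsumer consumer; imports for TopicPartition, Future, HashMap, Map.
Map<TopicPartition, Long> offsets = new HashMap<>();
offsets.put(new TopicPartition("my-topic", 0), 42L);

// Asynchronous commit, but the caller can still observe the outcome:
Future<Void> result = consumer.commit(offsets, new ConsumerCommitCallback() {
    @Override
    public void onCompletion(Exception exception) {
        if (exception != null)
            System.err.println("commit failed: " + exception);
    }
});

result.get(); // blocking here turns the async commit into a sync one
{code}
 Blocking on the future only when confirmation is needed makes the sync case a
 special case of the async one.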



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 33196: Patch for KAFKA-2123

2015-05-01 Thread Ewen Cheslack-Postava

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/33196/
---

(Updated May 2, 2015, 2:33 a.m.)


Review request for kafka.


Bugs: KAFKA-2123
https://issues.apache.org/jira/browse/KAFKA-2123


Repository: kafka


Description (updated)
---

KAFKA-2123: Add queuing of offset commit requests.


KAFKA-2123: Add scheduler for delayed tasks in new consumer, add backoff for 
commit retries, and simplify auto commit by using delayed tasks.


Diffs (updated)
-

  clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java 
8f587bc0705b65b3ef37c86e0c25bb43ab8803de 
  
clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerCommitCallback.java
 PRE-CREATION 
  clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java 
bdff518b732105823058e6182f445248b45dc388 
  clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
d301be4709f7b112e1f3a39f3c04cfa65f00fa60 
  clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java 
f50da825756938c193d7f07bee953e000e2627d9 
  
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
 e55ab11df4db0b0084f841a74cbcf819caf780d5 
  
clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedTask.java
 PRE-CREATION 
  
clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedTaskScheduler.java
 PRE-CREATION 
  
clients/src/main/java/org/apache/kafka/common/errors/ConsumerCoordinatorNotAvailableException.java
 PRE-CREATION 
  
clients/src/main/java/org/apache/kafka/common/errors/NotCoordinatorForConsumerException.java
 PRE-CREATION 
  
clients/src/main/java/org/apache/kafka/common/errors/OffsetLoadInProgressException.java
 PRE-CREATION 
  clients/src/main/java/org/apache/kafka/common/protocol/Errors.java 
36aa412404ff1458c7bef0feecaaa8bc45bed9c7 
  
clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java
 b06c4a73e2b4e9472cd772c8bc32bf4a29f431bb 
  
clients/src/test/java/org/apache/kafka/clients/consumer/internals/DelayedTaskSchedulerTest.java
 PRE-CREATION 
  core/src/test/scala/integration/kafka/api/ConsumerTest.scala 
ffbdf5dc106e2a59563768280074696c76491337 

Diff: https://reviews.apache.org/r/33196/diff/


Testing
---


Thanks,

Ewen Cheslack-Postava



[jira] [Commented] (KAFKA-2121) prevent potential resource leak in KafkaProducer and KafkaConsumer

2015-05-01 Thread Steven Zhen Wu (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2121?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14524185#comment-14524185
 ] 

Steven Zhen Wu commented on KAFKA-2121:
---

Updated reviewboard https://reviews.apache.org/r/33760/diff/
 against branch apache/trunk

 prevent potential resource leak in KafkaProducer and KafkaConsumer
 --

 Key: KAFKA-2121
 URL: https://issues.apache.org/jira/browse/KAFKA-2121
 Project: Kafka
  Issue Type: Bug
  Components: producer 
Affects Versions: 0.8.2.0
Reporter: Steven Zhen Wu
Assignee: Steven Zhen Wu
 Fix For: 0.8.3

 Attachments: KAFKA-2121.patch, KAFKA-2121.patch, KAFKA-2121.patch, 
 KAFKA-2121_2015-04-16_09:55:14.patch, KAFKA-2121_2015-04-16_10:43:55.patch, 
 KAFKA-2121_2015-04-18_20:09:20.patch, KAFKA-2121_2015-04-19_20:08:45.patch, 
 KAFKA-2121_2015-04-19_20:30:18.patch, KAFKA-2121_2015-04-20_09:06:09.patch, 
 KAFKA-2121_2015-04-20_09:51:51.patch, KAFKA-2121_2015-04-20_09:52:46.patch, 
 KAFKA-2121_2015-04-20_09:57:49.patch, KAFKA-2121_2015-04-20_22:48:31.patch, 
 KAFKA-2121_2015-05-01_15:42:30.patch


 On Mon, Apr 13, 2015 at 7:17 PM, Guozhang Wang wangg...@gmail.com wrote:
 It is a valid problem and we should correct it as soon as possible, I'm
 with Ewen regarding the solution.
 On Mon, Apr 13, 2015 at 5:05 PM, Ewen Cheslack-Postava e...@confluent.io
 wrote:
  Steven,
 
  Looks like there is even more that could potentially be leaked -- since key
  and value serializers are created and configured at the end, even the IO
  thread allocated by the producer could leak. Given that, I think 1 isn't a
  great option since, as you said, it doesn't really address the underlying
  issue.
 
  3 strikes me as bad from a user experience perspective. It's true we might
  want to introduce additional constructors to make testing easier, but the
  more components I need to allocate myself and inject into the producer's
  constructor, the worse the default experience is. And since you would have
  to inject the dependencies to get correct, non-leaking behavior, it will
  always be more code than previously (and a backwards incompatible change).
  Additionally, the code creating the producer would have to be more
  complicated since it would have to deal with the cleanup carefully whereas
  it previously just had to deal with the exception. Besides, for testing
  specifically, you can avoid exposing more constructors just for testing by
  using something like PowerMock that lets you mock private methods. That
  requires a bit of code reorganization, but doesn't affect the public
  interface at all.
 
  So my take is that a variant of 2 is probably best. I'd probably do two
  things. First, make close() safe to call even if some fields haven't been
  initialized, which presumably just means checking for null fields. (You
  might also want to figure out if all the methods close() calls are
  idempotent and decide whether some fields should be marked non-final and
  cleared to null when close() is called). Second, add the try/catch as you
  suggested, but just use close().
 
  -Ewen
 
 
  On Mon, Apr 13, 2015 at 3:53 PM, Steven Wu stevenz...@gmail.com wrote:
 
   Here is the resource leak problem that we have encountered when 0.8.2
  java
   KafkaProducer failed in constructor. here is the code snippet of
   KafkaProducer to illustrate the problem.
  
   ---
   public KafkaProducer(ProducerConfig config, Serializer<K> keySerializer,
                        Serializer<V> valueSerializer) {
  
     // create metrics reporter via reflection
     List<MetricsReporter> reporters =
         config.getConfiguredInstances(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG,
                                       MetricsReporter.class);
  
     // validate bootstrap servers
     List<InetSocketAddress> addresses =
         ClientUtils.parseAndValidateAddresses(config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
  
   }
   ---
  
   let's say MyMetricsReporter creates a thread in its constructor. if hostname
   validation throws an exception, the constructor won't call the close method of
   MyMetricsReporter to clean up the resource. as a result, we created a thread
   leak issue. this becomes worse when we try to auto recover (i.e. keep
   creating KafkaProducer again -> failing again -> more thread leaks).
  
   there are multiple options for fixing this.
  
   1) just move the hostname validation to the beginning. but this only fixes
   one symptom. it doesn't fix the fundamental problem. what if some other
   lines throw an exception?
  
   2) use try-catch. in the catch section, try to call close methods for any
   non-null objects constructed so far.
  
   3) explicitly declare the dependency in the constructor. this way, when
   KafkaProducer 

Re: Review Request 33065: Patch for KAFKA-1928

2015-05-01 Thread Gwen Shapira


 On April 18, 2015, 12:49 a.m., Jun Rao wrote:
  clients/src/main/java/org/apache/kafka/common/network/ByteBufferSend.java, 
  line 55
  https://reviews.apache.org/r/33065/diff/1/?file=922619#file922619line55
 
  Do you know why the return is changed from int to long?

writeTo() and remaining() should agree on type, since remaining is calculated 
from sendSize minus a sum of writeTo() calls.

We can either change writeTo() to int or remaining() to long. 
I believe either will work, and I chose to change remaining().
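
To make the constraint concrete, a minimal sketch with simplified, hypothetical
signatures (not the exact interface):
{code}
public interface Send {
    // each call returns the number of bytes written in this attempt
    long writeTo(java.nio.channels.GatheringByteChannel channel) throws java.io.IOException;

    // remaining = sendSize minus the running sum of writeTo() results,
    // so if writeTo() returns long, remaining() must be long as well
    long remaining();
}
{code}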


 On April 18, 2015, 12:49 a.m., Jun Rao wrote:
  core/src/main/scala/kafka/api/FetchResponse.scala, lines 66-72
  https://reviews.apache.org/r/33065/diff/1/?file=922627#file922627line66
 
  I am wondering if we need both completed() and remaining() in Send. It 
  seems that one of the two is enough for our usage.
  
  Also, not sure how useful reify() is. Currently, it's not used anywhere.

reification is a very central concept in Kafka's The Trial :)

I agree that reify() and remaining() can be removed without any pain. Since it 
is a public API, do we need to deprecate them first?


 On April 18, 2015, 12:49 a.m., Jun Rao wrote:
  core/src/main/scala/kafka/api/FetchResponse.scala, line 70
  https://reviews.apache.org/r/33065/diff/1/?file=922627#file922627line70
 
  This probably should be def reify() {}?

errr... why? is it a coding standard issue? I think the above is correct code 
for reify as a function that returns null (which is what the Send API specifies if 
we don't have a buffer to return). Anyway, this is irrelevant if we are 
removing reify.


- Gwen


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/33065/#review80557
---


On May 1, 2015, 12:48 a.m., Gwen Shapira wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/33065/
 ---
 
 (Updated May 1, 2015, 12:48 a.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1928
 https://issues.apache.org/jira/browse/KAFKA-1928
 
 
 Repository: kafka
 
 
 Description
 ---
 
 first pass on replacing Send
 
 
 implement maxSize and improved docs
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 Conflicts:
   core/src/main/scala/kafka/network/RequestChannel.scala
 
 moved selector out of abstract thread
 
 
 mid-way through putting selector in SocketServer
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
  Also, SocketServer is now using Selector. Still a bit messy - but all tests 
 pass.
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 
 renamed requestKey to connectionId to reflect new use and changed type from 
 Any to String
 
 
 Diffs
 -
 
   clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java 
 da76cc257b4cfe3c4bce7120a1f14c7f31ef8587 
   clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java 
 936487b16e7ac566f8bdcd39a7240ceb619fd30e 
   clients/src/main/java/org/apache/kafka/clients/KafkaClient.java 
 1311f85847b022efec8cb05c450bb18231db6979 
   clients/src/main/java/org/apache/kafka/clients/NetworkClient.java 
 b7ae595f2cc46e5dfe728bc3ce6082e9cd0b6d36 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
  e55ab11df4db0b0084f841a74cbcf819caf780d5 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
  ef9dd5238fbc771496029866ece1d85db6d7b7a5 
   
 clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 
 b2db91ca14bbd17fef5ce85839679144fff3f689 
   
 clients/src/main/java/org/apache/kafka/common/network/ByteBufferReceive.java 
 129ae827bccbd982ad93d56e46c6f5c46f147fe0 
   clients/src/main/java/org/apache/kafka/common/network/ByteBufferSend.java 
 c8213e156ec9c9af49ee09f5238492318516aaa3 
   clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java 
 fc0d168324aaebb97065b0aafbd547a1994d76a7 
   clients/src/main/java/org/apache/kafka/common/network/NetworkSend.java 
 68327cd3a734fd429966d3e2016a2488dbbb19e5 
   clients/src/main/java/org/apache/kafka/common/network/Receive.java 
 4e33078c1eec834bd74aabcb5fc69f18c9d6d52a 
   clients/src/main/java/org/apache/kafka/common/network/Selectable.java 
 b5f8d83e89f9026dc0853e5f92c00b2d7f043e22 
   clients/src/main/java/org/apache/kafka/common/network/Selector.java 
 57de0585e5e9a53eb9dcd99cac1ab3eb2086a302 
   clients/src/main/java/org/apache/kafka/common/network/Send.java 
 5d321a09e470166a1c33639cf0cab26a3bce98ec 
   

[jira] [Updated] (KAFKA-2121) prevent potential resource leak in KafkaProducer and KafkaConsumer

2015-05-01 Thread Steven Zhen Wu (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2121?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Steven Zhen Wu updated KAFKA-2121:
--
Attachment: KAFKA-2121_2015-05-01_15:42:30.patch

 prevent potential resource leak in KafkaProducer and KafkaConsumer
 --

 Key: KAFKA-2121
 URL: https://issues.apache.org/jira/browse/KAFKA-2121
 Project: Kafka
  Issue Type: Bug
  Components: producer 
Affects Versions: 0.8.2.0
Reporter: Steven Zhen Wu
Assignee: Steven Zhen Wu
 Fix For: 0.8.3

 Attachments: KAFKA-2121.patch, KAFKA-2121.patch, KAFKA-2121.patch, 
 KAFKA-2121_2015-04-16_09:55:14.patch, KAFKA-2121_2015-04-16_10:43:55.patch, 
 KAFKA-2121_2015-04-18_20:09:20.patch, KAFKA-2121_2015-04-19_20:08:45.patch, 
 KAFKA-2121_2015-04-19_20:30:18.patch, KAFKA-2121_2015-04-20_09:06:09.patch, 
 KAFKA-2121_2015-04-20_09:51:51.patch, KAFKA-2121_2015-04-20_09:52:46.patch, 
 KAFKA-2121_2015-04-20_09:57:49.patch, KAFKA-2121_2015-04-20_22:48:31.patch, 
 KAFKA-2121_2015-05-01_15:42:30.patch


 On Mon, Apr 13, 2015 at 7:17 PM, Guozhang Wang wangg...@gmail.com wrote:
 It is a valid problem and we should correct it as soon as possible, I'm
 with Ewen regarding the solution.
 On Mon, Apr 13, 2015 at 5:05 PM, Ewen Cheslack-Postava e...@confluent.io
 wrote:
  Steven,
 
  Looks like there is even more that could potentially be leaked -- since key
  and value serializers are created and configured at the end, even the IO
  thread allocated by the producer could leak. Given that, I think 1 isn't a
  great option since, as you said, it doesn't really address the underlying
  issue.
 
  3 strikes me as bad from a user experience perspective. It's true we might
  want to introduce additional constructors to make testing easier, but the
  more components I need to allocate myself and inject into the producer's
  constructor, the worse the default experience is. And since you would have
  to inject the dependencies to get correct, non-leaking behavior, it will
  always be more code than previously (and a backwards incompatible change).
  Additionally, the code creating the producer would have to be more
  complicated since it would have to deal with the cleanup carefully whereas
  it previously just had to deal with the exception. Besides, for testing
  specifically, you can avoid exposing more constructors just for testing by
  using something like PowerMock that lets you mock private methods. That
  requires a bit of code reorganization, but doesn't affect the public
  interface at all.
 
  So my take is that a variant of 2 is probably best. I'd probably do two
  things. First, make close() safe to call even if some fields haven't been
  initialized, which presumably just means checking for null fields. (You
  might also want to figure out if all the methods close() calls are
  idempotent and decide whether some fields should be marked non-final and
  cleared to null when close() is called). Second, add the try/catch as you
  suggested, but just use close().
 
  -Ewen
 
 
  On Mon, Apr 13, 2015 at 3:53 PM, Steven Wu stevenz...@gmail.com wrote:
 
   Here is the resource leak problem that we have encountered when 0.8.2
  java
   KafkaProducer failed in constructor. here is the code snippet of
   KafkaProducer to illustrate the problem.
  
   ---
   public KafkaProducer(ProducerConfig config, Serializer<K> keySerializer,
                        Serializer<V> valueSerializer) {
  
     // create metrics reporter via reflection
     List<MetricsReporter> reporters =
         config.getConfiguredInstances(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG,
                                       MetricsReporter.class);
  
     // validate bootstrap servers
     List<InetSocketAddress> addresses =
         ClientUtils.parseAndValidateAddresses(config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
  
   }
   ---
  
   let's say MyMetricsReporter creates a thread in its constructor. if hostname
   validation throws an exception, the constructor won't call the close method of
   MyMetricsReporter to clean up the resource. as a result, we created a thread
   leak issue. this becomes worse when we try to auto recover (i.e. keep
   creating KafkaProducer again -> failing again -> more thread leaks).
  
   there are multiple options for fixing this.
  
   1) just move the hostname validation to the beginning. but this only fixes
   one symptom. it doesn't fix the fundamental problem. what if some other
   lines throw an exception?
  
   2) use try-catch. in the catch section, try to call close methods for any
   non-null objects constructed so far.
  
   3) explicitly declare the dependency in the constructor. this way, when
   KafkaProducer throws an exception, I can call the close method of the metrics
   reporters to release resources.
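
A hedged sketch of option 2 (hypothetical field names and exception type; not
the actual patch): wrap the construction in try/catch and close whatever was
already created before rethrowing.
{code}
public KafkaProducer(ProducerConfig config) {
    try {
        this.reporters = config.getConfiguredInstances(
            ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG, MetricsReporter.class);
        this.addresses = ClientUtils.parseAndValidateAddresses(
            config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
        // ... remaining initialization ...
    } catch (Throwable t) {
        close(); // close() must tolerate null / partially initialized fields
        throw new KafkaException("Failed to construct kafka producer", t);
    }
}
{code}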
  

Re: Review Request 33760: Patch for KAFKA-2121

2015-05-01 Thread Guozhang Wang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/33760/#review82292
---

Ship it!


Ship It!

- Guozhang Wang


On May 1, 2015, 10:42 p.m., Steven Wu wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/33760/
 ---
 
 (Updated May 1, 2015, 10:42 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-2121
 https://issues.apache.org/jira/browse/KAFKA-2121
 
 
 Repository: kafka
 
 
 Description
 ---
 
  override the java.io.Closeable#close method in the Serializer and Deserializer 
  interfaces without throwing a checked IOException. this is to avoid breaking 
  source compatibility.
 
 
 add a test for checking Serializer is closed during KafkaProducer#close
 
 
 missing copyright header in previous checkin
 
 
  removed throws Exception from test methods
 
 
 Diffs
 -
 
   
 clients/src/main/java/org/apache/kafka/common/serialization/Deserializer.java 
 9a57579f87cb19cb6affe6d157ff8446c23e3551 
   clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java 
 c44054038066f0d0829d05f082b2ee42b34cded7 
   
 clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
  eea2c28450736d1668c68828f77a49470a82c3d0 
   
 clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
  49f1427bcbe43c773920a25aa69a71d0329296b7 
   clients/src/test/java/org/apache/kafka/test/MockMetricsReporter.java 
 6f948f240c906029a0f972bf770f288f390ea714 
   clients/src/test/java/org/apache/kafka/test/MockSerializer.java 
 PRE-CREATION 
 
 Diff: https://reviews.apache.org/r/33760/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Steven Wu
 




Re: Review Request 33760: Patch for KAFKA-2121

2015-05-01 Thread Steven Wu

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/33760/
---

(Updated May 1, 2015, 10:42 p.m.)


Review request for kafka.


Bugs: KAFKA-2121
https://issues.apache.org/jira/browse/KAFKA-2121


Repository: kafka


Description (updated)
---

override the java.io.Closeable#close method in the Serializer and Deserializer 
interfaces without throwing a checked IOException. this is to avoid breaking 
source compatibility.


add a test for checking Serializer is closed during KafkaProducer#close


missing copyright header in previous checkin


removed throws Exception from test methods
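
For context, a minimal sketch of the idea (simplified; not the full interface):
a subinterface may narrow Closeable's throws clause, so existing implementations
stay source-compatible while close() no longer declares a checked exception.
{code}
import java.io.Closeable;

public interface Serializer<T> extends Closeable {
    byte[] serialize(String topic, T data);

    @Override
    void close(); // overrides Closeable.close() without "throws IOException"
}
{code}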


Diffs (updated)
-

  clients/src/main/java/org/apache/kafka/common/serialization/Deserializer.java 
9a57579f87cb19cb6affe6d157ff8446c23e3551 
  clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java 
c44054038066f0d0829d05f082b2ee42b34cded7 
  
clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java 
eea2c28450736d1668c68828f77a49470a82c3d0 
  
clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java 
49f1427bcbe43c773920a25aa69a71d0329296b7 
  clients/src/test/java/org/apache/kafka/test/MockMetricsReporter.java 
6f948f240c906029a0f972bf770f288f390ea714 
  clients/src/test/java/org/apache/kafka/test/MockSerializer.java PRE-CREATION 

Diff: https://reviews.apache.org/r/33760/diff/


Testing
---


Thanks,

Steven Wu



[jira] [Updated] (KAFKA-1928) Move kafka.network over to using the network classes in org.apache.kafka.common.network

2015-05-01 Thread Gwen Shapira (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1928?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gwen Shapira updated KAFKA-1928:

Attachment: KAFKA-1928_2015-05-01_15:45:24.patch

 Move kafka.network over to using the network classes in 
 org.apache.kafka.common.network
 ---

 Key: KAFKA-1928
 URL: https://issues.apache.org/jira/browse/KAFKA-1928
 Project: Kafka
  Issue Type: Sub-task
  Components: security
Reporter: Jay Kreps
Assignee: Gwen Shapira
 Attachments: KAFKA-1928.patch, KAFKA-1928_2015-04-28_00:09:40.patch, 
 KAFKA-1928_2015-04-30_17:48:33.patch, KAFKA-1928_2015-05-01_15:45:24.patch


 As part of the common package we introduced a bunch of network related code 
 and abstractions.
 We should look into replacing a lot of what is in kafka.network with this 
 code. Duplicate classes include things like Receive, Send, etc. It is likely 
 possible to also refactor the SocketServer to make use of Selector which 
 should significantly simplify its code.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1928) Move kafka.network over to using the network classes in org.apache.kafka.common.network

2015-05-01 Thread Gwen Shapira (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1928?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14524195#comment-14524195
 ] 

Gwen Shapira commented on KAFKA-1928:
-

Updated reviewboard https://reviews.apache.org/r/33065/diff/
 against branch trunk

 Move kafka.network over to using the network classes in 
 org.apache.kafka.common.network
 ---

 Key: KAFKA-1928
 URL: https://issues.apache.org/jira/browse/KAFKA-1928
 Project: Kafka
  Issue Type: Sub-task
  Components: security
Reporter: Jay Kreps
Assignee: Gwen Shapira
 Attachments: KAFKA-1928.patch, KAFKA-1928_2015-04-28_00:09:40.patch, 
 KAFKA-1928_2015-04-30_17:48:33.patch, KAFKA-1928_2015-05-01_15:45:24.patch


 As part of the common package we introduced a bunch of network related code 
 and abstractions.
 We should look into replacing a lot of what is in kafka.network with this 
 code. Duplicate classes include things like Receive, Send, etc. It is likely 
 possible to also refactor the SocketServer to make use of Selector which 
 should significantly simplify its code.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 33065: Patch for KAFKA-1928

2015-05-01 Thread Gwen Shapira

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/33065/
---

(Updated May 1, 2015, 10:45 p.m.)


Review request for kafka.


Bugs: KAFKA-1928
https://issues.apache.org/jira/browse/KAFKA-1928


Repository: kafka


Description (updated)
---

first pass on replacing Send


implement maxSize and improved docs


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
KAFKA-1928-v2


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
KAFKA-1928-v2

Conflicts:
core/src/main/scala/kafka/network/RequestChannel.scala

moved selector out of abstract thread


mid-way through putting selector in SocketServer


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
KAFKA-1928-v2

Also, SocketServer is now using Selector. Still a bit messy - but all tests pass.

Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
KAFKA-1928-v2


renamed requestKey to connectionId to reflect new use and changed type from Any 
to String


Following Jun's comments - moved MultiSend to client. Cleaned up destinations 
as well


Diffs (updated)
-

  clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java 
da76cc257b4cfe3c4bce7120a1f14c7f31ef8587 
  clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java 
936487b16e7ac566f8bdcd39a7240ceb619fd30e 
  clients/src/main/java/org/apache/kafka/clients/KafkaClient.java 
1311f85847b022efec8cb05c450bb18231db6979 
  clients/src/main/java/org/apache/kafka/clients/NetworkClient.java 
b7ae595f2cc46e5dfe728bc3ce6082e9cd0b6d36 
  
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
 e55ab11df4db0b0084f841a74cbcf819caf780d5 
  
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java 
ef9dd5238fbc771496029866ece1d85db6d7b7a5 
  clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 
b2db91ca14bbd17fef5ce85839679144fff3f689 
  clients/src/main/java/org/apache/kafka/common/network/ByteBufferReceive.java 
129ae827bccbd982ad93d56e46c6f5c46f147fe0 
  clients/src/main/java/org/apache/kafka/common/network/ByteBufferSend.java 
c8213e156ec9c9af49ee09f5238492318516aaa3 
  clients/src/main/java/org/apache/kafka/common/network/MultiSend.java 
PRE-CREATION 
  clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java 
fc0d168324aaebb97065b0aafbd547a1994d76a7 
  clients/src/main/java/org/apache/kafka/common/network/NetworkSend.java 
68327cd3a734fd429966d3e2016a2488dbbb19e5 
  clients/src/main/java/org/apache/kafka/common/network/Receive.java 
4e33078c1eec834bd74aabcb5fc69f18c9d6d52a 
  clients/src/main/java/org/apache/kafka/common/network/Selectable.java 
b5f8d83e89f9026dc0853e5f92c00b2d7f043e22 
  clients/src/main/java/org/apache/kafka/common/network/Selector.java 
57de0585e5e9a53eb9dcd99cac1ab3eb2086a302 
  clients/src/main/java/org/apache/kafka/common/network/Send.java 
5d321a09e470166a1c33639cf0cab26a3bce98ec 
  clients/src/main/java/org/apache/kafka/common/requests/RequestSend.java 
27cbf390c7f148ffa8c5abc154c72cbf0829715c 
  clients/src/main/java/org/apache/kafka/common/requests/ResponseSend.java 
PRE-CREATION 
  clients/src/test/java/org/apache/kafka/clients/MockClient.java 
5e3fab13e3c02eb351558ec973b949b3d1196085 
  clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java 
8b278892883e63899b53e15efb9d8c926131e858 
  clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java 
d5b306b026e788b4e5479f3419805aa49ae889f3 
  clients/src/test/java/org/apache/kafka/test/MockSelector.java 
ea89b06a4c9e5bb351201299cd3037f5226f0e6c 
  core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala 
1c3b3802ac221d570e7610458e50518b4499e7ed 
  core/src/main/scala/kafka/api/ConsumerMetadataRequest.scala 
a3b1b78adb760eaeb029466b54f335a29caf3b0f 
  core/src/main/scala/kafka/api/ControlledShutdownRequest.scala 
fe81635c864cec03ca1d4681c9c47c3fc4f975ee 
  core/src/main/scala/kafka/api/FetchRequest.scala 
b038c15186c0cbcc65b59479324052498361b717 
  core/src/main/scala/kafka/api/FetchResponse.scala 
75aaf57fb76ec01660d93701a57ae953d877d81c 
  core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala 
431190ab94afc4acfc14348a1fc720e17c071cea 
  core/src/main/scala/kafka/api/OffsetCommitRequest.scala 
cf8e6acc426aef6eb19d862bf6a108a5fc37907a 
  core/src/main/scala/kafka/api/OffsetFetchRequest.scala 
67811a752a470bf9bdbc8c5419e8d6e20a006169 
  core/src/main/scala/kafka/api/OffsetRequest.scala 
3d483bc7518ad76f9548772522751afb4d046b78 
  core/src/main/scala/kafka/api/ProducerRequest.scala 
570b2da1d865086f9830aa919a49063abbbe574d 
  core/src/main/scala/kafka/api/StopReplicaRequest.scala 
5e14987c990fe561c01dac2909f5ed21a506e038 
  core/src/main/scala/kafka/api/TopicMetadataRequest.scala 
363bae01752318f3849242b97a6619747697c1d9 
  

[jira] [Commented] (KAFKA-2123) Make new consumer offset commit API use callback + future

2015-05-01 Thread Ewen Cheslack-Postava (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2123?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14524491#comment-14524491
 ] 

Ewen Cheslack-Postava commented on KAFKA-2123:
--

Updated to add backoff back in upon retries, which was removed when I unified 
the sync and async processing. I ended up generalizing this a bit to provide 
for scheduling tasks that need to be delayed for some time. With the 
generalization, I could also move the automatic background offset commits to 
this system as well. This also fixed a bug in the existing implementation: a 
long poll might cause automatic offset commit not to run, since we only checked 
whether a commit was needed before the poll and didn't account for the need to 
stop polling and commit offsets if the commit came due sometime during the 
poll() period.

[~jkreps] and [~junrao] might want to take a look -- the producer code has 
gotten complicated because we have so many timeouts to coordinate, so computing 
the right poll timeouts has to take into account a bunch of variables. I think 
a centralized scheduler might help keep this complexity in check.
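
A hedged sketch of the delayed-task idea (hypothetical API, not the patch
itself): a queue ordered by deadline both runs due tasks and bounds how long
poll() may block, which is what prevents a long poll from skipping a due commit.
{code}
import java.util.PriorityQueue;

public class DelayedTaskScheduler {
    private static class Entry implements Comparable<Entry> {
        final long deadlineMs;
        final Runnable task;
        Entry(long deadlineMs, Runnable task) { this.deadlineMs = deadlineMs; this.task = task; }
        public int compareTo(Entry other) { return Long.compare(deadlineMs, other.deadlineMs); }
    }

    private final PriorityQueue<Entry> queue = new PriorityQueue<>();

    public void schedule(Runnable task, long deadlineMs) {
        queue.add(new Entry(deadlineMs, task));
    }

    // run everything that is due, then return the longest time poll() may block
    public long runDueTasksAndGetTimeout(long nowMs, long defaultTimeoutMs) {
        while (!queue.isEmpty() && queue.peek().deadlineMs <= nowMs)
            queue.poll().task.run();
        return queue.isEmpty() ? defaultTimeoutMs
                               : Math.min(defaultTimeoutMs, queue.peek().deadlineMs - nowMs);
    }
}
{code}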

 Make new consumer offset commit API use callback + future
 -

 Key: KAFKA-2123
 URL: https://issues.apache.org/jira/browse/KAFKA-2123
 Project: Kafka
  Issue Type: Improvement
  Components: clients, consumer
Reporter: Ewen Cheslack-Postava
Assignee: Ewen Cheslack-Postava
 Fix For: 0.8.3

 Attachments: KAFKA-2123.patch, KAFKA-2123_2015-04-30_11:23:05.patch, 
 KAFKA-2123_2015-05-01_19:33:19.patch


 The current version of the offset commit API in the new consumer is
 void commit(offsets, commit type)
 where the commit type is either sync or async. This means you need to use 
 sync if you ever want confirmation that the commit succeeded. Some 
 applications will want to use asynchronous offset commit, but be able to tell 
 when the commit completes.
 This is basically the same problem that had to be fixed going from the old 
 consumer -> new consumer, and I'd suggest the same fix using a callback + 
 future combination. The new API would be
 Future<Void> commit(Map<TopicPartition, Long> offsets, ConsumerCommitCallback 
 callback);
 where ConsumerCommitCallback contains a single method:
 public void onCompletion(Exception exception);
 We can provide shorthand variants of commit() for eliding the different 
 arguments.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] kafka pull request: Patch for KAFKA-2055: ConsumerBounceTest.testS...

2015-05-01 Thread lvfangmin
GitHub user lvfangmin opened a pull request:

https://github.com/apache/kafka/pull/60

Patch for KAFKA-2055: ConsumerBounceTest.testSeekAndCommitWithBrokerFail...

...ures transient failure

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/lvfangmin/kafka KAFKA-2055

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/60.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #60


commit 5e174e3feabe03c14a6dca451fdf0967285069ff
Author: lvfangmin lvfang...@gmail.com
Date:   2015-04-27T14:40:35Z

Patch for KAFKA-2055: 
ConsumerBounceTest.testSeekAndCommitWithBrokerFailures transient failure




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (KAFKA-2055) ConsumerBounceTest.testSeekAndCommitWithBrokerFailures transient failure

2015-05-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2055?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14523248#comment-14523248
 ] 

ASF GitHub Bot commented on KAFKA-2055:
---

GitHub user lvfangmin opened a pull request:

https://github.com/apache/kafka/pull/60

Patch for KAFKA-2055: ConsumerBounceTest.testSeekAndCommitWithBrokerFail...

...ures transient failure

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/lvfangmin/kafka KAFKA-2055

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/60.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #60


commit 5e174e3feabe03c14a6dca451fdf0967285069ff
Author: lvfangmin lvfang...@gmail.com
Date:   2015-04-27T14:40:35Z

Patch for KAFKA-2055: 
ConsumerBounceTest.testSeekAndCommitWithBrokerFailures transient failure




 ConsumerBounceTest.testSeekAndCommitWithBrokerFailures transient failure
 

 Key: KAFKA-2055
 URL: https://issues.apache.org/jira/browse/KAFKA-2055
 Project: Kafka
  Issue Type: Sub-task
Reporter: Guozhang Wang
Assignee: Fangmin Lv
  Labels: newbie
 Attachments: KAFKA-2055.patch


 {code}
 kafka.api.ConsumerBounceTest  testSeekAndCommitWithBrokerFailures FAILED
 java.lang.AssertionError: expected:<1000> but was:<976>
 at org.junit.Assert.fail(Assert.java:92)
 at org.junit.Assert.failNotEquals(Assert.java:689)
 at org.junit.Assert.assertEquals(Assert.java:127)
 at org.junit.Assert.assertEquals(Assert.java:514)
 at org.junit.Assert.assertEquals(Assert.java:498)
 at 
 kafka.api.ConsumerBounceTest.seekAndCommitWithBrokerFailures(ConsumerBounceTest.scala:117)
 at 
 kafka.api.ConsumerBounceTest.testSeekAndCommitWithBrokerFailures(ConsumerBounceTest.scala:98)
 kafka.api.ConsumerBounceTest  testSeekAndCommitWithBrokerFailures FAILED
 java.lang.AssertionError: expected:<1000> but was:<913>
 at org.junit.Assert.fail(Assert.java:92)
 at org.junit.Assert.failNotEquals(Assert.java:689)
 at org.junit.Assert.assertEquals(Assert.java:127)
 at org.junit.Assert.assertEquals(Assert.java:514)
 at org.junit.Assert.assertEquals(Assert.java:498)
 at 
 kafka.api.ConsumerBounceTest.seekAndCommitWithBrokerFailures(ConsumerBounceTest.scala:117)
 at 
 kafka.api.ConsumerBounceTest.testSeekAndCommitWithBrokerFailures(ConsumerBounceTest.scala:98)
 {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [DISCUSS] Using GitHub Pull Requests for contributions and code review

2015-05-01 Thread Parth Brahmbhatt
+1. 

Thanks
Parth

On 5/1/15, 12:38 AM, Ewen Cheslack-Postava e...@confluent.io wrote:

Also +1. There are some drawbacks to using Github for reviews, e.g. lots
of
emails for each review because they don't let you publish your entire
review in one go like RB does, but it drastically lowers the barrier to
contributing for most developers. Also, if you haven't tried it, hub
https://hub.github.com/ makes it really easy to checkout and test PRs.

One thing I noticed is that when you try to generate a PR it defaults to
the 0.8.2 branch. Can we fix that up to be trunk by default? That's the
most common use case; version branches are really only useful when a
release is being prepared. Do changes to the Github repo require tickets
to
the Apache Infra team or is this something committers have control over?


On a related note, which perhaps should be discussed on another thread:

The CI setup is a related issue that we might want to rethink. Apache also
supports Travis CI, and now pays for dedicated build slaves:
https://blogs.apache.org/infra/entry/apache_gains_additional_travis_ci I
think the setup should be pretty easy since building + running tests is
just a gradle command. Having tests run automatically on PRs (and
promptly!) makes it a lot easier to confidently commit a change,
especially
if the build merges with trunk first. I started looking at this when I was
trying to sort out Confluent's build & test infrastructure. See
https://travis-ci.org/ewencp/kafka/builds/60802386 for an example, the
patch is about 10 lines in a .travis.yml file and the failure in that
example seems to be unrelated to the Travis config. I think the basic
config
I created is all we'd need.

Unfortunately, I couldn't easily tell what the delay on builds is, i.e.
would it be an improvement over the delays with the current Jenkins setup.
But having them run on PR creation/update means the results will usually
be
ready by the time a reviewer gets to looking at the PR and would be
reported in the PR so the state is easy to evaluate. (I'm also having
trouble telling exactly how the two ASF Jenkins builders differ since they
both seem to poll trunk, so I can't be certain whether Travis would be
able
to completely replace the current setup. That said, it should be telling
that I have never paid attention to Jenkins output at all since it seems
so
far removed from any of my actions as a contributor.)

As another alternative, Confluent would also be happy to provide
build/test
infrastructure to help out. AMPLab does this for Spark, so it seems to be
acceptable to ASF. We already have PR builders and trunk/master builders
set up for other projects, so it wouldn't be hard to setup the same for
Kafka with the right access to the repos. Based on the current frequency
of
builds (https://builds.apache.org/view/All/job/Kafka-trunk/ and
https://builds.apache.org/view/All/job/KafkaPreCommit/) I think it'll be
easy for even our current infrastructure to keep up.


On Thu, Apr 30, 2015 at 9:41 PM, Jaikiran Pai jai.forums2...@gmail.com
wrote:

 I think this will help a lot in contributions. Some of my local changes
 that I want to contribute back have been pending because I sometimes
switch
 machines and I then have to go through setting up the Ruby/python and
other
 stuff for the current review process. Using just github is going to
help in
 quickly submitting the changes.

 -Jaikiran

 On Thursday 30 April 2015 06:42 PM, Ismael Juma wrote:

 Hi all,

 Kafka currently uses a combination of Review Board and JIRA for
 contributions and code review. In my opinion, this makes contribution
and
 code review a bit harder than it has to be.

 I think the approach used by Spark would improve the current situation:

 Generally, Spark uses JIRA to track logical issues, including bugs and
 improvements, and uses Github pull requests to manage the review and
merge
 of specific code changes. That is, JIRAs are used to describe what
should
 be fixed or changed, and high-level approaches, and pull requests
describe
 how to implement that change in the project's source code. For example,
 major design decisions are discussed in JIRA.[1]

 It's worth reading the wiki page for all the details, but I will
summarise
 the suggested workflow for code changes:

  1. Fork the Github repository at http://github.com/apache/kafka (if you
  haven't already)
  2. git checkout -b kafka-XXX
  3. Make one or more commits (smaller commits can be easier to review and
  reviewboard makes that hard)
  4. git push origin kafka-XXX
  5. Create PR against upstream/trunk (this will update JIRA automatically[2]
  and it will send an email to the dev mailing list too)
  6. A CI build will be triggered[3]
  7. Review process happens on GitHub (it's quite handy to be able to
  comment on both commit or PR-level, unlike Review Board)
  8. Once all feedback has been addressed and the build is green, a
  variant of the `merge_spark_pr.py`[4] script is used 

Review Request 33760: Patch for KAFKA-2121

2015-05-01 Thread Steven Wu

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/33760/
---

Review request for kafka.


Bugs: KAFKA-2121
https://issues.apache.org/jira/browse/KAFKA-2121


Repository: kafka


Description
---

override the java.io.Closeable#close method in the Serializer and Deserializer 
interfaces without throwing a checked IOException. this is to avoid breaking 
source compatibility.


add a test for checking Serializer is closed during KafkaProducer#close


Diffs
-

  clients/src/main/java/org/apache/kafka/common/serialization/Deserializer.java 
9a57579f87cb19cb6affe6d157ff8446c23e3551 
  clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java 
c44054038066f0d0829d05f082b2ee42b34cded7 
  
clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java 
eea2c28450736d1668c68828f77a49470a82c3d0 
  
clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java 
49f1427bcbe43c773920a25aa69a71d0329296b7 
  clients/src/test/java/org/apache/kafka/test/MockMetricsReporter.java 
6f948f240c906029a0f972bf770f288f390ea714 
  clients/src/test/java/org/apache/kafka/test/MockSerializer.java PRE-CREATION 

Diff: https://reviews.apache.org/r/33760/diff/


Testing
---


Thanks,

Steven Wu



RE: [DISCUSS] KIP-21 Configuration Management

2015-05-01 Thread Aditya Auradkar
Hey Gwen,

Thanks for the feedback. As Joel said, these client configs do not introduce a 
producer/consumer zk dependency. It is configuration that is needed by the 
broker.

From your comments, I gather that you are more worried about managing broker 
internal configs via Zookeeper since we already have a file. So why have two 
mechanisms? Given that we already manage topic specific configuration in ZK, 
it seems a good fit to at least have client configs there since these config 
parameters aren't really driven through a file anyway. It also maintains 
consistency. 

Even for broker configs, it seems very consistent to have all the overridden 
configs in one place that is easy to view and change. As you mentioned, users 
shouldn't ever have to fiddle with Zookeeper directly; our tooling should 
provide the ability to view and modify configs on a per-broker basis. I do like 
your suggestion of reloading config files but I'm not sure this works easily 
for everyone. For example, these per-host overrides in config files are often 
managed by hostname, but what we really want are broker-level overrides, which 
means they should ideally be tied to a broker id (a Kafka detail). In 
addition, sometimes the configs pushed to individual hosts aren't the 
properties files themselves, but rather some company-specific format that also 
contains the Kafka configs. I guess the point I'm trying to make is that people 
may not be able to reload configs directly from file without doing some 
additional work in many cases.

As far as propagating configuration changes goes, perhaps I can clarify this section 
a bit more. Also, we can also do a pass over all the configs in KafkaConfig and 
have a list of properties that can be converted slowly.

Thanks,
Aditya


From: Joel Koshy [jjkosh...@gmail.com]
Sent: Thursday, April 30, 2015 5:14 PM
To: dev@kafka.apache.org
Subject: Re: [DISCUSS] KIP-21 Configuration Management

1. I have deep concerns about managing configuration in ZooKeeper.
First, Producers and Consumers shouldn't depend on ZK at all, this seems
to add back a dependency we are trying to get away from.

The KIP probably needs to be clarified here - I don't think Aditya was
referring to client (producer/consumer) configs. These are global
client-id-specific configs that need to be managed centrally.
(Specifically, quota overrides on a per-client basis).
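
Purely for illustration - a hedged sketch (the ZK path and value shape are
hypothetical, not what KIP-21 specifies) of how a broker might read a centrally
stored per-client override:
{code}
import org.apache.zookeeper.ZooKeeper;

public class ClientConfigReader {
    private final ZooKeeper zk;

    public ClientConfigReader(ZooKeeper zk) { this.zk = zk; }

    // returns the raw override blob for a client id, or null if none is set
    public String readOverride(String clientId) throws Exception {
        String path = "/config/clients/" + clientId; // hypothetical layout
        if (zk.exists(path, false) == null)
            return null;
        byte[] data = zk.getData(path, false, null); // e.g. {"producer_byte_rate": ...}
        return new String(data, "UTF-8");
    }
}
{code}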



[jira] [Updated] (KAFKA-2121) prevent potential resource leak in KafkaProducer and KafkaConsumer

2015-05-01 Thread Steven Zhen Wu (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2121?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Steven Zhen Wu updated KAFKA-2121:
--
Attachment: KAFKA-2121.patch

 prevent potential resource leak in KafkaProducer and KafkaConsumer
 --

 Key: KAFKA-2121
 URL: https://issues.apache.org/jira/browse/KAFKA-2121
 Project: Kafka
  Issue Type: Bug
  Components: producer 
Affects Versions: 0.8.2.0
Reporter: Steven Zhen Wu
Assignee: Steven Zhen Wu
 Fix For: 0.8.3

 Attachments: KAFKA-2121.patch, KAFKA-2121.patch, KAFKA-2121.patch, 
 KAFKA-2121_2015-04-16_09:55:14.patch, KAFKA-2121_2015-04-16_10:43:55.patch, 
 KAFKA-2121_2015-04-18_20:09:20.patch, KAFKA-2121_2015-04-19_20:08:45.patch, 
 KAFKA-2121_2015-04-19_20:30:18.patch, KAFKA-2121_2015-04-20_09:06:09.patch, 
 KAFKA-2121_2015-04-20_09:51:51.patch, KAFKA-2121_2015-04-20_09:52:46.patch, 
 KAFKA-2121_2015-04-20_09:57:49.patch, KAFKA-2121_2015-04-20_22:48:31.patch


 On Mon, Apr 13, 2015 at 7:17 PM, Guozhang Wang wangg...@gmail.com wrote:
 It is a valid problem and we should correct it as soon as possible, I'm
 with Ewen regarding the solution.
 On Mon, Apr 13, 2015 at 5:05 PM, Ewen Cheslack-Postava e...@confluent.io
 wrote:
  Steven,
 
  Looks like there is even more that could potentially be leaked -- since key
  and value serializers are created and configured at the end, even the IO
  thread allocated by the producer could leak. Given that, I think 1 isn't a
  great option since, as you said, it doesn't really address the underlying
  issue.
 
  3 strikes me as bad from a user experience perspective. It's true we might
  want to introduce additional constructors to make testing easier, but the
  more components I need to allocate myself and inject into the producer's
  constructor, the worse the default experience is. And since you would have
  to inject the dependencies to get correct, non-leaking behavior, it will
  always be more code than previously (and a backwards incompatible change).
  Additionally, the code creating the producer would have to be more
  complicated since it would have to deal with the cleanup carefully whereas
  it previously just had to deal with the exception. Besides, for testing
  specifically, you can avoid exposing more constructors just for testing by
  using something like PowerMock that lets you mock private methods. That
  requires a bit of code reorganization, but doesn't affect the public
  interface at all.
 
  So my take is that a variant of 2 is probably best. I'd probably do two
  things. First, make close() safe to call even if some fields haven't been
  initialized, which presumably just means checking for null fields. (You
  might also want to figure out if all the methods close() calls are
  idempotent and decide whether some fields should be marked non-final and
  cleared to null when close() is called). Second, add the try/catch as you
  suggested, but just use close().
 
  -Ewen
 
 
  On Mon, Apr 13, 2015 at 3:53 PM, Steven Wu stevenz...@gmail.com wrote:
 
   Here is the resource leak problem that we have encountered when 0.8.2
  java
   KafkaProducer failed in constructor. here is the code snippet of
   KafkaProducer to illustrate the problem.
  
   ---
   public KafkaProducer(ProducerConfig config, Serializer<K> keySerializer,
                        Serializer<V> valueSerializer) {
  
     // create metrics reporter via reflection
     List<MetricsReporter> reporters =
         config.getConfiguredInstances(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG,
                                       MetricsReporter.class);
  
     // validate bootstrap servers
     List<InetSocketAddress> addresses =
         ClientUtils.parseAndValidateAddresses(config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
  
   }
   ---
  
   let's say MyMetricsReporter creates a thread in its constructor. if hostname
   validation throws an exception, the constructor won't call the close method of
   MyMetricsReporter to clean up the resource. as a result, we created a thread
   leak issue. this becomes worse when we try to auto recover (i.e. keep
   creating KafkaProducer again -> failing again -> more thread leaks).
  
   there are multiple options for fixing this.
  
   1) just move the hostname validation to the beginning. but this only fixes
   one symptom. it doesn't fix the fundamental problem. what if some other
   lines throw an exception?
  
   2) use try-catch. in the catch section, try to call close methods for any
   non-null objects constructed so far.
  
   3) explicitly declare the dependency in the constructor. this way, when
   KafkaProducer throws an exception, I can call the close method of the metrics
   reporters to release resources.
   KafkaProducer(..., List<MetricsReporter> reporters)
   we 

[jira] [Commented] (KAFKA-2121) prevent potential resource leak in KafkaProducer and KafkaConsumer

2015-05-01 Thread Steven Zhen Wu (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2121?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14523538#comment-14523538
 ] 

Steven Zhen Wu commented on KAFKA-2121:
---

Created reviewboard https://reviews.apache.org/r/33760/diff/
 against branch apache/trunk

 prevent potential resource leak in KafkaProducer and KafkaConsumer
 --

 Key: KAFKA-2121
 URL: https://issues.apache.org/jira/browse/KAFKA-2121
 Project: Kafka
  Issue Type: Bug
  Components: producer 
Affects Versions: 0.8.2.0
Reporter: Steven Zhen Wu
Assignee: Steven Zhen Wu
 Fix For: 0.8.3

 Attachments: KAFKA-2121.patch, KAFKA-2121.patch, KAFKA-2121.patch, 
 KAFKA-2121_2015-04-16_09:55:14.patch, KAFKA-2121_2015-04-16_10:43:55.patch, 
 KAFKA-2121_2015-04-18_20:09:20.patch, KAFKA-2121_2015-04-19_20:08:45.patch, 
 KAFKA-2121_2015-04-19_20:30:18.patch, KAFKA-2121_2015-04-20_09:06:09.patch, 
 KAFKA-2121_2015-04-20_09:51:51.patch, KAFKA-2121_2015-04-20_09:52:46.patch, 
 KAFKA-2121_2015-04-20_09:57:49.patch, KAFKA-2121_2015-04-20_22:48:31.patch


 On Mon, Apr 13, 2015 at 7:17 PM, Guozhang Wang wangg...@gmail.com wrote:
 It is a valid problem and we should correct it as soon as possible, I'm
 with Ewen regarding the solution.
 On Mon, Apr 13, 2015 at 5:05 PM, Ewen Cheslack-Postava e...@confluent.io
 wrote:
  Steven,
 
  Looks like there is even more that could potentially be leaked -- since key
  and value serializers are created and configured at the end, even the IO
  thread allocated by the producer could leak. Given that, I think 1 isn't a
  great option since, as you said, it doesn't really address the underlying
  issue.
 
  3 strikes me as bad from a user experience perspective. It's true we might
  want to introduce additional constructors to make testing easier, but the
  more components I need to allocate myself and inject into the producer's
  constructor, the worse the default experience is. And since you would have
  to inject the dependencies to get correct, non-leaking behavior, it will
  always be more code than previously (and a backwards incompatible change).
  Additionally, the code creating the producer would have to be more
  complicated since it would have to deal with the cleanup carefully whereas
  it previously just had to deal with the exception. Besides, for testing
  specifically, you can avoid exposing more constructors just for testing by
  using something like PowerMock that lets you mock private methods. That
  requires a bit of code reorganization, but doesn't affect the public
  interface at all.
 
  So my take is that a variant of 2 is probably best. I'd probably do two
  things. First, make close() safe to call even if some fields haven't been
  initialized, which presumably just means checking for null fields. (You
  might also want to figure out if all the methods close() calls are
  idempotent and decide whether some fields should be marked non-final and
  cleared to null when close() is called). Second, add the try/catch as you
  suggested, but just use close().
 
  -Ewen
 
 
  On Mon, Apr 13, 2015 at 3:53 PM, Steven Wu stevenz...@gmail.com wrote:
 
    Here is the resource leak problem that we have encountered when the 0.8.2
    java KafkaProducer fails in its constructor. here is the code snippet of
    KafkaProducer to illustrate the problem.
  
   ---
    public KafkaProducer(ProducerConfig config, Serializer<K> keySerializer,
                         Serializer<V> valueSerializer) {
   
        // create metrics reporters via reflection
        List<MetricsReporter> reporters =
            config.getConfiguredInstances(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG,
                MetricsReporter.class);
   
        // validate bootstrap servers
        List<InetSocketAddress> addresses =
            ClientUtils.parseAndValidateAddresses(config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
   
    }
   ---
  
    let's say MyMetricsReporter creates a thread in its constructor. if hostname
    validation throws an exception, the constructor won't call the close method
    of MyMetricsReporter to clean up the resource. as a result, we created a
    thread leak issue. this becomes worse when we try to auto-recover (i.e. keep
    creating KafkaProducer again - failing again - more thread leaks).
   
    there are multiple options for fixing this.
   
    1) just move the hostname validation to the beginning. but this only fixes
    one symptom. it doesn't fix the fundamental problem. what if some other
    lines throw an exception?
   
    2) use try-catch. in the catch section, try to call close methods for any
    non-null objects constructed so far.
   
    3) explicitly declare the dependency in the constructor. this way, when
    KafkaProducer throws an exception, I can call close method 
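
(For illustration only - a minimal sketch of the null-safe close() plus
constructor try/catch approach Ewen describes above. The field names and the
wrapping KafkaException are assumptions, not the actual patch.)

---
private List<MetricsReporter> reporters;
private List<InetSocketAddress> addresses;

public KafkaProducer(ProducerConfig config, Serializer<K> keySerializer,
                     Serializer<V> valueSerializer) {
    try {
        // anything not yet assigned when an exception is thrown stays null
        this.reporters = config.getConfiguredInstances(
            ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG, MetricsReporter.class);
        this.addresses = ClientUtils.parseAndValidateAddresses(
            config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
        // ... remaining construction ...
    } catch (Throwable t) {
        // release whatever was allocated before the failure
        close();
        throw new KafkaException("Failed to construct kafka producer", t);
    }
}

public void close() {
    // safe on a partially initialized instance: every field is null-checked
    if (this.reporters != null)
        for (MetricsReporter reporter : this.reporters)
            reporter.close();
    // ... null-checked close of the io thread, metrics, etc. ...
}
---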

Re: Review Request 33065: Patch for KAFKA-1928

2015-05-01 Thread Jay Kreps

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/33065/#review82281
---


I did a high level pass. On the whole I think this is awesome! I agree we need 
to rationalize the metrics: now that we have a bunch of instrumentation at the 
selector level, we need a lot less at the socket server level.


clients/src/main/java/org/apache/kafka/common/network/Selector.java
https://reviews.apache.org/r/33065/#comment133003

Does this need to be public? Instead could we have maxIdleTime be a 
configurable param and have this call happen from within select?

We didn't have this functionality in the client before, but actually the 
client also needs connection LRU so I think doing it on both sides is good.
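
(A minimal sketch of that idea, assuming an access-ordered LinkedHashMap
keyed by connection id; the names below are illustrative, not the actual
Selector fields.)

---
// access order: get()/put() move an entry to the tail, so the head entry
// is always the least recently used connection
private final LinkedHashMap<String, Long> lruConnections =
    new LinkedHashMap<>(16, 0.75f, true);
private final long maxIdleNanos;

// invoked from within select(), after the ready keys are processed
private void maybeCloseOldestConnection(long nowNanos) {
    Iterator<Map.Entry<String, Long>> it = lruConnections.entrySet().iterator();
    if (it.hasNext()) {
        Map.Entry<String, Long> oldest = it.next();
        if (nowNanos - oldest.getValue() > maxIdleNanos) {
            it.remove();
            close(oldest.getKey()); // drop the idle channel
        }
    }
}
---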



core/src/main/scala/kafka/network/SocketServer.scala
https://reviews.apache.org/r/33065/#comment133004

Beautiful! So much deleted code!


- Jay Kreps


On May 1, 2015, 12:48 a.m., Gwen Shapira wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/33065/
 ---
 
 (Updated May 1, 2015, 12:48 a.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1928
 https://issues.apache.org/jira/browse/KAFKA-1928
 
 
 Repository: kafka
 
 
 Description
 ---
 
 first pass on replacing Send
 
 
 implement maxSize and improved docs
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 Conflicts:
   core/src/main/scala/kafka/network/RequestChannel.scala
 
 moved selector out of abstract thread
 
 
 mid-way through putting selector in SocketServer
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 Also, SocketServer is now using Selector. Still a bit messy - but all tests 
 pass.
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 
 renamed requestKey to connectionId to reflect new use and changed type from 
 Any to String
 
 
 Diffs
 -
 
   clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java 
 da76cc257b4cfe3c4bce7120a1f14c7f31ef8587 
   clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java 
 936487b16e7ac566f8bdcd39a7240ceb619fd30e 
   clients/src/main/java/org/apache/kafka/clients/KafkaClient.java 
 1311f85847b022efec8cb05c450bb18231db6979 
   clients/src/main/java/org/apache/kafka/clients/NetworkClient.java 
 b7ae595f2cc46e5dfe728bc3ce6082e9cd0b6d36 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
  e55ab11df4db0b0084f841a74cbcf819caf780d5 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
  ef9dd5238fbc771496029866ece1d85db6d7b7a5 
   
 clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 
 b2db91ca14bbd17fef5ce85839679144fff3f689 
   
 clients/src/main/java/org/apache/kafka/common/network/ByteBufferReceive.java 
 129ae827bccbd982ad93d56e46c6f5c46f147fe0 
   clients/src/main/java/org/apache/kafka/common/network/ByteBufferSend.java 
 c8213e156ec9c9af49ee09f5238492318516aaa3 
   clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java 
 fc0d168324aaebb97065b0aafbd547a1994d76a7 
   clients/src/main/java/org/apache/kafka/common/network/NetworkSend.java 
 68327cd3a734fd429966d3e2016a2488dbbb19e5 
   clients/src/main/java/org/apache/kafka/common/network/Receive.java 
 4e33078c1eec834bd74aabcb5fc69f18c9d6d52a 
   clients/src/main/java/org/apache/kafka/common/network/Selectable.java 
 b5f8d83e89f9026dc0853e5f92c00b2d7f043e22 
   clients/src/main/java/org/apache/kafka/common/network/Selector.java 
 57de0585e5e9a53eb9dcd99cac1ab3eb2086a302 
   clients/src/main/java/org/apache/kafka/common/network/Send.java 
 5d321a09e470166a1c33639cf0cab26a3bce98ec 
   clients/src/main/java/org/apache/kafka/common/requests/RequestSend.java 
 27cbf390c7f148ffa8c5abc154c72cbf0829715c 
   clients/src/main/java/org/apache/kafka/common/requests/ResponseSend.java 
 PRE-CREATION 
   clients/src/test/java/org/apache/kafka/clients/MockClient.java 
 5e3fab13e3c02eb351558ec973b949b3d1196085 
   clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java 
 8b278892883e63899b53e15efb9d8c926131e858 
   clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java 
 d5b306b026e788b4e5479f3419805aa49ae889f3 
   clients/src/test/java/org/apache/kafka/test/MockSelector.java 
 ea89b06a4c9e5bb351201299cd3037f5226f0e6c 
   core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala 
 1c3b3802ac221d570e7610458e50518b4499e7ed 
   core/src/main/scala/kafka/api/ConsumerMetadataRequest.scala 
 a3b1b78adb760eaeb029466b54f335a29caf3b0f 
   

Re: Review Request 33760: Patch for KAFKA-2121

2015-05-01 Thread Ewen Cheslack-Postava

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/33760/#review82280
---


Pointed out two minor issues, but otherwise LGTM.


clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
https://reviews.apache.org/r/33760/#comment133001

Both of these test methods no longer need 'throws Exception'.



clients/src/test/java/org/apache/kafka/test/MockSerializer.java
https://reviews.apache.org/r/33760/#comment133000

Checkstyle is complaining about missing the copyright header.


- Ewen Cheslack-Postava


On May 1, 2015, 6 p.m., Steven Wu wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/33760/
 ---
 
 (Updated May 1, 2015, 6 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-2121
 https://issues.apache.org/jira/browse/KAFKA-2121
 
 
 Repository: kafka
 
 
 Description
 ---
 
 override the java.io.Closeable#close method in the Serializer and Deserializer 
 interfaces without throwing the checked IOException. this is to avoid breaking 
 source compatibility.
 
 
 add a test for checking Serializer is closed during KafkaProducer#close
 
 
 Diffs
 -
 
   
 clients/src/main/java/org/apache/kafka/common/serialization/Deserializer.java 
 9a57579f87cb19cb6affe6d157ff8446c23e3551 
   clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java 
 c44054038066f0d0829d05f082b2ee42b34cded7 
   
 clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
  eea2c28450736d1668c68828f77a49470a82c3d0 
   
 clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
  49f1427bcbe43c773920a25aa69a71d0329296b7 
   clients/src/test/java/org/apache/kafka/test/MockMetricsReporter.java 
 6f948f240c906029a0f972bf770f288f390ea714 
   clients/src/test/java/org/apache/kafka/test/MockSerializer.java 
 PRE-CREATION 
 
 Diff: https://reviews.apache.org/r/33760/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Steven Wu
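
(For reference, a hypothetical shape for the serializer-close test named in
the description above; the closeCount field on MockSerializer is an
assumption, not necessarily what the patch implements.)

---
@Test
public void testSerializerClose() {
    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
    MockSerializer keySerializer = new MockSerializer();
    MockSerializer valueSerializer = new MockSerializer();
    KafkaProducer<byte[], byte[]> producer =
        new KafkaProducer<>(props, keySerializer, valueSerializer);
    producer.close();
    // assuming MockSerializer counts close() invocations
    assertEquals(1, keySerializer.closeCount);
    assertEquals(1, valueSerializer.closeCount);
}
---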
 




Re: [DISCUSS] KIP-21 Configuration Management

2015-05-01 Thread Joe Stein
Hi Aditya, thanks for the write up and focusing on this piece.

Agreed we need something that we can do broker changes dynamically without
rolling restarts.

I think, though, that if every broker is getting changes with notifications,
it is going to limit which configs can be dynamic.

We could never deliver a stop-the-world configuration change, because then
that would happen on the entire cluster, on every broker, at the same time.

Can maybe just the controller get the notification?

And we provide a layer for brokers to work with the controller to do the
config change operations at its discretion (so it can stop things if it needs to).

The controller gets the notification and sends an AdminChangeNotification to
brokers [X .. N]; then the brokers can do their thing, even sending a response
for heartbeating while the change takes the few milliseconds it needs, or
crashing. We need to go through both scenarios.

I am worried that if we put this change in like this, it works for quotas and
maybe a few other things, but nothing else gets dynamic and we don't get far
enough toward almost no more rolling restarts.

~ Joe Stein
- - - - - - - - - - - - - - - - -

  http://www.stealth.ly
- - - - - - - - - - - - - - - - -

On Thu, Apr 30, 2015 at 8:14 PM, Joel Koshy jjkosh...@gmail.com wrote:

 1. I have deep concerns about managing configuration in ZooKeeper.
 First, Producers and Consumers shouldn't depend on ZK at all; this seems
 to add back a dependency we are trying to get away from.

 The KIP probably needs to be clarified here - I don't think Aditya was
 referring to client (producer/consumer) configs. These are global
 client-id-specific configs that need to be managed centrally.
 (Specifically, quota overrides on a per-client basis).
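
(Purely as an illustration of what centrally managed could look like - a
hypothetical ZooKeeper layout, not a committed design - one znode per
client id holding its quota overrides:)

---
/config/clients/<client-id>  ->  { "producer_byte_rate": "1048576" }
---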




Re: [DISCUSSION] java.io.Closeable in KAFKA-2121

2015-05-01 Thread Jun Rao
If you use KafkaProducer as a Closeable, you still need to catch the
exception when calling close(), right? So the behavior is different depending
on whether you use it as a Producer or a Closeable?

Thanks,

Jun

On Thu, Apr 30, 2015 at 6:26 PM, Jay Kreps j...@confluent.io wrote:

 Hey Jun,

  I think the Closeable interface is what we've used elsewhere and what the
  rest of the java world uses. I don't think it is too hard for us to add the
  override in our interface--implementors of the interface don't need to do
  it.

 -Jay

 On Thu, Apr 30, 2015 at 4:02 PM, Jun Rao j...@confluent.io wrote:

   That makes sense. Then, would it be better to have a KafkaCloseable
   interface that doesn't throw an exception? This way, we don't need to
   override close in every implementing class.
 
  Thanks,
 
  Jun
 
  On Wed, Apr 29, 2015 at 10:36 AM, Steven Wu stevenz...@gmail.com
 wrote:
 
   Jun,
  
   we still get the benefit of extending Closeable. e.g.
  Utils.closeQuietly()
   can take FooSerializer as an argument. we can avoid the duplication of
   boiler-plate code.
  
   class FooSerializer implements Serializer {
  
   @Override
   public void close() {
   // may throw unchecked RuntimeException
   }
   }
  
   final class Utils {
    public static void closeQuietly(Closeable c, String name,
                                    AtomicReference<Throwable> firstException) {
   if (c != null) {
   try {
   c.close();
   } catch (Throwable t) {
   firstException.compareAndSet(null, t);
    log.error("Failed to close " + name, t);
   }
   }
   }
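   
    (and as a usage illustration only - a hypothetical call site that
    aggregates close failures; firstException is just a stand-in name:)
   
    AtomicReference<Throwable> firstException = new AtomicReference<>();
    Utils.closeQuietly(keySerializer, "key serializer", firstException);
    Utils.closeQuietly(valueSerializer, "value serializer", firstException);
    if (firstException.get() != null)
        log.error("Failed to close serializers", firstException.get());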
  
   On Wed, Apr 29, 2015 at 6:51 AM, Jun Rao j...@confluent.io wrote:
  
If you do this, the code is no longer simple, which defeats the
 benefit
   of
extending Closeable. We can define our own Closeable that doesn't
 throw
exceptions, but it may be confusing. So, it seems the original code
 is
probably better.
   
Thanks,
   
Jun
   
On Tue, Apr 28, 2015 at 3:11 PM, Steven Wu stevenz...@gmail.com
  wrote:
   
 sorry for the previous empty msg.

  Jay's idea should work. basically, we override the close method in the
  Serializer interface.

  public interface Serializer<T> extends Closeable {
 @Override
 public void close();
 }

 On Tue, Apr 28, 2015 at 1:10 PM, Steven Wu stevenz...@gmail.com
   wrote:

 
 
  On Tue, Apr 28, 2015 at 1:03 PM, Ewen Cheslack-Postava 
 e...@confluent.io
  wrote:
 
  Good point Jay. More specifically, since we were already implementing
  without the checked exception, we'd need to override close() in the
  Serializer and Deserializer interfaces and omit the throws clause. That
  definitely makes them source compatible. Not sure about binary
  compatibility, I couldn't find a quick answer but I think it's probably
  still compatible.
 
  -Ewen
 
  On Tue, Apr 28, 2015 at 12:30 PM, Jay Kreps 
 jay.kr...@gmail.com
 wrote:
 
   Hey guys,
  
    You can implement Closeable without the checked exception. Having
    close() methods throw checked exceptions isn't very useful unless there
    is a way for the caller to recover. In this case there really isn't,
    right?
  
   -Jay
  
   On Mon, Apr 27, 2015 at 5:51 PM, Guozhang Wang 
   wangg...@gmail.com
  wrote:
  
Folks,
   
 In a recent commit I made regarding KAFKA-2121, there is an omitted API
 change which makes Serializer / Deserializer extend from Closeable,
 whose close() call could throw IOException by declaration. Hence now
 some scenario like:
   
-
   
 Serializer<T> keySerializer = ...
 Serializer<T> valueSerializer = ...
 KafkaProducer producer = new KafkaProducer(config, keySerializer,
 valueSerializer)
// ...
keySerializer.close()
valueSerializer.close()
   
-
   
will need to capture IOException now.
   
 Want to bring this up for people's attention, and your opinion on whether
 we should revert this change?
   
-- Guozhang
   
  
 
 
 
  --
  Thanks,
  Ewen
 
 
 

   
  
 



Re: [VOTE] KIP-11- Authorization design for kafka security

2015-05-01 Thread Jun Rao
Suresh,

We typically wrap up the voting of a KIP in a few days. However, given that
this KIP is quite critical and there seem to be new questions, perhaps we
can spend a bit more time to have people's concerns addressed and then
resume the voting.

Joe,

Do you still have concerns given the previous replies?

Thanks,

Jun


On Thu, Apr 30, 2015 at 7:54 PM, Suresh Srinivas sur...@hortonworks.com
wrote:

 It is a strange choice to return "does not exist" when the condition is
 actually "not authorized". I have a hard time understanding why that is
 better for security. Perhaps in the DB world this is expected and changes may
 be necessary to comply with such behavior. But that should not guide what
 we do in Kafka.

 This is a voting thread for an important feature. Security is the number
 one feature that our users are asking for. Can't minor things like this be
 done in follow-up jiras? Should the focus be brought back to voting?

 Btw since I am new to the Kafka community, is there a period when a voting
 thread needs to wrap up by? Other projects generally follow 3 or 7 days.

 Regards,
 Suresh

 Sent from phone

 _
 From: Gwen Shapira gshap...@cloudera.com
 Sent: Thursday, April 30, 2015 5:32 PM
 Subject: Re: [VOTE] KIP-11- Authorization design for kafka security
 To: dev@kafka.apache.org


 On Thu, Apr 30, 2015 at 4:39 PM, Parth Brahmbhatt 
 pbrahmbh...@hortonworks.com wrote:

  Hi Joe,
 
  Let me clarify on authZException. The caller gets a 403 regardless of the
  existence of the topic; even if the topic does not exist you always get
  403. This will fall under the case where we do not find any acls for a
  resource, and as per our last decision, by default we are going to deny
  this request.
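 
  (A minimal sketch of that deny-by-default check - the method and type
  names here are illustrative, not the final KIP-11 interface:)
 
  // no ACLs found for the resource => deny, whether or not the topic
  // exists, so the caller sees the same 403 either way
  public boolean authorize(Session session, Operation operation, Resource resource) {
      List<Acl> acls = getAcls(resource);
      return !acls.isEmpty() && aclsAllow(session, operation, acls);
  }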
 

 The reason I'm digging into this is that in Hive we had to fix existing
 behavior after financial customers objected loudly to getting "insufficient
 privileges" when a real database would return "table does not exist".

 I completely agree that having to handle two separate error conditions
 (TopicNotExist if the user doesn't have READ, unless the user has CREATE, in
 which case he can see all topics and can get Unauthorized) adds complexity and
 will not be fun to debug. However, when implementing security, a lot of the
 stuff we do is around making customers pass security audits, and I suspect
 that the "can't know that tables even exist" test is a thing.

 We share pretty much the same financial customers and they seem to have the
 same concerns. Perhaps you can double check if you also have this
 requirement?

 (and again, sorry for not seeing this earlier and holding up the vote on
 what seems like a minor point. I just don't want to punt something for later
 when we already have an idea of what customers expect)

 Gwen



 
  The configurations are listed explicitly here
  https://cwiki.apache.org/confluence/display/KAFKA/KIP-11+-+Authorization+Interface#KIP-11-AuthorizationInterface-Changestoexistingclasses
  under KafkaConfig. We may add an optional config to allow the authorizer to
  read an arbitrary property file incrementally, but that does not need to be
  part of this same KIP.
 
  The statement “If we can't audit the access then what good is controlling
  the access?” seems extreme, because we still get to control the access,
  which IMHO is a huge win. The default authorizer implementation right now
  logs every allowed/denied access (see here
  https://github.com/Parth-Brahmbhatt/kafka/blob/KAFKA-1688-impl/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala)
  in debug mode. Anybody who needs auditing could create a log4j appender to
  allow debug access to this class and send the log output to some audit file.
 
  Auditing is still a separate piece; we could either add an auditor
  interface that wraps the authorizer or the other way around, so authorizer
  and auditor can be two separate implementations. I would love to start a new
  KIP and jira to discuss approaches in more detail, but I don’t see the
  need to hold up the Authorization work for the same.
 
  I don’t agree with “this design seems too specific”, given we already
  have 3 implementations (default, ranger, sentry) that can be supported with
  the current design.
 
  The authorization happens as part of handle and it is the first action,
  see here
  https://github.com/Parth-Brahmbhatt/kafka/blob/KAFKA-1688-impl/core/src/main/scala/kafka/server/KafkaApis.scala#L103
  for one example.
 
  Thanks
  Parth
 
 
 
  On 4/30/15, 4:24 PM, Suresh Srinivas sur...@hortonworks.com wrote:
 
  Joe, thanks for the clarification.
  
  Regarding audits, sorry I might be misunderstanding your email.
  Currently, if Kafka does not support audits, I think audits should be
  considered as a separate effort. Here are the reasons:
  - Audit, whether authorization is available or not, should record
  operations to determine what