Re: Review Request 33620: Patch for KAFKA-1690

2015-07-22 Thread Ismael Juma

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/33620/#review92599
---



clients/src/main/java/org/apache/kafka/common/network/SSLTransportLayer.java 
(lines 46 - 57)
https://reviews.apache.org/r/33620/#comment146806

Some of these fields can be `final`.

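For illustration, a hypothetical before/after of the suggestion (the field name and constructor shape are made up, not taken from the patch):

{code}
// Hypothetical example only: a field that is assigned once in the
// constructor and never reassigned can be declared final.
private final SocketChannel socketChannel;  // was: private SocketChannel socketChannel;

public SSLTransportLayer(SocketChannel socketChannel) {
    this.socketChannel = socketChannel;
}
{code}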

- Ismael Juma


On July 20, 2015, 7 p.m., Sriharsha Chintalapani wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/33620/
 ---
 
 (Updated July 20, 2015, 7 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1690
 https://issues.apache.org/jira/browse/KAFKA-1690
 
 
 Repository: kafka
 
 
 Description
 ---
 
 KAFKA-1690. new java producer needs ssl support as a client.
 
 
 KAFKA-1690. new java producer needs ssl support as a client.
 
 
 KAFKA-1690. new java producer needs ssl support as a client.
 
 
 KAFKA-1690. new java producer needs ssl support as a client. SSLFactory tests.
 
 
 KAFKA-1690. new java producer needs ssl support as a client. Added 
 PrincipalBuilder.
 
 
 KAFKA-1690. new java producer needs ssl support as a client. Addressing 
 reviews.
 
 
 KAFKA-1690. new java producer needs ssl support as a client. Addressing 
 reviews.
 
 
 KAFKA-1690. new java producer needs ssl support as a client. Addressing 
 reviews.
 
 
 KAFKA-1690. new java producer needs ssl support as a client. Fixed minor 
 issues with the patch.
 
 
 KAFKA-1690. new java producer needs ssl support as a client. Fixed minor 
 issues with the patch.
 
 
 KAFKA-1690. new java producer needs ssl support as a client.
 
 
 KAFKA-1690. new java producer needs ssl support as a client.
 
 
 Merge remote-tracking branch 'refs/remotes/origin/trunk' into KAFKA-1690-V1
 
 
 KAFKA-1690. Broker side ssl changes.
 
 
 KAFKA-1684. SSL for socketServer.
 
 
 KAFKA-1690. Added SSLProducerSendTest and fixes to get right port for SSL.
 
 
 Merge branch 'trunk' into KAFKA-1690-V1
 
 
 KAFKA-1690. Post merge fixes.
 
 
 KAFKA-1690. Added SSLProducerSendTest.
 
 
 KAFKA-1690. Minor fixes based on patch review comments.
 
 
 Merge commit
 
 
 KAFKA-1690. Added SSL Consumer Test.
 
 
 KAFKA-1690. SSL Support.
 
 
 KAFKA-1690. Addressing reviews.
 
 
 Merge branch 'trunk' into KAFKA-1690-V1
 
 
 Merge branch 'trunk' into KAFKA-1690-V1
 
 
 KAFKA-1690. Addressing reviews. Removed interestOps from SSLTransportLayer.
 
 
 KAFKA-1690. Addressing reviews.
 
 
 Diffs
 -
 
   build.gradle fb9084307ae41bbb62e32720ccd7b94f26e910c8 
   checkstyle/import-control.xml 19e0659ef9385433d9f94dee43cd70a52b18c9e5 
   clients/src/main/java/org/apache/kafka/clients/ClientUtils.java 
 0d68bf1e1e90fe9d5d4397ddf817b9a9af8d9f7a 
   clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java 
 2c421f42ed3fc5d61cf9c87a7eaa7bb23e26f63b 
   clients/src/main/java/org/apache/kafka/clients/NetworkClient.java 
 48fe7961e2215372d8033ece4af739ea06c6457b 
   clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java 
 70377ae2fa46deb381139d28590ce6d4115e1adc 
   clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
 bea3d737c51be77d5b5293cdd944d33b905422ba 
   clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
 03b8dd23df63a8d8a117f02eabcce4a2d48c44f7 
   clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java 
 aa264202f2724907924985a5ecbe74afc4c6c04b 
   clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java 
 bae528d31516679bed88ee61b408f209f185a8cc 
   clients/src/main/java/org/apache/kafka/common/config/SSLConfigs.java 
 PRE-CREATION 
   clients/src/main/java/org/apache/kafka/common/network/Authenticator.java 
 PRE-CREATION 
   clients/src/main/java/org/apache/kafka/common/network/ByteBufferSend.java 
 df0e6d5105ca97b7e1cb4d334ffb7b443506bd0b 
   clients/src/main/java/org/apache/kafka/common/network/ChannelBuilder.java 
 PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/common/network/DefaultAuthenticator.java
  PRE-CREATION 
   clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java 
 PRE-CREATION 
   clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java 
 3ca0098b8ec8cfdf81158465b2d40afc47eb6f80 
   
 clients/src/main/java/org/apache/kafka/common/network/PlainTextChannelBuilder.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/common/network/PlainTextTransportLayer.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/common/network/SSLChannelBuilder.java 
 PRE-CREATION 
   clients/src/main/java/org/apache/kafka/common/network/SSLFactory.java 
 PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/common/network/SSLTransportLayer.java 
 PRE-CREATION 
   

Re: Review Request 33620: Patch for KAFKA-1690

2015-07-22 Thread Sriharsha Chintalapani


 On July 22, 2015, 3 p.m., Jun Rao wrote:
  clients/src/main/java/org/apache/kafka/common/network/SSLTransportLayer.java,
   lines 231-234
  https://reviews.apache.org/r/33620/diff/12/?file=1017027#file1017027line231
 
  Still some questions on this.
  
  1. When handshakeStatus is not NEED_UNWRAP (e.g. FINISHED), we could 
  have flushed all the bytes. In this case, we should turn off the write 
  interestOps in the socket key, right?
  2. When handshakeStatus is NEED_UNWRAP and write is true, we will move 
  on to the NEED_UNWRAP case. However, in this case, there may still be 
  unflushed bytes in netWriteBuffer.
  3. When handshakeStatus transitions to FINISHED, we return to the 
  callers. Doesn't that delay the completion of the handshake since this key 
  may no longer be selected?

1. handshakeStatus cannot go from NEED_WRAP straight to FINISHED; it can only 
go to NEED_UNWRAP. Either the client or the server needs to get into 
NEED_UNWRAP before it can reach FINISHED status.
2. This is not true. If the handshake status is NEED_UNWRAP, we check whether 
flush returns true; if it doesn't, that means there are unflushed bytes in 
netWriteBuffer, so we set the interestOps to WRITE and break.
3. I am not sure I follow. If the handshake is finished, the read bit is still 
on. So either the client starts sending some data (for example a metadata 
update request), at which point it sets the write bit on, or the server starts 
sending responses to the client. The handshake happens at connection time; in 
reality the client is done with the handshake and starts sending requests, 
whether it is a producer or a consumer.
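To make the flow concrete, here is a minimal sketch of the wrap-then-flush step being discussed, assuming an SSLEngine-based transport layer like the one in this patch; the names (netWriteBuffer, flush) follow the review discussion, but the code is illustrative, not the patch's actual implementation:

{code}
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLEngineResult;

// Sketch only: wrap handshake bytes and flush them; fall through to
// NEED_UNWRAP only once netWriteBuffer has been fully flushed.
final class HandshakeWrapSketch {
    private final SSLEngine sslEngine;
    private final SocketChannel socketChannel;
    private final SelectionKey key;
    private final ByteBuffer netWriteBuffer;
    private static final ByteBuffer EMPTY = ByteBuffer.allocate(0);

    HandshakeWrapSketch(SSLEngine engine, SocketChannel channel, SelectionKey key, int packetBufferSize) {
        this.sslEngine = engine;
        this.socketChannel = channel;
        this.key = key;
        this.netWriteBuffer = ByteBuffer.allocate(packetBufferSize);
    }

    /** Returns the next handshake status, or null if we must wait for OP_WRITE. */
    SSLEngineResult.HandshakeStatus wrapAndFlush() throws IOException {
        netWriteBuffer.clear();
        SSLEngineResult result = sslEngine.wrap(EMPTY, netWriteBuffer); // produce handshake bytes
        netWriteBuffer.flip();
        if (!flush(netWriteBuffer)) {
            // Unflushed bytes remain: register for OP_WRITE so the selector
            // wakes us to retry, instead of moving on to NEED_UNWRAP early.
            key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
            return null;
        }
        return result.getHandshakeStatus(); // typically NEED_UNWRAP from here
    }

    private boolean flush(ByteBuffer buf) throws IOException {
        socketChannel.write(buf);
        return !buf.hasRemaining();
    }
}
{code}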


- Sriharsha


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/33620/#review92314
---



[jira] [Commented] (KAFKA-2350) Add KafkaConsumer pause capability

2015-07-22 Thread Jay Kreps (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14637126#comment-14637126
 ] 

Jay Kreps commented on KAFKA-2350:
--

[~becket_qin] Cool, we're on the same page; that was how I interpreted what you 
said. There is definitely a sense in which this is more elegant, but there is a 
little complexity since you need to keep a list of paused partitions and 
need to populate it when unsubscribe(partition) is called if that topic is 
subscribed to.

 Add KafkaConsumer pause capability
 --

 Key: KAFKA-2350
 URL: https://issues.apache.org/jira/browse/KAFKA-2350
 Project: Kafka
  Issue Type: Improvement
Reporter: Jason Gustafson
Assignee: Jason Gustafson

 There are some use cases in stream processing where it is helpful to be able 
 to pause consumption of a topic. For example, when joining two topics, you 
 may need to delay processing of one topic while you wait for the consumer of 
 the other topic to catch up. The new consumer currently doesn't provide a 
 nice way to do this. If you skip poll() or if you unsubscribe, then a 
 rebalance will be triggered and your partitions will be reassigned.
 One way to achieve this would be to add two new methods to KafkaConsumer:
 {code}
 void pause(String... topics);
 void unpause(String... topics);
 {code}
 When a topic is paused, a call to KafkaConsumer.poll will not initiate any 
 new fetches for that topic. After it is unpaused, fetches will begin again.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
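As a sketch of how the proposed API might be used in the join scenario described above (the pause/unpause signatures come from this issue; the surrounding consumer code and helper methods are hypothetical):

{code}
// Hypothetical usage of the proposed pause/unpause methods while joining
// two topics; process() and leftIsAheadOfRight() are made-up helpers.
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe("left", "right");
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    process(records);
    if (leftIsAheadOfRight())
        consumer.pause("left");    // stop fetching "left" without triggering a rebalance
    else
        consumer.unpause("left");  // resume fetches for "left"
}
{code}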


Re: Review Request 33620: Patch for KAFKA-1690

2015-07-22 Thread Sriharsha Chintalapani


 On July 22, 2015, 3 p.m., Jun Rao wrote:
  clients/src/main/java/org/apache/kafka/common/network/SSLTransportLayer.java,
   lines 361-374
  https://reviews.apache.org/r/33620/diff/12/?file=1017027#file1017027line361
 
  Hmm, if status is OK and handshakeStatus is NEED_UNWRAP, could we get 
  into an infinite loop here?

Sorry. I thought I moved the socketChannel read inside the loop. I missed it.


 On July 22, 2015, 3 p.m., Jun Rao wrote:
  clients/src/main/java/org/apache/kafka/common/network/SSLTransportLayer.java,
   lines 394-397
  https://reviews.apache.org/r/33620/diff/12/?file=1017027#file1017027line394
 
  We talked about that before. It's possible that we read all the bytes for 
  multiple receives into the appReadBuffer. The Selector only reads off one 
  receive at a time. If there are no more bytes from the incoming network, this 
  key may not be selected to read off the next receive in the appReadBuffer. 
  How do we resolve that issue?

Yes, this is an issue which I am trying to address by adding stagedReceives on 
the Selector side: read as many available receives as possible in a single 
poll, push them into a stagedReceives queue, and pop one out during that poll 
to add to completedReceives. In later polls, if the selectionKey is not 
selected, is not muted, and has a stagedReceive waiting, we will poll that and 
add it to completedReceives. But this logic is not working on the server side, 
as I get a selector.send IllegalStateException. I am still looking into it and 
will update the patch once I have a solution. Please let me know if the above 
description sounds right to you.
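For clarity, a minimal sketch of the stagedReceives bookkeeping described above, assuming one queue of receives per channel id; this illustrates the queueing scheme only, not the patch's actual Selector code:

{code}
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch only: stage every receive read in a poll, but surface at most one
// completed receive per channel per poll, even if the key is not selected
// for reading again.
final class StagedReceivesSketch<R> {
    private final Map<String, Deque<R>> stagedReceives = new HashMap<>();
    private final List<R> completedReceives = new ArrayList<>();

    // Called while a channel is readable: drain everything available now.
    void stage(String channelId, R receive) {
        Deque<R> staged = stagedReceives.get(channelId);
        if (staged == null) {
            staged = new ArrayDeque<>();
            stagedReceives.put(channelId, staged);
        }
        staged.addLast(receive);
    }

    // Called once per channel per poll (including polls where the key was
    // not selected): promote a single staged receive to completedReceives.
    void promoteOne(String channelId) {
        Deque<R> staged = stagedReceives.get(channelId);
        if (staged != null && !staged.isEmpty()) {
            completedReceives.add(staged.pollFirst());
            if (staged.isEmpty())
                stagedReceives.remove(channelId);
        }
    }
}
{code}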


- Sriharsha


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/33620/#review92314
---



Re: Review Request 36681: Patch for KAFKA-2275

2015-07-22 Thread Jason Gustafson

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36681/#review92604
---


Hey Ashish, this looks pretty good to me. Just some minor comments.


clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
 (line 298)
https://reviews.apache.org/r/36681/#comment146810

Any reason not to put this method in Fetcher instead of here? I don't have 
a strong feeling, but it was kind of nice keeping ConsumerNetworkClient largely 
free of application logic.

Also, it might be nice to have a unit test.



clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
 (lines 314 - 315)
https://reviews.apache.org/r/36681/#comment146811

I think I asked this before, but is there any harm in returning this topic 
to the user? I ask because we don't actually prevent them from calling 
partitionsFor with the same topic.



clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
 (line 320)
https://reviews.apache.org/r/36681/#comment146812

I think the convention is to leave off the braces on one-line if statements.


- Jason Gustafson


On July 22, 2015, 6:32 a.m., Ashish Singh wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/36681/
 ---
 
 (Updated July 22, 2015, 6:32 a.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-2275
 https://issues.apache.org/jira/browse/KAFKA-2275
 
 
 Repository: kafka
 
 
 Description
 ---
 
 KAFKA-2275: Add a ListTopics() API to the new consumer
 
 
 Diffs
 -
 
   clients/src/main/java/org/apache/kafka/clients/ClientRequest.java 
 ed4c0d98596cc294757f35df8c8cbc8e36ff42de 
   clients/src/main/java/org/apache/kafka/clients/NetworkClient.java 
 48fe7961e2215372d8033ece4af739ea06c6457b 
   clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java 
 252b759c0801f392e3526b0f31503b4b8fbf1c8a 
   clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
 bea3d737c51be77d5b5293cdd944d33b905422ba 
   clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java 
 c14eed1e95f2e682a235159a366046f00d1d90d6 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
  9517d9d0cd480d5ba1d12f1fde7963e60528d2f8 
   core/src/test/scala/integration/kafka/api/ConsumerTest.scala 
 3eb5f95731a3f06f662b334ab2b3d0ad7fa9e1ca 
 
 Diff: https://reviews.apache.org/r/36681/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Ashish Singh
 




Re: Review Request 33620: Patch for KAFKA-1690

2015-07-22 Thread Ismael Juma

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/33620/#review92405
---



clients/src/main/java/org/apache/kafka/common/network/ByteBufferSend.java (line 
61)
https://reviews.apache.org/r/33620/#comment146575

Is there a JIRA ticket for removing `BlockingChannel`? If so, it may be 
worth mentioning the id here so that it's easy to find when the time comes for 
the removal to be done.



clients/src/main/java/org/apache/kafka/common/network/SSLFactory.java (line 199)
https://reviews.apache.org/r/33620/#comment146804

You can use try with resources now that we require Java 7.

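For reference, a minimal sketch of the Java 7 idiom being suggested, using a keystore load as a stand-in example (illustrative only, not the patch's SSLFactory code):

{code}
import java.io.FileInputStream;
import java.io.IOException;
import java.security.GeneralSecurityException;
import java.security.KeyStore;

// Sketch only: try-with-resources closes the stream automatically, even on
// exceptions, replacing an explicit try/finally with in.close().
static KeyStore loadKeyStore(String path, char[] password)
        throws IOException, GeneralSecurityException {
    KeyStore ks = KeyStore.getInstance(KeyStore.getDefaultType());
    try (FileInputStream in = new FileInputStream(path)) {
        ks.load(in, password);
    }
    return ks;
}
{code}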

- Ismael Juma



Re: Review Request 33620: Patch for KAFKA-1690

2015-07-22 Thread Sriharsha Chintalapani


 On July 22, 2015, 3 p.m., Jun Rao wrote:
  clients/src/main/java/org/apache/kafka/common/network/SSLTransportLayer.java,
   line 411
  https://reviews.apache.org/r/33620/diff/12/?file=1017027#file1017027line411
 
  Is renegotiation actually supported? It seems that renegotiation can 
  happen in the middle of a regular send/receive.

Yes, it can happen in the middle of a regular send/receive. But it needs to 
be initiated by the server, and we don't have any criteria for that, i.e. we 
aren't checking certificate expiration to trigger renegotiation. As we 
discussed, we are going to skip this for the initial version.


- Sriharsha


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/33620/#review92314
---



Re: Review Request 36664: Patch for KAFKA-2353

2015-07-22 Thread Guozhang Wang


 On July 21, 2015, 11:15 p.m., Gwen Shapira wrote:
  core/src/main/scala/kafka/network/SocketServer.scala, line 400
  https://reviews.apache.org/r/36664/diff/1/?file=1018238#file1018238line400
 
  So in case of unexpected exception, we log an error and keep running?
  
  Isn't it better to kill the processor, since we don't know what's the 
  state of the system? If the acceptor keeps placing messages in the queue 
  for a dead processor, isn't it a separate issue?
 
 Jiangjie Qin wrote:
 This part I'm not quite sure about. I am not very experienced with error 
 handling in such cases, so please correct me if I missed something. Here is 
 what I thought.
 
 The way it currently works is that the acceptor will:
 1. accept a new connection request and create a new socket channel
 2. choose a processor and put the socket channel into the processor's new 
 connection queue
 
 The processor will just take the socket channels from the queue and 
 register them with the selector.
 If the processor gets an uncaught exception while running, there are several 
 possibilities. 
 Case 1: The exception came from one socket channel. 
 Case 2: The exception was associated with a bad request. 
 In case 1, ideally we should just disconnect that socket channel without 
 affecting the other socket channels.
 In case 2, I think we should log the error and skip the message, assuming 
 the client will retry sending data if no response is received for a given 
 period of time.
 
 I am not sure letting the processor exit is a good idea, because that would 
 let a badly behaving client screw the entire cluster - it might kill the 
 processors one by one. Compared with that, I lean towards keeping the 
 processor running and serving other normal TCP connections if possible, but 
 logging the error so a monitoring system can detect it and see if human 
 intervention is needed.
 
 Also, I don't know what to do here to prevent the thread from exiting 
 without catching all the throwables.
 According to this blog post 
 http://www.tzavellas.com/techblog/2010/09/20/catching-throwable-in-scala/
 I guess I can rethrow all the ControlThrowables, but intercept the rest?

I would also prefer not to close the Processor thread upon exceptions, mainly 
to avoid one bad client killing a shared Kafka cluster. In the past we have 
seen issues like a DDoS of MetadataRequests killing the cluster and affecting 
all other clients, and the quota work is aimed at preventing that. Since 
Processor threads are shared (8 by default on a broker), they should not be 
shut down by a single socket / bad client request.
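A minimal sketch of the per-connection error handling being argued for here, assuming a selector-style processor loop; the class and method names are illustrative, not SocketServer's actual code:

{code}
import java.io.IOException;
import java.nio.channels.SelectionKey;

// Sketch only: failures scoped to one channel close just that channel;
// the shared processor thread keeps serving the remaining connections.
abstract class ProcessorSketch implements Runnable {
    private volatile boolean running = true;

    @Override
    public void run() {
        while (running) {
            try {
                configureNewConnections(); // register queued sockets with the selector
                poll();                    // select and dispatch ready keys
            } catch (Exception e) {
                // Unexpected, non-channel-scoped failure: log loudly so
                // monitoring can flag it, but keep the shared thread alive.
                System.err.println("Processor got uncaught exception: " + e);
            }
        }
    }

    // Per-key work: exceptions here concern a single connection only.
    protected void handle(SelectionKey key) {
        try {
            read(key);
        } catch (IOException e) {
            close(key); // case 1: disconnect just this socket channel
        } catch (RuntimeException e) {
            // case 2: a bad request; log and drop it, the client will retry
            System.err.println("Bad request on " + key + ": " + e);
            close(key);
        }
    }

    abstract void configureNewConnections() throws IOException;
    abstract void poll() throws IOException;
    abstract void read(SelectionKey key) throws IOException;
    abstract void close(SelectionKey key);
}
{code}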


 On July 21, 2015, 11:15 p.m., Gwen Shapira wrote:
  core/src/main/scala/kafka/network/SocketServer.scala, line 465
  https://reviews.apache.org/r/36664/diff/1/?file=1018238#file1018238line465
 
  Turns out that catching Throwable is a really bad idea: 
  https://www.sumologic.com/2014/05/05/why-you-should-never-catch-throwable-in-scala/
 
 Jiangjie Qin wrote:
 Ah... Didn't know that before. I explicitly listed the exceptions.

Searching for ": Throwable" gives me 180+ cases in the code base :P Though 
many of them are from unit tests (which is arguably OK), there are still a lot 
in the core package. I agree that we should avoid catching Throwable whenever 
possible, which will also help force developers to think about the possible 
checked exceptions in the calling trace.


- Guozhang


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36664/#review92496
---


On July 22, 2015, 5:02 a.m., Jiangjie Qin wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/36664/
 ---
 
 (Updated July 22, 2015, 5:02 a.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-2353
 https://issues.apache.org/jira/browse/KAFKA-2353
 
 
 Repository: kafka
 
 
 Description
 ---
 
 Addressed Gwen's comments
 
 
 Diffs
 -
 
   core/src/main/scala/kafka/network/SocketServer.scala 
 91319fa010b140cca632e5fa8050509bd2295fc9 
 
 Diff: https://reviews.apache.org/r/36664/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Jiangjie Qin
 




Re: Review Request 36664: Patch for KAFKA-2353

2015-07-22 Thread Gwen Shapira


 On July 21, 2015, 11:15 p.m., Gwen Shapira wrote:
  core/src/main/scala/kafka/network/SocketServer.scala, line 465
  https://reviews.apache.org/r/36664/diff/1/?file=1018238#file1018238line465
 
  Turns out that catching Throwable is a really bad idea: 
  https://www.sumologic.com/2014/05/05/why-you-should-never-catch-throwable-in-scala/
 
 Jiangjie Qin wrote:
 Ah... Didn't know that before. I explicitly listed the exceptions.
 
 Guozhang Wang wrote:
 Searching for ": Throwable" gives me 180+ cases in the code base :P Though 
 many of them are from unit tests (which is arguably OK), there are still a 
 lot in the core package. I agree that we should avoid catching Throwable 
 whenever possible, which will also help force developers to think about the 
 possible checked exceptions in the calling trace.

I know :(
I'm not sure if going over and converting everything is worth the effort, 
although it could be a nice newbie jira.


 On July 21, 2015, 11:15 p.m., Gwen Shapira wrote:
  core/src/main/scala/kafka/network/SocketServer.scala, line 400
  https://reviews.apache.org/r/36664/diff/1/?file=1018238#file1018238line400
 
  So in case of unexpected exception, we log an error and keep running?
  
  Isn't it better to kill the processor, since we don't know what's the 
  state of the system? If the acceptor keeps placing messages in the queue 
  for a dead processor, isn't it a separate issue?
 
 Jiangjie Qin wrote:
 This part I'm not quite sure about. I am not very experienced with error 
 handling in such cases, so please correct me if I missed something. Here is 
 what I thought.
 
 The way it currently works is that the acceptor will:
 1. accept a new connection request and create a new socket channel
 2. choose a processor and put the socket channel into the processor's new 
 connection queue
 
 The processor will just take the socket channels from the queue and 
 register them with the selector.
 If the processor gets an uncaught exception while running, there are several 
 possibilities. 
 Case 1: The exception came from one socket channel. 
 Case 2: The exception was associated with a bad request. 
 In case 1, ideally we should just disconnect that socket channel without 
 affecting the other socket channels.
 In case 2, I think we should log the error and skip the message, assuming 
 the client will retry sending data if no response is received for a given 
 period of time.
 
 I am not sure letting the processor exit is a good idea, because that would 
 let a badly behaving client screw the entire cluster - it might kill the 
 processors one by one. Compared with that, I lean towards keeping the 
 processor running and serving other normal TCP connections if possible, but 
 logging the error so a monitoring system can detect it and see if human 
 intervention is needed.
 
 Also, I don't know what to do here to prevent the thread from exiting 
 without catching all the throwables.
 According to this blog post 
 http://www.tzavellas.com/techblog/2010/09/20/catching-throwable-in-scala/
 I guess I can rethrow all the ControlThrowables, but intercept the rest?
 
 Guozhang Wang wrote:
 I would also prefer not to close the Processor thread upon exceptions, 
 mainly to avoid one bad client killing a shared Kafka cluster. In the past 
 we have seen issues like a DDoS of MetadataRequests killing the cluster and 
 affecting all other clients, and the quota work is aimed at preventing that. 
 Since Processor threads are shared (8 by default on a broker), they should 
 not be shut down by a single socket / bad client request.

I like your thinking around cases #1 and #2. I think this should go in a code 
comment somewhere, so that when people improve / extend SocketServer they will 
keep this logic in mind. Maybe even specify in the individual catch clauses 
whether they are handling possible errors at the request level or the channel 
level.

My concern is with a possible case #3: each processor has an 
o.a.k.common.network.Selector. I'm concerned about the possibility of something 
going wrong in the state of the selector, which would be an issue for all 
channels. For example, a failure to register could be an issue with the 
channel.register call, but also perhaps an issue with keys.put (just an example 
- I'm not sure something can actually break the keys table). 

I'd like to be able to identify cases where the Selector state may have gone 
wrong and close the processor in that case. Does that make any sense? Or am I 
being too paranoid?


- Gwen


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36664/#review92496
---



[jira] [Commented] (KAFKA-2350) Add KafkaConsumer pause capability

2015-07-22 Thread Yasuhiro Matsuda (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14637219#comment-14637219
 ] 

Yasuhiro Matsuda commented on KAFKA-2350:
-

I think overloading subscribe/unsubscribe is very confusing. 
Subscribe/unsubscribe and pause/unpause are two very different behaviors, and 
overloading the same method names does not really simplify the API. I want 
pause/unpause to be pure flow control; it shouldn't be mixed up with 
subscription.



[jira] [Commented] (KAFKA-2350) Add KafkaConsumer pause capability

2015-07-22 Thread Jay Kreps (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14637317#comment-14637317
 ] 

Jay Kreps commented on KAFKA-2350:
--

[~hachikuji] Yeah I agree that would help this case. The nice thing about that 
proposal is that it would make the group management explicit. I wonder if that 
might not add more things that can go wrong in the common case, though: right 
now the common case is just subscribing to a topic and letting the group 
management figure out the assignment, and it is kind of hard to mess that up. 
All the cases where you either subscribe to individual partitions or pause 
partitions are kind of niche uses, so maybe it is less important to optimize 
for those cases?



[jira] [Comment Edited] (KAFKA-2350) Add KafkaConsumer pause capability

2015-07-22 Thread Jason Gustafson (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14637187#comment-14637187
 ] 

Jason Gustafson edited comment on KAFKA-2350 at 7/22/15 5:01 PM:
-

[~becket_qin] [~jkreps] Currently automatic assignment is inferred based on 
which subscribe methods are invoked (e.g. if you subscribe to a topic, then we 
assume you want automatic assignment). I wonder if it might help to make that 
instead an explicit configuration parameter? Then that might free us a little 
bit to use subscribe/unsubscribe in the way [~becket_qin] is proposing since we 
don't have to guess whether the user is intending to actually subscribe to a 
partition or just to unpause it. 


was (Author: hachikuji):
[~becket_qin] [~jkreps] Currently automatic assignment is inferred based on 
which subscribe methods are invoked (e.g. if you subscribe to a topic, then we 
assume you want automatic assignment). I wonder if it might help to make that 
instead an explicit configuration parameter? Then that might free us a little 
bit to use subscribe/unsubscribe in the way [~becket_qin] is proposing since we 
don't have to guess whether the user is intending to actually subscribe to a 
partition or just to pause it. 



Re: Review Request 36664: Patch for KAFKA-2353

2015-07-22 Thread Guozhang Wang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36664/#review92607
---


LGTM overall. Could you address Ismael's comments as well before check-in?

- Guozhang Wang





[jira] [Commented] (KAFKA-2352) Possible memory leak in MirrorMaker and/or new Producer

2015-07-22 Thread Jiangjie Qin (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14637208#comment-14637208
 ] 

Jiangjie Qin commented on KAFKA-2352:
-

Can you verify whether the messages you passed to the producer have been sent 
or not? In mirror maker the default configuration is set to avoid data loss, so 
if the producer cannot send a message it will retry infinitely. In that case, 
you might exhaust the producer's buffer if you keep appending messages to the 
producer.
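For reference, a sketch of the producer settings in play here; the values are illustrative, not MirrorMaker's shipped defaults:

{code}
// Illustrative settings related to the error in this report.
Properties props = new Properties();
props.put("buffer.memory", "134217728");  // the 128 MB pool from the log below
props.put("retries", String.valueOf(Integer.MAX_VALUE));  // retry "infinitely" to avoid data loss
// true  -> send() blocks when the pool is exhausted;
// false -> send() throws BufferExhaustedException, as seen in the log below
props.put("block.on.buffer.full", "true");
{code}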

 Possible memory leak in MirrorMaker and/or new Producer
 ---

 Key: KAFKA-2352
 URL: https://issues.apache.org/jira/browse/KAFKA-2352
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8.2.1
Reporter: Kostya Golikov
 Attachments: consumer.conf, output.log, producer.conf


 I've been playing around with Mirror Maker (version from trunk, dated July 
 7th) and hit a few problems, the most noticeable of which is that MirrorMaker 
 exhausts its memory pool, even though its size is set to the relatively huge 
 value of 132 MB and individual messages are around 2 KB. Batch size is set to 
 just 2 messages (full configs are attached). 
 {code}
 [2015-07-21 15:19:52,915] FATAL [mirrormaker-thread-1] Mirror maker thread 
 failure due to  (kafka.tools.MirrorMaker$MirrorMakerThread)
 org.apache.kafka.clients.producer.BufferExhaustedException: You have 
 exhausted the 134217728 bytes of memory you configured for the client and the 
 client is configured to error rather than block when memory is exhausted.
   at 
 org.apache.kafka.clients.producer.internals.BufferPool.allocate(BufferPool.java:124)
   at 
 org.apache.kafka.clients.producer.internals.RecordAccumulator.append(RecordAccumulator.java:172)
   at 
 org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:388)
   at 
 kafka.tools.MirrorMaker$MirrorMakerProducer.send(MirrorMaker.scala:380)
   at 
 kafka.tools.MirrorMaker$MirrorMakerThread$$anonfun$run$3.apply(MirrorMaker.scala:311)
   at 
 kafka.tools.MirrorMaker$MirrorMakerThread$$anonfun$run$3.apply(MirrorMaker.scala:311)
   at scala.collection.Iterator$class.foreach(Iterator.scala:727)
   at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
   at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
   at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
   at kafka.tools.MirrorMaker$MirrorMakerThread.run(MirrorMaker.scala:311)
 {code}
 Am I doing something wrong? Any help in further diagnosing this problem 
 would be handy.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-2350) Add KafkaConsumer pause capability

2015-07-22 Thread Gwen Shapira (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14637338#comment-14637338
 ] 

Gwen Shapira commented on KAFKA-2350:
-

Agree with [~yasuhiro.matsuda]. 

The notion of subscribed vs. assigned is a bit challenging to grasp as is 
([~guozhang] demonstrated that nicely); adding flow-control mechanics into it 
will be messy and difficult for users to get right. 
I think we all hope that in most cases users will subscribe to and unsubscribe 
from entire topics and let the coordinator handle the details (thereby reducing 
user errors, mailing list cries for help, etc). Adding APIs that encourage the 
opposite is a step in the wrong direction. 



Re: [DISCUSS] KIP-27 - Conditional Publish

2015-07-22 Thread Gwen Shapira
Tangent: I think we should complete the move of Produce / Fetch RPC to
the client libraries before we add more revisions to this protocol.

On Wed, Jul 22, 2015 at 11:02 AM, Jiangjie Qin
j...@linkedin.com.invalid wrote:
 I missed yesterday's KIP hangout. I'm currently working on another KIP for
 enriched metadata of messages. Guozhang has already created a wiki page
 before (
 https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Enriched+Message+Metadata).
 We plan to fill the relative offset into the offset field in the batch sent
 by the producer to avoid broker-side re-compression. The message offset would
 become batch base offset + relative offset. I guess maybe the expected
 offset in KIP-27 can only be set for the base offset? Would that affect
 certain use cases?

 For Jun's comments, I am not sure I completely get it. I think the producer
 only sends one batch per partition in a request, so either that batch is
 appended or not. Why would a batch be partially committed?

 Thanks,

 Jiangjie (Becket) Qin

 On Tue, Jul 21, 2015 at 10:42 AM, Ben Kirwin b...@kirw.in wrote:

 That's a fair point. I've added some imagined job logic to the KIP, so
 we can make sure the proposal stays in sync with the usages we're
 discussing. (The logic is just a quick sketch for now -- I expect I'll
 need to elaborate it as we get into more detail, or to address other
 concerns...)

 On Tue, Jul 21, 2015 at 11:45 AM, Jun Rao j...@confluent.io wrote:
  For 1, yes, when there is a transient leader change, it's guaranteed
 that a
  prefix of the messages in a request will be committed. However, it seems
  that the client needs to know what subset of messages are committed in
  order to resume the sending. Then the question is how.
 
  As Flavio indicated, for the use cases that you listed, it would be
 useful
  to figure out the exact logic by using this feature. For example, in the
  partition K/V store example, when we fail over to a new writer to the
  commit log, the zombie writer can publish new messages to the log after
 the
  new writer takes over, but before it publishes any message. We probably
  need to outline how this case can be handled properly.
 
  Thanks,
 
  Jun
 
  On Mon, Jul 20, 2015 at 12:05 PM, Ben Kirwin b...@kirw.in wrote:
 
  Hi Jun,
 
  Thanks for the close reading! Responses inline.
 
   Thanks for the write-up. The single producer use case you mentioned
 makes
   sense. It would be useful to include that in the KIP wiki.
 
  Great -- I'll make sure that the wiki is clear about this.
 
   1. What happens when the leader of the partition changes in the middle
  of a
   produce request? In this case, the producer client is not sure whether
  the
   request succeeds or not. If there is only a single message in the
  request,
   the producer can just resend the request. If it sees an OffsetMismatch
   error, it knows that the previous send actually succeeded and can
 proceed
   with the next write. This is nice since it not only allows the
 producer
  to
   proceed during transient failures in the broker, it also avoids
  duplicates
   during producer resend. One caveat is when there are multiple
 messages in
   the same partition in a produce request. The issue is that in our
 current
   replication protocol, it's possible for some, but not all messages in
 the
   request to be committed. This makes resend a bit harder to deal with
  since
   on receiving an OffsetMismatch error, it's not clear which messages
 have
   been committed. One possibility is to expect that compression is
 enabled,
   in which case multiple messages are compressed into a single message.
 I
  was
   thinking that another possibility is for the broker to return the
 current
   high watermark when sending an OffsetMismatch error. Based on this
 info,
   the producer can resend the subset of messages that have not been
   committed. However, this may not work in a compacted topic since there
  can
   be holes in the offset.
 
  This is an excellent question. It's my understanding that at least a
  *prefix* of messages will be committed (right?) -- which seems to be
  enough for many cases. I'll try and come up with a more concrete
  answer here.
 
   2. Is this feature only intended to be used with ack = all? The client
   doesn't get the offset with ack = 0. With ack = 1, it's possible for a
   previously acked message to be lost during leader transition, which
 will
   make the client logic more complicated.
 
  It's true that acks = 0 doesn't seem to be particularly useful; in all
  the cases I've come across, the client eventually wants to know about
  the mismatch error. However, it seems like there are some cases where
  acks = 1 would be fine -- eg. in a bulk load of a fixed dataset,
  losing messages during a leader transition just means you need to
  rewind / restart the load, which is not especially catastrophic. For
  many other interesting cases, acks = all is probably preferable.
 
   3. How does the producer client know 

Re: [VOTE] Switch to GitHub pull requests for new contributions

2015-07-22 Thread Jiangjie Qin
+1 (non binding)

Can we have a wiki page for the procedure and let people verify the steps?
After that we can update the Apache project page.

On Tue, Jul 21, 2015 at 5:38 PM, Edward Ribeiro edward.ribe...@gmail.com
wrote:

 +1 (non binding)

 On Tue, Jul 21, 2015 at 7:36 PM, Jay Kreps j...@confluent.io wrote:

  +1
 
  -Jay
 
  On Tue, Jul 21, 2015 at 4:28 AM, Ismael Juma ism...@juma.me.uk wrote:
 
 Hi all,

 I would like to start a vote on switching to GitHub pull requests for new
 contributions. To be precise, the vote is on whether we should:

 * Update the documentation to tell users to use pull requests instead of
 patches and Review Board (i.e. merge KAFKA-2321 and KAFKA-2349)
 * Use pull requests for new contributions

 In a previous discussion[1], everyone that participated was in favour. It's
 also worth reading the Contributing Code Changes wiki page[2] (if you
 haven't already) to understand the flow.

 A number of pull requests have been merged in the last few weeks to test
 this flow and I believe it's working well enough. As usual, there is always
 room for improvement and I expect us to tweak things as time goes on.

 The main downside of using GitHub pull requests is that we don't have write
 access to https://github.com/apache/kafka. That means that we rely on
 commit hooks to close integrated pull requests (the merge script takes care
 of formatting the message so that this happens) and the PR creator or
 Apache Infra to close pull requests that are not integrated.

 Regarding existing contributions, I think it's up to the contributor to
 decide whether they want to resubmit it as a pull request or not. I expect
 that there will be a transition period where the old and new way will
 co-exist. But that can be discussed separately.

 The vote will run for 72 hours.

 +1 (non-binding) from me.

 Best,
 Ismael

 [1] http://search-hadoop.com/m/uyzND1N6CDH1DUc82
 [2]
 https://cwiki.apache.org/confluence/display/KAFKA/Contributing+Code+Changes
 



[jira] [Commented] (KAFKA-2350) Add KafkaConsumer pause capability

2015-07-22 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14637179#comment-14637179
 ] 

Guozhang Wang commented on KAFKA-2350:
--

There is already a function for retrieving the subscribed topic partitions 
today:

{code}
public Set<TopicPartition> subscriptions() {
    acquire();
    try {
        return Collections.unmodifiableSet(this.subscriptions.assignedPartitions());
    } finally {
        release();
    }
}
{code}

which will for example remove the partition and hence change the returned 
values if consumer.unsubscribe(partition) is called.

I actually think [~becket_qin]'s approach will not cause much confusion 
regarding the APIs. More explicitly, assuming we add another function 
assignment() that returns the assigned partitions, the semantics of the 
other APIs would be:

{code}
consumer.subscribe(topic); // will not throw any exception, but will update the 
assignment as well as subscription in the next poll.

consumer.unsubscribe(topic);  // will throw an exception if the topic is not 
subscribed; otherwise will update the assignment and the subscription in the 
next poll.

consumer.assignment(); // return the assigned partitions

consumer.subscriptions();  // return the subscribed partitions, which is the 
same to the assigned partitions most of the time

consumer.subscribe(partition1);  // will throw an exception if partition is not 
in assignment(), saying it is not assigned to you

consumer.unsubscribe(partition2);  // will throw an exception if partition is 
not in subscriptions(), saying it is not subscribed by yourself
{code}

What concerns me more about this approach is the client implementation. Since 
it allows a client to switch between using Kafka partition assignment and not 
using it during its life cycle, it could make the client state more 
complicated to manage. For example:

{code}
consumer.subscribe(topic1); // using kafka for assignment, say we are assigned 
topic1-partition1 and topic1-partition2
consumer.poll();
consumer.subscribe(topic2-partition1); // subscribe to another partition 
explicitly without making the kafka coordinator aware of it.
consumer.unsubscribe(topic1-partition1);  // now the subscription is 
topic1-partition2 and topic2-partition1, where the first is from Kafka 
assignment and the second is from explicit subscription.
{code}

 Add KafkaConsumer pause capability
 --

 Key: KAFKA-2350
 URL: https://issues.apache.org/jira/browse/KAFKA-2350
 Project: Kafka
  Issue Type: Improvement
Reporter: Jason Gustafson
Assignee: Jason Gustafson

 There are some use cases in stream processing where it is helpful to be able 
 to pause consumption of a topic. For example, when joining two topics, you 
 may need to delay processing of one topic while you wait for the consumer of 
 the other topic to catch up. The new consumer currently doesn't provide a 
 nice way to do this. If you skip poll() or if you unsubscribe, then a 
 rebalance will be triggered and your partitions will be reassigned.
 One way to achieve this would be to add two new methods to KafkaConsumer:
 {code}
 void pause(String... topics);
 void unpause(String... topics);
 {code}
 When a topic is paused, a call to KafkaConsumer.poll will not initiate any 
 new fetches for that topic. After it is unpaused, fetches will begin again.
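For illustration, a minimal Scala sketch of how the proposed methods might be 
used in the join scenario above (pause/unpause are the proposal itself, and the 
helper parameters are made up for the example, not real API):

{code}
import org.apache.kafka.clients.consumer.{ConsumerRecords, KafkaConsumer}

// Sketch only: pause the fast topic while the slow topic's consumer catches up.
def joinLoop(consumer: KafkaConsumer[Array[Byte], Array[Byte]],
             maxLag: Long,
             process: ConsumerRecords[Array[Byte], Array[Byte]] => Unit,
             slowTopicLag: () => Long): Unit = {
  consumer.subscribe("fast-topic", "slow-topic")
  while (true) {
    process(consumer.poll(100))
    if (slowTopicLag() > maxLag)
      consumer.pause("fast-topic")    // proposed: poll() stops fetching this topic
    else
      consumer.unpause("fast-topic")  // proposed: fetches resume on the next poll
  }
}
{code}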



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [DISCUSS] KIP-27 - Conditional Publish

2015-07-22 Thread Jiangjie Qin
I missed yesterday's KIP hangout. I'm currently working on another KIP for
enriched metadata of messages. Guozhang has already created a wiki page (
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Enriched+Message+Metadata).
We plan to put the relative offset in the offset field of the batch sent
by the producer to avoid broker-side re-compression. The message offset would
become batch base offset + relative offset. I guess the expected offset in
KIP-27 could then only be set for the base offset? Would that affect certain
use cases?
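For illustration (made-up values): a batch appended at base offset 100 whose
messages carry relative offsets 0..2 would expose absolute offsets 100..102,
without the broker rewriting the individual messages:

{code}
// Scala sketch of the proposed offset arithmetic
val baseOffset = 100L                      // assigned by the broker on append
val relativeOffsets = Seq(0L, 1L, 2L)      // filled in by the producer
val absoluteOffsets = relativeOffsets.map(baseOffset + _)  // Seq(100, 101, 102)
{code}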

For Jun's comments, I am not sure I completely get it. I think the producer
only sends one batch per partition in a request, so either that batch is
appended or not. Why would a batch be partially committed?

Thanks,

Jiangjie (Becket) Qin

On Tue, Jul 21, 2015 at 10:42 AM, Ben Kirwin b...@kirw.in wrote:

 That's a fair point. I've added some imagined job logic to the KIP, so
 we can make sure the proposal stays in sync with the usages we're
 discussing. (The logic is just a quick sketch for now -- I expect I'll
 need to elaborate it as we get into more detail, or to address other
 concerns...)

 On Tue, Jul 21, 2015 at 11:45 AM, Jun Rao j...@confluent.io wrote:
  For 1, yes, when there is a transient leader change, it's guaranteed
 that a
  prefix of the messages in a request will be committed. However, it seems
  that the client needs to know what subset of messages are committed in
  order to resume the sending. Then the question is how.
 
  As Flavio indicated, for the use cases that you listed, it would be
 useful
  to figure out the exact logic by using this feature. For example, in the
  partition K/V store example, when we fail over to a new writer to the
  commit log, the zombie writer can publish new messages to the log after
 the
  new writer takes over, but before it publishes any message. We probably
  need to outline how this case can be handled properly.
 
  Thanks,
 
  Jun
 
  On Mon, Jul 20, 2015 at 12:05 PM, Ben Kirwin b...@kirw.in wrote:
 
  Hi Jun,
 
  Thanks for the close reading! Responses inline.
 
   Thanks for the write-up. The single producer use case you mentioned
 makes
   sense. It would be useful to include that in the KIP wiki.
 
  Great -- I'll make sure that the wiki is clear about this.
 
   1. What happens when the leader of the partition changes in the middle
  of a
   produce request? In this case, the producer client is not sure whether
  the
   request succeeds or not. If there is only a single message in the
  request,
   the producer can just resend the request. If it sees an OffsetMismatch
   error, it knows that the previous send actually succeeded and can
 proceed
   with the next write. This is nice since it not only allows the
 producer
  to
   proceed during transient failures in the broker, it also avoids
  duplicates
   during producer resend. One caveat is when there are multiple
 messages in
   the same partition in a produce request. The issue is that in our
 current
   replication protocol, it's possible for some, but not all messages in
 the
   request to be committed. This makes resend a bit harder to deal with
  since
   on receiving an OffsetMismatch error, it's not clear which messages
 have
   been committed. One possibility is to expect that compression is
 enabled,
   in which case multiple messages are compressed into a single message.
 I
  was
   thinking that another possibility is for the broker to return the
 current
   high watermark when sending an OffsetMismatch error. Based on this
 info,
   the producer can resend the subset of messages that have not been
   committed. However, this may not work in a compacted topic since there
  can
   be holes in the offset.
 
  This is an excellent question. It's my understanding that at least a
  *prefix* of messages will be committed (right?) -- which seems to be
  enough for many cases. I'll try and come up with a more concrete
  answer here.
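To make the mechanics concrete, here is a minimal Scala sketch of the
broker-side check under discussion, including Jun's suggestion of returning the
high watermark on a mismatch. The types and names below are made up for
illustration; this is not the KIP-27 wire format:

{code}
// Scala sketch: conditional append with an expected-offset check
type Message = Array[Byte]

trait PartitionLog {
  def logEndOffset: Long
  def highWatermark: Long
  def append(batch: Seq[Message]): Unit
}

case class AppendResult(offsetMismatch: Boolean, highWatermark: Long)

// Append only if the producer's expected offset matches the log end offset.
// On a mismatch, return the high watermark so the client can work out which
// of its messages were already committed and resend only the rest.
def conditionalAppend(log: PartitionLog, batch: Seq[Message],
                      expectedOffset: Long): AppendResult =
  if (expectedOffset != log.logEndOffset)
    AppendResult(offsetMismatch = true, log.highWatermark)
  else {
    log.append(batch)
    AppendResult(offsetMismatch = false, log.highWatermark)
  }
{code}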
 
   2. Is this feature only intended to be used with ack = all? The client
   doesn't get the offset with ack = 0. With ack = 1, it's possible for a
   previously acked message to be lost during leader transition, which
 will
   make the client logic more complicated.
 
  It's true that acks = 0 doesn't seem to be particularly useful; in all
  the cases I've come across, the client eventually wants to know about
  the mismatch error. However, it seems like there are some cases where
  acks = 1 would be fine -- eg. in a bulk load of a fixed dataset,
  losing messages during a leader transition just means you need to
  rewind / restart the load, which is not especially catastrophic. For
  many other interesting cases, acks = all is probably preferable.
 
   3. How does the producer client know the offset to send the first
  message?
   Do we need to expose an API in producer to get the current high
  watermark?
 
  You're right, it might be irritating to have to go through the
  consumer API just for this. There are some 

Re: Review Request 36664: Patch for KAFKA-2353

2015-07-22 Thread Todd Palino
Since I've been dealing with the fallout of this particular problem all
week, I'll add a few thoughts...


On Wed, Jul 22, 2015 at 10:51 AM, Gwen Shapira gshap...@cloudera.com
wrote:



  On July 21, 2015, 11:15 p.m., Gwen Shapira wrote:
   core/src/main/scala/kafka/network/SocketServer.scala, line 465
   
 https://reviews.apache.org/r/36664/diff/1/?file=1018238#file1018238line465
 
  
   Turns out that catching Throwable is a really bad idea:
 https://www.sumologic.com/2014/05/05/why-you-should-never-catch-throwable-in-scala/
 
  Jiangjie Qin wrote:
  Ah... Didn't know that before. I explicitly listed the exceptions.
 
  Guozhang Wang wrote:
  Searching for `: Throwable` gives me 180+ cases in the code base :P Though
 many of them are from unit tests (which is arguably OK), there are still
 a lot in the core package. I agree that we should avoid catching Throwable
 whenever possible, which will also help force developers to think
 about possible checked exceptions in the calling trace.

 I know :(
 I'm not sure if going over and converting everything is worth the effort.
 Although it can be a nice newbie jira.


  On July 21, 2015, 11:15 p.m., Gwen Shapira wrote:
   core/src/main/scala/kafka/network/SocketServer.scala, line 400
   
 https://reviews.apache.org/r/36664/diff/1/?file=1018238#file1018238line400
 
  
   So in case of an unexpected exception, we log an error and keep
 running?
  
   Isn't it better to kill the processor, since we don't know what's
 the state of the system? If the acceptor keeps placing messages in the
 queue for a dead processor, isn't it a separate issue?
 
  Jiangjie Qin wrote:
  I'm not quite sure about this part. I am not very experienced with error
 handling in such cases, so please correct me if I missed something.
  Here is what I thought.
 
  The way it currently works is that the acceptor will
  1. accept a new connection request and create a new socket channel
  2. choose a processor and put the socket channel into the
 processor's new connection queue
 
  The processor will just take the socket channels from the queue and
 register them with the selector.
  If the processor runs and gets an uncaught exception, there are
 several possibilities.
  Case 1: The exception was from one socket channel.
  Case 2: The exception was associated with a bad request.
  In case 1, ideally we should just disconnect that socket channel
 without affecting other socket channels.
  In case 2, I think we should log the error and skip the message -
 assuming the client will retry sending data if no response is received for a
 given period of time.
 
  I am not sure if letting the processor exit is a good idea because this
 would let one badly behaving client screw up the entire cluster
 - it might take out the processors one by one. Compared with that, I lean
 towards keeping the processor running and serving other normal TCP
 connections if possible, but logging the error so a monitoring system can
 detect it and see if human intervention is needed.
 
  Also, I don't know what to do here to prevent the thread from
 exiting without catching all the throwables.
  According to this blog
 http://www.tzavellas.com/techblog/2010/09/20/catching-throwable-in-scala/
  I guess I can rethrow all the ControlThrowables, but intercept the
 rest?
 
  Guozhang Wang wrote:
  I would also prefer not to close the Processor thread upon
 exceptions, mainly to avoid one bad client killing a shared Kafka cluster.
 In the past we have seen issues like a DDoS of MetadataRequests killing the
 cluster with all other clients affected, and the quota work is
 aimed at preventing that. Since Processor threads are shared (8 by default on
 a broker), one should not be closed by a single socket / bad client request.

 I like your thinking around cases #1 and #2. I think this should go as a
 code comment somewhere, so when people improve / extend SocketServer they
 will keep this logic in mind. Maybe even specify in specific catch clauses
 whether they are handling possible errors at the request level or the channel level.

 My concern is with possible case #3: Each processor has an
 o.a.k.common.network.Selector. I'm concerned about the possibility of
 something going wrong in the state of the selector, which will possibly be
 an issue for all channels. For example failure to register could be an
 issue with the channel.register call, but also perhaps an issue with
 keys.put (just an example - I'm not sure something can actually break keys
 table).

 I'd like to be able to identify cases where the Selector state may have
 gone wrong and close the processor in that case. Does that make any sense?
 Or am I being too paranoid?


If there are error cases that are not associated with a specific connection
or request, then I agree that we should handle that a little differently.
What we need to keep in mind is that closing the processor actually
means terminating the 

Re: Should 0.8.3 consumers correctly function with 0.8.2 brokers?

2015-07-22 Thread Jiangjie Qin
I don't think we have the consumer coordinator in 0.8.2 brokers, so the
KafkaConsumer in 0.8.3 will only be able to subscribe to partitions
explicitly. Subscribing to a topic won't work with 0.8.2 brokers.

Jiangjie (Becket) Qin

On Wed, Jul 22, 2015 at 4:26 AM, Stevo Slavić ssla...@gmail.com wrote:

 I'm getting Unknown api code 11 even when both client and server are
 0.8.3/trunk, when KafkaConsumer.subscribe(String... topics) is used.

 Bug?

 Kind regards,
 Stevo Slavic.

 On Fri, Apr 24, 2015 at 7:13 PM, Neha Narkhede n...@confluent.io wrote:

  Yes, I was clearly confused :-)
 
  On Fri, Apr 24, 2015 at 9:37 AM, Sean Lydon lydon.s...@gmail.com
 wrote:
 
   Thanks for the responses. Ewen is correct that I am referring to the
   *new* consumer (org.apache.kafka.clients.consumer.KafkaConsumer).
  
   I am extending the consumer to allow my applications more control over
   committed offsets.  I really want to get away from zookeeper (so using
   the offset storage), and re-balancing is something I haven't really
   needed to tackle in an automated/seamless way.  Either way, I'll hold
   off going further down this road until there is more interest.
  
   @Gwen
   I set up a single consumer without partition.assignment.strategy or
   rebalance.callback.class.  I was unable to subscribe to just a topic
   (Unknown api code 11 on broker), but I could subscribe to a
   topicpartition.  This makes sense as I would need to handle re-balance
   outside the consumer.  Things functioned as expected (well  I have an
   additional minor fix to code from KAFKA-2121), and the only exceptions
   on broker were due to closing consumers (which I have become
   accustomed to).  My tests are specific to my extended version of the
   consumer, but they basically do a little writing and reading with
   different serde classes with application controlled commits (similar
   to onSuccess and onFailure after each record, but with tolerance for
   out of order acknowledgements).
  
   If you are interested, here is the patch of the hack against trunk.
  
   On Thu, Apr 23, 2015 at 10:27 PM, Ewen Cheslack-Postava
   e...@confluent.io wrote:
@Neha I think you're mixing up the 0.8.1/0.8.2 updates and the
   0.8.2/0.8.3
that's being discussed here?
   
I think the original question was about using the *new* consumer
   (clients
consumer) with 0.8.2. Gwen's right, it will use features not even
implemented in the broker in trunk yet, let alone the 0.8.2.
   
I don't think the enable.commit.downgrade type option, or
 supporting
   the
old protocol with the new consumer at all, makes much sense. You'd
 end
  up
with some weird hybrid of simple and high-level consumers -- you
 could
   use
offset storage, but you'd have to manage rebalancing yourself since
  none
   of
the coordinator support would be there.
   
   
On Thu, Apr 23, 2015 at 9:22 PM, Neha Narkhede n...@confluent.io
   wrote:
   
My understanding is that ideally the 0.8.3 consumer should work with
  an
0.8.2 broker if the offset commit config was set to zookeeper.
   
The only thing that might not work is offset commit to Kafka, which
   makes
sense since the 0.8.2 broker does not support Kafka based offset
management.
   
If we broke all kinds of offset commits, then it seems like a
   regression,
no?
   
On Thu, Apr 23, 2015 at 7:26 PM, Gwen Shapira 
 gshap...@cloudera.com
wrote:
   
 I didn't think 0.8.3 consumer will ever be able to talk to 0.8.2
 broker... there are some essential pieces that are missing in
 0.8.2
 (Coordinator, Heartbeat, etc).
 Maybe I'm missing something. It will be nice if this will work :)

 Mind sharing what / how you tested? Were there no errors in broker
 logs after your fix?

 On Thu, Apr 23, 2015 at 5:37 PM, Sean Lydon lydon.s...@gmail.com
 
wrote:
  Currently the clients consumer (trunk) sends offset commit
  requests
   of
  version 2.  The 0.8.2 brokers fail to handle this particular
  request
  with a:
 
  java.lang.AssertionError: assertion failed: Version 2 is invalid
  for
  OffsetCommitRequest. Valid versions are 0 or 1.
 
  I was able to make this work via a forceful downgrade of this
  particular request, but I would like some feedback on whether a
  enable.commit.downgrade configuration would be a tolerable
  method
   to
  allow 0.8.3 consumers to interact with 0.8.2 brokers.  I'm also
  interested in this even being a goal worth pursuing.
 
  Thanks,
  Sean

   
   
   
--
Thanks,
Neha
   
   
   
   
--
Thanks,
Ewen
  
 
 
 
  --
  Thanks,
  Neha
 



[jira] [Commented] (KAFKA-2342) KafkaConsumer rebalance with in-flight fetch can cause invalid position

2015-07-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2342?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14637525#comment-14637525
 ] 

ASF GitHub Bot commented on KAFKA-2342:
---

Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/88


 KafkaConsumer rebalance with in-flight fetch can cause invalid position
 ---

 Key: KAFKA-2342
 URL: https://issues.apache.org/jira/browse/KAFKA-2342
 Project: Kafka
  Issue Type: Sub-task
  Components: core
Affects Versions: 0.8.3
Reporter: Jun Rao
Assignee: Jason Gustafson
 Fix For: 0.9.0


 If a rebalance occurs with an in-flight fetch, the new KafkaConsumer can end 
 up updating the fetch position of a partition to an offset which is no longer 
 valid. The consequence is that we may end up either returning to the user 
 messages with an unexpected position or we may fail to give back the right 
 offset in position(). 
 Additionally, this bug causes transient test failures in 
 ConsumerBounceTest.testConsumptionWithBrokerFailures with the following 
 exception:
 kafka.api.ConsumerBounceTest  testConsumptionWithBrokerFailures FAILED
 java.lang.NullPointerException
 at 
 org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:949)
 at 
 kafka.api.ConsumerBounceTest.consumeWithBrokerFailures(ConsumerBounceTest.scala:86)
 at 
 kafka.api.ConsumerBounceTest.testConsumptionWithBrokerFailures(ConsumerBounceTest.scala:61)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (KAFKA-2342) KafkaConsumer rebalance with in-flight fetch can cause invalid position

2015-07-22 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2342?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-2342.
--
   Resolution: Fixed
Fix Version/s: 0.9.0

Issue resolved by pull request 88
[https://github.com/apache/kafka/pull/88]

 KafkaConsumer rebalance with in-flight fetch can cause invalid position
 ---

 Key: KAFKA-2342
 URL: https://issues.apache.org/jira/browse/KAFKA-2342
 Project: Kafka
  Issue Type: Sub-task
  Components: core
Affects Versions: 0.8.3
Reporter: Jun Rao
Assignee: Jason Gustafson
 Fix For: 0.9.0


 If a rebalance occurs with an in-flight fetch, the new KafkaConsumer can end 
 up updating the fetch position of a partition to an offset which is no longer 
 valid. The consequence is that we may end up either returning to the user 
 messages with an unexpected position or we may fail to give back the right 
 offset in position(). 
 Additionally, this bug causes transient test failures in 
 ConsumerBounceTest.testConsumptionWithBrokerFailures with the following 
 exception:
 kafka.api.ConsumerBounceTest  testConsumptionWithBrokerFailures FAILED
 java.lang.NullPointerException
 at 
 org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:949)
 at 
 kafka.api.ConsumerBounceTest.consumeWithBrokerFailures(ConsumerBounceTest.scala:86)
 at 
 kafka.api.ConsumerBounceTest.testConsumptionWithBrokerFailures(ConsumerBounceTest.scala:61)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-2342) KafkaConsumer rebalance with in-flight fetch can cause invalid position

2015-07-22 Thread Stevo Slavic (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2342?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14637533#comment-14637533
 ] 

Stevo Slavic commented on KAFKA-2342:
-

[~guozhang], fix version 0.9.0 or 0.8.3?

 KafkaConsumer rebalance with in-flight fetch can cause invalid position
 ---

 Key: KAFKA-2342
 URL: https://issues.apache.org/jira/browse/KAFKA-2342
 Project: Kafka
  Issue Type: Sub-task
  Components: core
Affects Versions: 0.8.3
Reporter: Jun Rao
Assignee: Jason Gustafson
 Fix For: 0.9.0


 If a rebalance occurs with an in-flight fetch, the new KafkaConsumer can end 
 up updating the fetch position of a partition to an offset which is no longer 
 valid. The consequence is that we may end up either returning to the user 
 messages with an unexpected position or we may fail to give back the right 
 offset in position(). 
 Additionally, this bug causes transient test failures in 
 ConsumerBounceTest.testConsumptionWithBrokerFailures with the following 
 exception:
 kafka.api.ConsumerBounceTest  testConsumptionWithBrokerFailures FAILED
 java.lang.NullPointerException
 at 
 org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:949)
 at 
 kafka.api.ConsumerBounceTest.consumeWithBrokerFailures(ConsumerBounceTest.scala:86)
 at 
 kafka.api.ConsumerBounceTest.testConsumptionWithBrokerFailures(ConsumerBounceTest.scala:61)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-2342) KafkaConsumer rebalance with in-flight fetch can cause invalid position

2015-07-22 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2342?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-2342:
-
Fix Version/s: (was: 0.9.0)
   0.8.3

 KafkaConsumer rebalance with in-flight fetch can cause invalid position
 ---

 Key: KAFKA-2342
 URL: https://issues.apache.org/jira/browse/KAFKA-2342
 Project: Kafka
  Issue Type: Sub-task
  Components: core
Affects Versions: 0.8.3
Reporter: Jun Rao
Assignee: Jason Gustafson
 Fix For: 0.8.3


 If a rebalance occurs with an in-flight fetch, the new KafkaConsumer can end 
 up updating the fetch position of a partition to an offset which is no longer 
 valid. The consequence is that we may end up either returning to the user 
 messages with an unexpected position or we may fail to give back the right 
 offset in position(). 
 Additionally, this bug causes transient test failures in 
 ConsumerBounceTest.testConsumptionWithBrokerFailures with the following 
 exception:
 kafka.api.ConsumerBounceTest  testConsumptionWithBrokerFailures FAILED
 java.lang.NullPointerException
 at 
 org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:949)
 at 
 kafka.api.ConsumerBounceTest.consumeWithBrokerFailures(ConsumerBounceTest.scala:86)
 at 
 kafka.api.ConsumerBounceTest.testConsumptionWithBrokerFailures(ConsumerBounceTest.scala:61)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [VOTE] Switch to GitHub pull requests for new contributions

2015-07-22 Thread Ismael Juma
Hi,

On 22 Jul 2015 19:32, Jiangjie Qin j...@linkedin.com.invalid wrote:

 +1 (non binding)

 Can we have a wiki page for the procedure and let people verify the steps? After
 that we can update the Apache project page.

Yes, I linked to the page in the original message:

https://cwiki.apache.org/confluence/display/KAFKA/Contributing+Code+Changes

Committers and contributors have provided feedback in previous discussions
in the mailing list. Further improvements are welcome, of course.

Also see KAFKA-2321 and KAFKA-2349 for updates to the website and GitHub's
CONTRIBUTING.md.

Ismael


Re: Review Request 36664: Patch for KAFKA-2353

2015-07-22 Thread Jiangjie Qin


 On July 21, 2015, 11:15 p.m., Gwen Shapira wrote:
  core/src/main/scala/kafka/network/SocketServer.scala, line 465
  https://reviews.apache.org/r/36664/diff/1/?file=1018238#file1018238line465
 
  Turns out that catching Throwable is a really bad idea: 
  https://www.sumologic.com/2014/05/05/why-you-should-never-catch-throwable-in-scala/
 
 Jiangjie Qin wrote:
 Ah... Didn't know that before. I explicitly listed the exceptions.
 
 Guozhang Wang wrote:
 Searching for `: Throwable` gives me 180+ cases in the code base :P Though many 
 of them are from unit tests (which is arguably OK), there are still a lot 
 in the core package. I agree that we should avoid catching Throwable whenever 
 possible, which will also help force developers to think about 
 possible checked exceptions in the calling trace.
 
 Gwen Shapira wrote:
 I know :(
 I'm not sure if going over and converting everything is worth the effort. 
 Although it can be a nice newbie jira.

Maybe we can simply change that to catch Throwables except ControlThrowables? 
That might be a simple search and replace.
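For reference, a minimal Scala sketch of that pattern (illustrative, not the
actual SocketServer code). Note that scala.util.control.NonFatal already
excludes ControlThrowable, as well as VirtualMachineError, ThreadDeath,
InterruptedException and LinkageError:

{code}
import scala.util.control.{ControlThrowable, NonFatal}

def runLoopBody(body: => Unit): Unit =
  try body
  catch {
    case e: ControlThrowable => throw e  // never swallow control-flow throwables
    case NonFatal(e) =>
      // log and keep the processor thread alive
      System.err.println(s"Uncaught exception in processor: $e")
  }
{code}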


 On July 21, 2015, 11:15 p.m., Gwen Shapira wrote:
  core/src/main/scala/kafka/network/SocketServer.scala, line 400
  https://reviews.apache.org/r/36664/diff/1/?file=1018238#file1018238line400
 
  So in case of an unexpected exception, we log an error and keep running?
  
  Isn't it better to kill the processor, since we don't know what's the 
  state of the system? If the acceptor keeps placing messages in the queue 
  for a dead processor, isn't it a separate issue?
 
 Jiangjie Qin wrote:
 I'm not quite sure about this part. I am not very experienced with error 
 handling in such cases, so please correct me if I missed something.
 Here is what I thought.
 
 The way it currently works is that the acceptor will 
 1. accept a new connection request and create a new socket channel
 2. choose a processor and put the socket channel into the processor's new 
 connection queue
 
 The processor will just take the socket channels from the queue and 
 register them with the selector.
 If the processor runs and gets an uncaught exception, there are several 
 possibilities. 
 Case 1: The exception was from one socket channel. 
 Case 2: The exception was associated with a bad request. 
 In case 1, ideally we should just disconnect that socket channel without 
 affecting other socket channels.
 In case 2, I think we should log the error and skip the message - 
 assuming the client will retry sending data if no response is received for a 
 given period of time.
 
 I am not sure if letting the processor exit is a good idea because this would 
 let one badly behaving client screw up the entire cluster - it 
 might take out the processors one by one. Compared with that, I lean 
 towards keeping the processor running and serving other normal TCP 
 connections if possible, but logging the error so a monitoring system can 
 detect it and see if human intervention is needed.
 
 Also, I don't know what to do here to prevent the thread from exiting 
 without catching all the throwables.
 According to this blog 
 http://www.tzavellas.com/techblog/2010/09/20/catching-throwable-in-scala/
 I guess I can rethrow all the ControlThrowables, but intercept the rest?
 
 Guozhang Wang wrote:
 I would also prefer not to close the Processor thread upon exceptions, 
 mainly to avoid one bad client killing a shared Kafka cluster. In the past 
 we have seen issues like a DDoS of MetadataRequests killing the cluster with 
 all other clients affected, and the quota work is aimed at 
 preventing that. Since Processor threads are shared (8 by default on a broker), 
 one should not be closed by a single socket / bad client request.
 
 Gwen Shapira wrote:
 I like your thinking around cases #1 and #2. I think this should go as a 
 code comment somewhere, so when people improve / extend SocketServer they 
 will keep this logic in mind. Maybe even specify in specific catch clauses if 
 they are handling possible errors in request level or channel level.
 
 My concern is with possible case #3: Each processor has an 
 o.a.k.common.network.Selector. I'm concerned about the possibility of 
 something going wrong in the state of the selector, which will possibly be an 
 issue for all channels. For example failure to register could be an issue 
 with the channel.register call, but also perhaps an issue with keys.put (just 
 an example - I'm not sure something can actually break keys table). 
 
 I'd like to be able to identify cases where the Selector state may have 
 gone wrong and close the processor in that case. Does that make any sense? Or 
 am I being too paranoid?

Hi Gwen, I think what you said makes sense. Maybe I see this more from a 
failure boundary point of view. 

Actually we might need to do more if we let a processor exit. We need to stop 
the acceptor from 

Zookeeper use cases with Kafka

2015-07-22 Thread Prabhjot Bharaj
Hello Folks,

I wish to contribute to Kafka internals, and one of the things that can
help me do that is understanding how Kafka uses ZooKeeper. I have some
basic questions:

1. Is ZooKeeper primarily used for locking? If yes, in what cases, and what
kind of nodes does it use - sequential/ephemeral?

2. Does Kafka use ZooKeeper watches for any of its functions?

3. What kind of state is stored in ZooKeeper? (I believe it has to be the
leader information per partition, but is there anything apart from that?)
What is the scale of the data stored in ZooKeeper?

Looking forward to your help.

Thanks,
prabcs


[jira] [Commented] (KAFKA-2350) Add KafkaConsumer pause capability

2015-07-22 Thread Jason Gustafson (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14637187#comment-14637187
 ] 

Jason Gustafson commented on KAFKA-2350:


[~becket_qin] [~jkreps] Currently automatic assignment is inferred based on 
which subscribe methods are invoked (e.g. if you subscribe to a topic, then we 
assume you want automatic assignment). I wonder if it might help to make that 
instead an explicit configuration parameter? Then that might free us a little 
bit to use subscribe/unsubscribe in the way [~becket_qin] is proposing since we 
don't have to guess whether the user is intending to actually subscribe to a 
partition or just to pause it. 

 Add KafkaConsumer pause capability
 --

 Key: KAFKA-2350
 URL: https://issues.apache.org/jira/browse/KAFKA-2350
 Project: Kafka
  Issue Type: Improvement
Reporter: Jason Gustafson
Assignee: Jason Gustafson

 There are some use cases in stream processing where it is helpful to be able 
 to pause consumption of a topic. For example, when joining two topics, you 
 may need to delay processing of one topic while you wait for the consumer of 
 the other topic to catch up. The new consumer currently doesn't provide a 
 nice way to do this. If you skip poll() or if you unsubscribe, then a 
 rebalance will be triggered and your partitions will be reassigned.
 One way to achieve this would be to add two new methods to KafkaConsumer:
 {code}
 void pause(String... topics);
 void unpause(String... topics);
 {code}
 When a topic is paused, a call to KafkaConsumer.poll will not initiate any 
 new fetches for that topic. After it is unpaused, fetches will begin again.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 36664: Patch for KAFKA-2353

2015-07-22 Thread Gwen Shapira


 On July 22, 2015, 10:40 a.m., Ismael Juma wrote:
  core/src/main/scala/kafka/network/SocketServer.scala, line 467
  https://reviews.apache.org/r/36664/diff/2/?file=1018375#file1018375line467
 
  As far as I can see `ClosedChannelException`, `IllegalStateException` 
  and `IllegalArgumentException` are enough? Also, would it be better to 
  use `IOException` instead of `ClosedChannelException`?
  
  What happens if other exceptions are thrown? Will we still have a 
  socket leak?

Yeah, perhaps in addition to listing the expected cases, we should also handle 
NonFatal(e)? (https://tersesystems.com/2012/12/27/error-handling-in-scala/)


- Gwen


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36664/#review92574
---


On July 22, 2015, 5:02 a.m., Jiangjie Qin wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/36664/
 ---
 
 (Updated July 22, 2015, 5:02 a.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-2353
 https://issues.apache.org/jira/browse/KAFKA-2353
 
 
 Repository: kafka
 
 
 Description
 ---
 
 Addressed Gwen's comments
 
 
 Diffs
 -
 
   core/src/main/scala/kafka/network/SocketServer.scala 
 91319fa010b140cca632e5fa8050509bd2295fc9 
 
 Diff: https://reviews.apache.org/r/36664/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Jiangjie Qin
 




[GitHub] kafka pull request: KAFKA-2342; KafkaConsumer rebalance with in-fl...

2015-07-22 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/88


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (KAFKA-2342) KafkaConsumer rebalance with in-flight fetch can cause invalid position

2015-07-22 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2342?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14637539#comment-14637539
 ] 

Guozhang Wang commented on KAFKA-2342:
--

[~sslavic] corrected, thanks.

 KafkaConsumer rebalance with in-flight fetch can cause invalid position
 ---

 Key: KAFKA-2342
 URL: https://issues.apache.org/jira/browse/KAFKA-2342
 Project: Kafka
  Issue Type: Sub-task
  Components: core
Affects Versions: 0.8.3
Reporter: Jun Rao
Assignee: Jason Gustafson
 Fix For: 0.8.3


 If a rebalance occurs with an in-flight fetch, the new KafkaConsumer can end 
 up updating the fetch position of a partition to an offset which is no longer 
 valid. The consequence is that we may end up either returning to the user 
 messages with an unexpected position or we may fail to give back the right 
 offset in position(). 
 Additionally, this bug causes transient test failures in 
 ConsumerBounceTest.testConsumptionWithBrokerFailures with the following 
 exception:
 kafka.api.ConsumerBounceTest  testConsumptionWithBrokerFailures FAILED
 java.lang.NullPointerException
 at 
 org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:949)
 at 
 kafka.api.ConsumerBounceTest.consumeWithBrokerFailures(ConsumerBounceTest.scala:86)
 at 
 kafka.api.ConsumerBounceTest.testConsumptionWithBrokerFailures(ConsumerBounceTest.scala:61)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 36590: Patch for KAFKA-2275

2015-07-22 Thread Ashish Singh

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36590/#review92557
---


Hey guys, I realized that most of this code has changed, as the design to 
address this has itself changed a bit. The new diff is available at 
https://reviews.apache.org/r/36681/. My apologies for the inconvenience; I 
should have vetted the design details before posting a patch.

- Ashish Singh


On July 20, 2015, 5:44 p.m., Ashish Singh wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/36590/
 ---
 
 (Updated July 20, 2015, 5:44 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-2275
 https://issues.apache.org/jira/browse/KAFKA-2275
 
 
 Repository: kafka
 
 
 Description
 ---
 
 Add logic to get all topics when needMetadataForAllTopics is set on metadata
 
 
 Return metadata for all topics if empty list is passed to partitionsFor
 
 
  KAFKA-2275: Add a Map<String, List<PartitionInfo>> partitionsFor(String... 
  topics) API to the new consumer
 
 
 Diffs
 -
 
   clients/src/main/java/org/apache/kafka/clients/Metadata.java 
 0387f2602c93a62cd333f1b3c569ca6b66b5b779 
   clients/src/main/java/org/apache/kafka/clients/NetworkClient.java 
 48fe7961e2215372d8033ece4af739ea06c6457b 
   clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java 
 252b759c0801f392e3526b0f31503b4b8fbf1c8a 
   clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
 bea3d737c51be77d5b5293cdd944d33b905422ba 
   clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java 
 c14eed1e95f2e682a235159a366046f00d1d90d6 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
  9517d9d0cd480d5ba1d12f1fde7963e60528d2f8 
   clients/src/main/java/org/apache/kafka/common/Cluster.java 
 60594a7dce90130911a626ea80cf80d815aeb46e 
   core/src/test/scala/integration/kafka/api/ConsumerTest.scala 
 3eb5f95731a3f06f662b334ab2b3d0ad7fa9e1ca 
 
 Diff: https://reviews.apache.org/r/36590/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Ashish Singh
 




Review Request 36681: Patch for KAFKA-2275

2015-07-22 Thread Ashish Singh

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36681/
---

Review request for kafka.


Bugs: KAFKA-2275
https://issues.apache.org/jira/browse/KAFKA-2275


Repository: kafka


Description
---

KAFKA-2275: Add a ListTopics() API to the new consumer


Diffs
-

  clients/src/main/java/org/apache/kafka/clients/ClientRequest.java 
ed4c0d98596cc294757f35df8c8cbc8e36ff42de 
  clients/src/main/java/org/apache/kafka/clients/NetworkClient.java 
48fe7961e2215372d8033ece4af739ea06c6457b 
  clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java 
252b759c0801f392e3526b0f31503b4b8fbf1c8a 
  clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
bea3d737c51be77d5b5293cdd944d33b905422ba 
  clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java 
c14eed1e95f2e682a235159a366046f00d1d90d6 
  
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
 9517d9d0cd480d5ba1d12f1fde7963e60528d2f8 
  core/src/test/scala/integration/kafka/api/ConsumerTest.scala 
3eb5f95731a3f06f662b334ab2b3d0ad7fa9e1ca 

Diff: https://reviews.apache.org/r/36681/diff/


Testing
---


Thanks,

Ashish Singh



[jira] [Commented] (KAFKA-2275) Add a ListTopics() API to the new consumer

2015-07-22 Thread Ashish K Singh (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2275?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14636378#comment-14636378
 ] 

Ashish K Singh commented on KAFKA-2275:
---

Created reviewboard https://reviews.apache.org/r/36681/
 against branch trunk

 Add a ListTopics() API to the new consumer
 --

 Key: KAFKA-2275
 URL: https://issues.apache.org/jira/browse/KAFKA-2275
 Project: Kafka
  Issue Type: Sub-task
  Components: consumer
Reporter: Guozhang Wang
Assignee: Ashish K Singh
Priority: Critical
 Fix For: 0.8.3

 Attachments: KAFKA-2275.patch, KAFKA-2275.patch, 
 KAFKA-2275_2015-07-17_21:39:27.patch, KAFKA-2275_2015-07-20_10:44:19.patch


 With a regex subscription like
 {code}
 consumer.subscribe("topic*")
 {code}
 the partition assignment is automatically done on the Kafka side. However, there 
 are some use cases where consumers want regex subscriptions but not 
 Kafka-side partition assignment, preferring their own specific partition 
 assignment instead. With ListTopics() they can periodically check for topic list 
 changes and specifically subscribe to the partitions of the new topics.
 For implementation, it involves sending a TopicMetadataRequest to a random 
 broker and parsing the response.
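A hypothetical usage sketch in Scala (the exact signature is what the patch
above is reviewing, so the listTopics() name, its return type, and the topic
filter here are assumptions):

{code}
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition

// Sketch: periodically re-list topics and manually subscribe to partitions
// of newly matching topics, bypassing Kafka-side assignment.
def subscribeToNewTopics(consumer: KafkaConsumer[Array[Byte], Array[Byte]],
                         known: Set[String]): Set[String] = {
  val matching = consumer.listTopics().asScala.keySet
    .filter(_.startsWith("topic")).toSet
  for (topic <- matching -- known; info <- consumer.partitionsFor(topic).asScala)
    consumer.subscribe(new TopicPartition(topic, info.partition()))
  matching
}
{code}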



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[DISCUSS] Partitioning in Kafka

2015-07-22 Thread Gianmarco De Francisci Morales
Hello folks,

I'd like to ask the community about its opinion on the partitioning
functions in Kafka.

With KAFKA-2091 https://issues.apache.org/jira/browse/KAFKA-2091
integrated we are now able to have custom partitioners in the producer.
The question now becomes *which* partitioners should ship with Kafka?
This issue arose in the context of KAFKA-2092
https://issues.apache.org/jira/browse/KAFKA-2092, which implements a
specific load-balanced partitioning. This partitioner however assumes some
stages of processing on top of it to make proper use of the data, i.e., it
envisions Kafka as a substrate for stream processing, and not only as the
I/O component.
Is this a direction that Kafka wants to go towards? Or is this a role
better left to the internal communication systems of other stream
processing engines (e.g., Storm)?
And if the answer is the latter, how would something such as Samza (which
relies mostly on Kafka as its communication substrate) be able to implement
advanced partitioning schemes?

Cheers,
--
Gianmarco


Re: Official Kafka Gitter Room?

2015-07-22 Thread Ismael Juma
Hi Gwen,

On Sun, Jul 19, 2015 at 2:26 AM, Gwen Shapira gshap...@cloudera.com wrote:

 So, as an experiment, I created:
 https://apachekafka.slack.com

 I figured we'll give it a whirl for a week or two for dev discussions,
 see how it goes and if we have activity we can add this to the website
 and announce on the lists.


Are people using this? If so, please send me an invite.

Ismael


[jira] [Commented] (KAFKA-2092) New partitioning for better load balancing

2015-07-22 Thread Jason Gustafson (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2092?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14636353#comment-14636353
 ] 

Jason Gustafson commented on KAFKA-2092:


[~azaroth], I'm actually not sure that Samza is the better place. I thought 
that it would have a better chance of integration since Samza already exposes a 
processing layer, which seems to be needed to use this partitioner. I think 
perhaps what we should do is open this discussion to the Kafka dev list. I 
think we're missing some guidance on what partitioners should be included in 
Kafka. As far as I know, Kafka has only provided primitive partitioning 
strategies up to now, but as Kafka starts to provide more core facilities for 
stream processing, perhaps it should start to include more.

 New partitioning for better load balancing
 --

 Key: KAFKA-2092
 URL: https://issues.apache.org/jira/browse/KAFKA-2092
 Project: Kafka
  Issue Type: Improvement
  Components: producer 
Reporter: Gianmarco De Francisci Morales
Assignee: Jun Rao
 Attachments: KAFKA-2092-v1.patch, KAFKA-2092-v2.patch


 We have recently studied the problem of load balancing in distributed stream 
 processing systems such as Samza [1].
 In particular, we focused on what happens when the key distribution of the 
 stream is skewed when using key grouping.
 We developed a new stream partitioning scheme (which we call Partial Key 
 Grouping). It achieves better load balancing than hashing while being more 
 scalable than round robin in terms of memory.
 In the paper we show a number of mining algorithms that are easy to implement 
 with partial key grouping, and whose performance can benefit from it. We 
 think that it might also be useful for a larger class of algorithms.
 PKG has already been integrated in Storm [2], and I would like to be able to 
 use it in Samza as well. As far as I understand, Kafka producers are the ones 
 that decide how to partition the stream (or Kafka topic).
 I do not have experience with Kafka, however partial key grouping is very 
 easy to implement: it requires just a few lines of code in Java when 
 implemented as a custom grouping in Storm [3].
 I believe it should be very easy to integrate.
 For all these reasons, I believe it will be a nice addition to Kafka/Samza. 
 If the community thinks it's a good idea, I will be happy to offer support in 
 the porting.
 References:
 [1] 
 https://melmeric.files.wordpress.com/2014/11/the-power-of-both-choices-practical-load-balancing-for-distributed-stream-processing-engines.pdf
 [2] https://issues.apache.org/jira/browse/STORM-632
 [3] https://github.com/gdfm/partial-key-grouping
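Since KAFKA-2091 makes the producer's partitioner pluggable, here is a minimal
Scala sketch of the two-choices idea behind partial key grouping (an
illustration of the algorithm only, with made-up names; see the attached
patches for the real implementation):

{code}
import scala.util.hashing.MurmurHash3

// Partial key grouping: hash each key to two candidate partitions and send to
// whichever has received fewer messages so far ("power of two choices").
class PartialKeyGroupingSketch(numPartitions: Int) {
  private val load = Array.fill(numPartitions)(0L)  // messages sent per partition

  private def candidate(key: Array[Byte], seed: Int): Int =
    ((MurmurHash3.bytesHash(key, seed) % numPartitions) + numPartitions) % numPartitions

  def partition(key: Array[Byte]): Int = {
    val c1 = candidate(key, 17)
    val c2 = candidate(key, 31)
    val chosen = if (load(c1) <= load(c2)) c1 else c2  // pick the less loaded one
    load(chosen) += 1
    chosen
  }
}
{code}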



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Official Kafka Gitter Room?

2015-07-22 Thread Stevo Slavić
On the Apache Mahout project we're using Slack as well - for release
coordination. It was found that an extra Slack channel does not really fit
the Apache way - it was overused, and too many design discussions went on
there, which the community at large had no access to, was not and could not
be involved in, and could not even see the history of. This is not the case
with the user/dev mailing lists, with their searchable archives.

Kind regards,
Stevo Slavic.

On Wed, Jul 22, 2015 at 11:07 AM, Ismael Juma ism...@juma.me.uk wrote:

 Hi Gwen,

 On Sun, Jul 19, 2015 at 2:26 AM, Gwen Shapira gshap...@cloudera.com
 wrote:

  So, as an experiment, I created:
  https://apachekafka.slack.com
 
  I figured we'll give it a whirl for a week or two for dev discussions,
  see how it goes and if we have activity we can add this to the website
  and announce on the lists.
 

 Are people using this? If so, please send me an invite.

 Ismael



Re: Kafka Unit Test Failures on a Mac

2015-07-22 Thread Ismael Juma
On Mon, Jul 20, 2015 at 10:45 PM, Grant Henke ghe...@cloudera.com wrote:

 In one run of the core tests I found the following:

 - 4584 regular files (REG)
 - 376 .jar files
   - Not much one can/should do here. Many are from gradle itself.
 - 2392 kafka .log files
   - why are these being leaked?
   - after a single test no file handles should remain
 - 1162 kafka .log.deleted files
   - why are these being leaked?


Some of those are probably due to:

https://issues.apache.org/jira/browse/KAFKA-1782

Ismael


Re: Should 0.8.3 consumers correctly function with 0.8.2 brokers?

2015-07-22 Thread Stevo Slavić
I'm getting Unknown api code 11 even when both client and server are
0.8.3/trunk, when KafkaConsumer.subscribe(String... topics) is used.

Bug?

Kind regards,
Stevo Slavic.

On Fri, Apr 24, 2015 at 7:13 PM, Neha Narkhede n...@confluent.io wrote:

 Yes, I was clearly confused :-)

 On Fri, Apr 24, 2015 at 9:37 AM, Sean Lydon lydon.s...@gmail.com wrote:

  Thanks for the responses. Ewen is correct that I am referring to the
  *new* consumer (org.apache.kafka.clients.consumer.KafkaConsumer).
 
  I am extending the consumer to allow my applications more control over
  committed offsets.  I really want to get away from zookeeper (so using
  the offset storage), and re-balancing is something I haven't really
  needed to tackle in an automated/seamless way.  Either way, I'll hold
  off going further down this road until there is more interest.
 
  @Gwen
  I set up a single consumer without partition.assignment.strategy or
  rebalance.callback.class.  I was unable to subscribe to just a topic
  (Unknown api code 11 on broker), but I could subscribe to a
  topicpartition.  This makes sense as I would need to handle re-balance
  outside the consumer.  Things functioned as expected (well  I have an
  additional minor fix to code from KAFKA-2121), and the only exceptions
  on broker were due to closing consumers (which I have become
  accustomed to).  My tests are specific to my extended version of the
  consumer, but they basically do a little writing and reading with
  different serde classes with application controlled commits (similar
  to onSuccess and onFailure after each record, but with tolerance for
  out of order acknowledgements).
 
  If you are interested, here is the patch of the hack against trunk.
 
  On Thu, Apr 23, 2015 at 10:27 PM, Ewen Cheslack-Postava
  e...@confluent.io wrote:
   @Neha I think you're mixing up the 0.8.1/0.8.2 updates and the
  0.8.2/0.8.3
   that's being discussed here?
  
   I think the original question was about using the *new* consumer
  (clients
   consumer) with 0.8.2. Gwen's right, it will use features not even
   implemented in the broker in trunk yet, let alone the 0.8.2.
  
   I don't think the enable.commit.downgrade type option, or supporting
  the
   old protocol with the new consumer at all, makes much sense. You'd end
 up
   with some weird hybrid of simple and high-level consumers -- you could
  use
   offset storage, but you'd have to manage rebalancing yourself since
 none
  of
   the coordinator support would be there.
  
  
   On Thu, Apr 23, 2015 at 9:22 PM, Neha Narkhede n...@confluent.io
  wrote:
  
   My understanding is that ideally the 0.8.3 consumer should work with
 an
   0.8.2 broker if the offset commit config was set to zookeeper.
  
   The only thing that might not work is offset commit to Kafka, which
  makes
   sense since the 0.8.2 broker does not support Kafka based offset
   management.
  
   If we broke all kinds of offset commits, then it seems like a
  regression,
   no?
  
   On Thu, Apr 23, 2015 at 7:26 PM, Gwen Shapira gshap...@cloudera.com
   wrote:
  
I didn't think 0.8.3 consumer will ever be able to talk to 0.8.2
broker... there are some essential pieces that are missing in 0.8.2
(Coordinator, Heartbeat, etc).
Maybe I'm missing something. It will be nice if this will work :)
   
Mind sharing what / how you tested? Were there no errors in broker
logs after your fix?
   
On Thu, Apr 23, 2015 at 5:37 PM, Sean Lydon lydon.s...@gmail.com
   wrote:
 Currently the clients consumer (trunk) sends offset commit
 requests
  of
 version 2.  The 0.8.2 brokers fail to handle this particular
 request
 with a:

 java.lang.AssertionError: assertion failed: Version 2 is invalid
 for
 OffsetCommitRequest. Valid versions are 0 or 1.

 I was able to make this work via a forceful downgrade of this
 particular request, but I would like some feedback on whether a
 enable.commit.downgrade configuration would be a tolerable
 method
  to
 allow 0.8.3 consumers to interact with 0.8.2 brokers.  I'm also
 interested in this even being a goal worth pursuing.

 Thanks,
 Sean
   
  
  
  
   --
   Thanks,
   Neha
  
  
  
  
   --
   Thanks,
   Ewen
 



 --
 Thanks,
 Neha



Re: Review Request 36670: Patch for KAFKA-2355

2015-07-22 Thread Ismael Juma

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36670/#review92571
---



core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala (line 267)
https://reviews.apache.org/r/36670/#comment146770

You should either include information about the exception that was actually 
thrown or let the original exception propagate. Also, if you choose the former, 
you should do `case _: Throwable` to avoid a warning in Scala 2.11.



core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala (line 269)
https://reviews.apache.org/r/36670/#comment146771

Should `verifyTopicDeletion` be inside `finally`? I don't think so because 
you don't want to run it if `fail` is called above. You could move it to be in 
the `e: TopicAlreadyMarkedForDeletionException` clause maybe?


- Ismael Juma


On July 22, 2015, 1:46 a.m., Edward Ribeiro wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/36670/
 ---
 
 (Updated July 22, 2015, 1:46 a.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-2355
 https://issues.apache.org/jira/browse/KAFKA-2355
 
 
 Repository: kafka
 
 
 Description
 ---
 
 KAFKA-2355 Add an unit test to validate the deletion of a partition marked as 
 deleted
 
 
 Diffs
 -
 
   core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala 
 fa8ce259a2832ab86f9dda8c1d409b2c42d43ae9 
 
 Diff: https://reviews.apache.org/r/36670/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Edward Ribeiro
 




Re: Review Request 36664: Patch for KAFKA-2353

2015-07-22 Thread Ismael Juma

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36664/#review92574
---



core/src/main/scala/kafka/network/SocketServer.scala (line 401)
https://reviews.apache.org/r/36664/#comment146775

Is it intentional to ignore `java.lang.Error` too?



core/src/main/scala/kafka/network/SocketServer.scala (line 463)
https://reviews.apache.org/r/36664/#comment146773

As far as I can see `ClosedChannelException`, `IllegalStateException` and 
`IllegalArgumentException` are enough? Also, would it be better to use 
`IOException` instead of `ClosedChannelException`?

What happens if other exceptions are thrown? Will we still have a socket 
leak?


- Ismael Juma


On July 22, 2015, 5:02 a.m., Jiangjie Qin wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/36664/
 ---
 
 (Updated July 22, 2015, 5:02 a.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-2353
 https://issues.apache.org/jira/browse/KAFKA-2353
 
 
 Repository: kafka
 
 
 Description
 ---
 
 Addressed Gwen's comments
 
 
 Diffs
 -
 
   core/src/main/scala/kafka/network/SocketServer.scala 
 91319fa010b140cca632e5fa8050509bd2295fc9 
 
 Diff: https://reviews.apache.org/r/36664/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Jiangjie Qin
 




Re: Review Request 36670: Patch for KAFKA-2355

2015-07-22 Thread Grant Henke


 On July 22, 2015, 10:29 a.m., Ismael Juma wrote:
  core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala, line 269
  https://reviews.apache.org/r/36670/diff/1/?file=1018343#file1018343line269
 
  Should `verifyTopicDeletion` be inside `finally`? I don't think so 
  because you don't want to run it if `fail` is called above. You could move 
  it to be in the `e: TopicAlreadyMarkedForDeletionException` clause maybe?

Just removing the finally block and having it after the catch should work fine 
too.


- Grant


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36670/#review92571
---


On July 22, 2015, 1:46 a.m., Edward Ribeiro wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/36670/
 ---
 
 (Updated July 22, 2015, 1:46 a.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-2355
 https://issues.apache.org/jira/browse/KAFKA-2355
 
 
 Repository: kafka
 
 
 Description
 ---
 
 KAFKA-2355 Add an unit test to validate the deletion of a partition marked as 
 deleted
 
 
 Diffs
 -
 
   core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala 
 fa8ce259a2832ab86f9dda8c1d409b2c42d43ae9 
 
 Diff: https://reviews.apache.org/r/36670/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Edward Ribeiro
 




[jira] [Created] (KAFKA-2356) Support retrieving partitions of ConsumerRecords

2015-07-22 Thread Stevo Slavic (JIRA)
Stevo Slavic created KAFKA-2356:
---

 Summary: Support retrieving partitions of ConsumerRecords
 Key: KAFKA-2356
 URL: https://issues.apache.org/jira/browse/KAFKA-2356
 Project: Kafka
  Issue Type: Improvement
Affects Versions: 0.8.3
Reporter: Stevo Slavic
Priority: Trivial


In the new consumer on trunk, ConsumerRecords has a method to retrieve records 
for a given TopicPartition, but there is no method to retrieve the 
TopicPartitions included/available in the ConsumerRecords. Please have it 
supported.

The method could be something like:
{noformat}
/**
 * Get partitions of records returned by a {@link Consumer#poll(long)} operation
 */
public Set<TopicPartition> partitions() {
    return Collections.unmodifiableSet(this.records.keySet());
}
{noformat}
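A short Scala sketch of how the proposed accessor would be used from a poll
loop (the per-partition processing is illustrative):

{code}
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.KafkaConsumer

def drainByPartition(consumer: KafkaConsumer[String, String]): Unit = {
  val records = consumer.poll(100)
  for (tp <- records.partitions().asScala) {       // the proposed accessor
    val perPartition = records.records(tp).asScala // existing per-partition accessor
    println(s"$tp -> ${perPartition.size} records")
  }
}
{code}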



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] kafka pull request: ConsumerRecords are organized per topic partit...

2015-07-22 Thread sslavic
GitHub user sslavic opened a pull request:

https://github.com/apache/kafka/pull/92

ConsumerRecords are organized per topic partition

ConsumerRecords has records organized per topic partition, not per topic as 
ConsumerRecords javadoc suggested.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/sslavic/kafka patch-6

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/92.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #92


commit 8dacfda7c71b7acaf49a19db0563975b8a96f7a5
Author: Stevo Slavić ssla...@gmail.com
Date:   2015-07-22T13:41:00Z

ConsumerRecords are organized per topic partition

ConsumerRecords has records organized per topic partition, not per topic as 
ConsumerRecords javadoc suggested.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


Jenkins build is back to normal : KafkaPreCommit #160

2015-07-22 Thread Apache Jenkins Server
See https://builds.apache.org/job/KafkaPreCommit/160/changes



[jira] [Commented] (KAFKA-2350) Add KafkaConsumer pause capability

2015-07-22 Thread Jason Gustafson (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14637753#comment-14637753
 ] 

Jason Gustafson commented on KAFKA-2350:


[~jkreps] This is a little unrelated to this jira, but one nice consequence of 
moving group management into configuration is that it opens the door to using 
subscribe(topic) as a shortcut to subscribing to all of a topic's partitions 
for users who do not want to use group management (which does have a little 
overhead). Currently the user would have to get all the partitions for that 
topic and then subscribe to them individually. Not that bad, but a tad 
annoying, especially if you have to poll for changes in the number of 
partitions. 

Anyway, I tend to agree with [~yasuhiro.matsuda] and [~gwenshap] that trying to 
mix partition and topic subscriptions in order to do pausing seems problematic. 
The explicit pause/unpause methods might not be as elegant, but I think they 
are easier for users to understand and are much easier for us to implement.
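
For reference, a hedged sketch of how the proposed pause/unpause API (quoted 
below) might be used in the join scenario; caughtUp() is a hypothetical 
application-side progress check:

{code}
consumer.pause("topicA");     // poll() stops initiating fetches for topicA
while (!caughtUp()) {
    // poll() keeps running, so group membership is maintained and no
    // rebalance is triggered while topicA is paused.
    consumer.poll(100);
}
consumer.unpause("topicA");   // fetches for topicA resume on the next poll
{code}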

 Add KafkaConsumer pause capability
 --

 Key: KAFKA-2350
 URL: https://issues.apache.org/jira/browse/KAFKA-2350
 Project: Kafka
  Issue Type: Improvement
Reporter: Jason Gustafson
Assignee: Jason Gustafson

 There are some use cases in stream processing where it is helpful to be able 
 to pause consumption of a topic. For example, when joining two topics, you 
 may need to delay processing of one topic while you wait for the consumer of 
 the other topic to catch up. The new consumer currently doesn't provide a 
 nice way to do this. If you skip poll() or if you unsubscribe, then a 
 rebalance will be triggered and your partitions will be reassigned.
 One way to achieve this would be to add two new methods to KafkaConsumer:
 {code}
 void pause(String... topics);
 void unpause(String... topics);
 {code}
 When a topic is paused, a call to KafkaConsumer.poll will not initiate any 
 new fetches for that topic. After it is unpaused, fetches will begin again.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-2350) Add KafkaConsumer pause capability

2015-07-22 Thread Jiangjie Qin (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14637792#comment-14637792
 ] 

Jiangjie Qin commented on KAFKA-2350:
-

[~guozhang] In the last case you mentioned, is topic2-partition1 one of the 
partitions assigned by consumer coordinator? If it is not, the subscription 
will fail because the partition is not in the assignedTopicpartitions set.

 Add KafkaConsumer pause capability
 --

 Key: KAFKA-2350
 URL: https://issues.apache.org/jira/browse/KAFKA-2350
 Project: Kafka
  Issue Type: Improvement
Reporter: Jason Gustafson
Assignee: Jason Gustafson

 There are some use cases in stream processing where it is helpful to be able 
 to pause consumption of a topic. For example, when joining two topics, you 
 may need to delay processing of one topic while you wait for the consumer of 
 the other topic to catch up. The new consumer currently doesn't provide a 
 nice way to do this. If you skip poll() or if you unsubscribe, then a 
 rebalance will be triggered and your partitions will be reassigned.
 One way to achieve this would be to add two new methods to KafkaConsumer:
 {code}
 void pause(String... topics);
 void unpause(String... topics);
 {code}
 When a topic is paused, a call to KafkaConsumer.poll will not initiate any 
 new fetches for that topic. After it is unpaused, fetches will begin again.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-2004) Write Kafka messages directly to HDFS

2015-07-22 Thread sutanu das (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2004?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14637419#comment-14637419
 ] 

sutanu das commented on KAFKA-2004:
---

We just want to stream each logfile as an event from the Kafka queue via Storm 
to HDFS.

Is there basic spout or bolt code available in Python to do this?

 Write Kafka messages directly to HDFS
 -

 Key: KAFKA-2004
 URL: https://issues.apache.org/jira/browse/KAFKA-2004
 Project: Kafka
  Issue Type: Bug
  Components: consumer, core, producer 
Affects Versions: 0.8.1.1
Reporter: sutanu das
Assignee: Neha Narkhede
Priority: Critical

 1. Is there a way to write Kafka messages directly to HDFS without writing 
 any consumer code? 
 2. Is there any way to integrate Kafka with Storm or Spark so messages go 
 directly from Kafka consumers to an HDFS sink?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1893) Allow regex subscriptions in the new consumer

2015-07-22 Thread Jason Gustafson (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1893?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14638028#comment-14638028
 ] 

Jason Gustafson commented on KAFKA-1893:


[~singhashish] Sure, no problem.

 Allow regex subscriptions in the new consumer
 -

 Key: KAFKA-1893
 URL: https://issues.apache.org/jira/browse/KAFKA-1893
 Project: Kafka
  Issue Type: Sub-task
  Components: consumer
Reporter: Jay Kreps
Assignee: Jason Gustafson
Priority: Critical
 Fix For: 0.8.3


 The consumer needs to handle subscribing to regular expressions. Presumably 
 this would be done as a new api,
 {code}
   void subscribe(java.util.regex.Pattern pattern);
 {code}
 Some questions/thoughts to work out:
  - It should not be possible to mix pattern subscription with partition 
 subscription.
  - Is it allowable to mix this with normal topic subscriptions? Logically 
 this is okay but a bit complex to implement.
  - We need to ensure we regularly update the metadata and recheck our regexes 
 against the metadata to update subscriptions for new topics that are created 
 or old topics that are deleted.
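 
 A hedged sketch of the proposed API plus the periodic metadata recheck 
 described above (subscribe(Pattern) is the proposal; clusterTopics() is a 
 hypothetical metadata accessor):
 
 {code}
 Pattern pattern = Pattern.compile("metrics\\..*");
 consumer.subscribe(pattern);   // proposed new api
 
 // Periodically recheck cluster metadata against the regex so the
 // subscription picks up newly created topics and drops deleted ones.
 for (String topic : clusterTopics()) {
     if (pattern.matcher(topic).matches()) {
         // ensure topic is part of the subscription
     }
 }
 {code}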



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 36670: Patch for KAFKA-2355

2015-07-22 Thread Edward Ribeiro

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36670/
---

(Updated July 23, 2015, 1:23 a.m.)


Review request for kafka.


Bugs: KAFKA-2355
https://issues.apache.org/jira/browse/KAFKA-2355


Repository: kafka


Description
---

KAFKA-2355 Add an unit test to validate the deletion of a partition marked as 
deleted


Diffs (updated)
-

  core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala 
fa8ce259a2832ab86f9dda8c1d409b2c42d43ae9 

Diff: https://reviews.apache.org/r/36670/diff/


Testing
---


Thanks,

Edward Ribeiro



[jira] [Commented] (KAFKA-2355) Add an unit test to validate the deletion of a partition marked as deleted

2015-07-22 Thread Edward Ribeiro (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2355?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14638036#comment-14638036
 ] 

Edward Ribeiro commented on KAFKA-2355:
---

Updated reviewboard https://reviews.apache.org/r/36670/diff/
 against branch origin/trunk

 Add an unit test to validate the deletion of a partition marked as deleted
 --

 Key: KAFKA-2355
 URL: https://issues.apache.org/jira/browse/KAFKA-2355
 Project: Kafka
  Issue Type: Test
Affects Versions: 0.8.2.1
Reporter: Edward Ribeiro
Priority: Minor
 Fix For: 0.8.3

 Attachments: KAFKA-2355.patch, KAFKA-2355_2015-07-22_21:37:51.patch, 
 KAFKA-2355_2015-07-22_22:23:13.patch


 Trying to delete a partition marked as deleted throws 
 {{TopicAlreadyMarkedForDeletionException}}, so this ticket adds a unit test to 
 validate this behaviour. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-2355) Add an unit test to validate the deletion of a partition marked as deleted

2015-07-22 Thread Edward Ribeiro (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2355?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Edward Ribeiro updated KAFKA-2355:
--
Attachment: KAFKA-2355_2015-07-22_22:23:13.patch

 Add an unit test to validate the deletion of a partition marked as deleted
 --

 Key: KAFKA-2355
 URL: https://issues.apache.org/jira/browse/KAFKA-2355
 Project: Kafka
  Issue Type: Test
Affects Versions: 0.8.2.1
Reporter: Edward Ribeiro
Priority: Minor
 Fix For: 0.8.3

 Attachments: KAFKA-2355.patch, KAFKA-2355_2015-07-22_21:37:51.patch, 
 KAFKA-2355_2015-07-22_22:23:13.patch


 Trying to delete a partition marked as deleted throws 
 {{TopicAlreadyMarkedForDeletionException}}, so this ticket adds a unit test to 
 validate this behaviour. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 36681: Patch for KAFKA-2275

2015-07-22 Thread Jason Gustafson

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36681/#review92700
---

Ship it!


Just one question on the unit test. Otherwise, LGTM.


clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
 (line 173)
https://reviews.apache.org/r/36681/#comment146940

Why do we need to do this in a separate thread?


- Jason Gustafson


On July 22, 2015, 11:09 p.m., Ashish Singh wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/36681/
 ---
 
 (Updated July 22, 2015, 11:09 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-2275
 https://issues.apache.org/jira/browse/KAFKA-2275
 
 
 Repository: kafka
 
 
 Description
 ---
 
 Move getAllTopics from ConsumerNetworkClient to Fetcher. Add a unit test for 
 Fetcher.getAllTopics. Do not ignore the __consumer_offsets topic while getting all 
 topics. Remove braces for single-line if.
 
 
 KAFKA-2275: Add a ListTopics() API to the new consumer
 
 
 Diffs
 -
 
   clients/src/main/java/org/apache/kafka/clients/ClientRequest.java 
 ed4c0d98596cc294757f35df8c8cbc8e36ff42de 
   clients/src/main/java/org/apache/kafka/clients/NetworkClient.java 
 48fe7961e2215372d8033ece4af739ea06c6457b 
   clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java 
 252b759c0801f392e3526b0f31503b4b8fbf1c8a 
   clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
 bea3d737c51be77d5b5293cdd944d33b905422ba 
   clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java 
 c14eed1e95f2e682a235159a366046f00d1d90d6 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
  d595c1cb07909e21946532501f648be3033603d9 
   
 clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
  7a4e586c1a0c52931a8ec55b6e1d5b67c67c28ea 
   core/src/test/scala/integration/kafka/api/ConsumerTest.scala 
 3eb5f95731a3f06f662b334ab2b3d0ad7fa9e1ca 
 
 Diff: https://reviews.apache.org/r/36681/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Ashish Singh
 




[jira] [Commented] (KAFKA-2350) Add KafkaConsumer pause capability

2015-07-22 Thread Jiangjie Qin (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14638063#comment-14638063
 ] 

Jiangjie Qin commented on KAFKA-2350:
-

Yes, that is what I meant.

Isn't it the case that subscribe(partition) always checks whether 
subscribe(topic) has been called or not?

If a user wants to subscribe/unsubscribe to an illegal partition, I kind of think 
it is easy to understand that we just throw an exception saying the consumer is 
using Kafka-based offset assignment and topic partition tp is not an assigned 
partition.

 Add KafkaConsumer pause capability
 --

 Key: KAFKA-2350
 URL: https://issues.apache.org/jira/browse/KAFKA-2350
 Project: Kafka
  Issue Type: Improvement
Reporter: Jason Gustafson
Assignee: Jason Gustafson

 There are some use cases in stream processing where it is helpful to be able 
 to pause consumption of a topic. For example, when joining two topics, you 
 may need to delay processing of one topic while you wait for the consumer of 
 the other topic to catch up. The new consumer currently doesn't provide a 
 nice way to do this. If you skip poll() or if you unsubscribe, then a 
 rebalance will be triggered and your partitions will be reassigned.
 One way to achieve this would be to add two new methods to KafkaConsumer:
 {code}
 void pause(String... topics);
 void unpause(String... topics);
 {code}
 When a topic is paused, a call to KafkaConsumer.poll will not initiate any 
 new fetches for that topic. After it is unpaused, fetches will begin again.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-2350) Add KafkaConsumer pause capability

2015-07-22 Thread Jiangjie Qin (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14638070#comment-14638070
 ] 

Jiangjie Qin commented on KAFKA-2350:
-

I think an explicit configuration makes sense. The different subscribe/unsubscribe 
methods confuse people from time to time.

So if enable.consumer.coordinator==false, users are on their own and all the 
subscribe/unsubscribe methods will work.

But what is not clear to me is: if enable.consumer.coordinator==true, will 
users be able to call subscribe/unsubscribe(partitions)?
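
If such a flag existed, setup might look like this sketch 
(enable.consumer.coordinator is only a name proposed in this thread, not an 
actual config; deserializer settings are omitted):

{code}
Properties props = new Properties();
props.put("bootstrap.servers", "broker1:9092");
props.put("enable.consumer.coordinator", "false"); // proposed: opt out of group management
KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props);
// Partition-level subscription: no coordinator, no rebalance.
consumer.subscribe(new TopicPartition("topicA", 0));
{code}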

 Add KafkaConsumer pause capability
 --

 Key: KAFKA-2350
 URL: https://issues.apache.org/jira/browse/KAFKA-2350
 Project: Kafka
  Issue Type: Improvement
Reporter: Jason Gustafson
Assignee: Jason Gustafson

 There are some use cases in stream processing where it is helpful to be able 
 to pause consumption of a topic. For example, when joining two topics, you 
 may need to delay processing of one topic while you wait for the consumer of 
 the other topic to catch up. The new consumer currently doesn't provide a 
 nice way to do this. If you skip poll() or if you unsubscribe, then a 
 rebalance will be triggered and your partitions will be reassigned.
 One way to achieve this would be to add two new methods to KafkaConsumer:
 {code}
 void pause(String... topics);
 void unpause(String... topics);
 {code}
 When a topic is paused, a call to KafkaConsumer.poll will not initiate any 
 new fetches for that topic. After it is unpaused, fetches will begin again.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Round Robin Among Consumers

2015-07-22 Thread J A
Ordering guarantees should be optional. If someone needs ordering, they can
always fall back on an exclusive-consumer strategy.

On Wednesday, July 22, 2015, Ashish Singh asi...@cloudera.com wrote:

 Hey, don't you think that would be against the basic ordering guarantees
 Kafka provides?

  On Wed, Jul 22, 2015 at 2:14 PM, J A mbatth...@gmail.com wrote:

  Hi, This is reference to stackoverflow question 
 
 
 http://stackoverflow.com/questions/31547216/kafka-log-deletion-and-load-balancing-across-consumers
  
  Since Kafka 0.8 already maintains a client offset, I would like to
 request
  a feature, where a single partition consumption can be round robin
 across a
  set of consumers. The message delivery strategy should be an option
 chosen
  by the consumer.
 



 --

 Regards,
 Ashish



Re: Review Request 36681: Patch for KAFKA-2275

2015-07-22 Thread Edward Ribeiro

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36681/#review92696
---



clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java 
(line 167)
https://reviews.apache.org/r/36681/#comment146934

The code looks pretty good. Congrats!



clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java 
(line 201)
https://reviews.apache.org/r/36681/#comment146932

Just wondering to myself: if ``retryBackoffMs`` is >= ``timeout``, 
then the caller will have one shot only where it can try to retrieve the 
topics. Not that this is a problem, as ``retryBackoffMs`` is a small number of 
milliseconds, as far as I saw, but nevertheless something to be aware of.
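
A sketch of the timing concern, for illustration only (not the actual Fetcher 
code; tryFetchTopicMetadata() stands in for one metadata request). The loop 
below allows roughly timeout / retryBackoffMs attempts, hence a single attempt 
when ``retryBackoffMs`` >= ``timeout``:

{code}
Map<String, List<PartitionInfo>> getAllTopics(long timeout) throws InterruptedException {
    long start = System.currentTimeMillis();
    long remaining = timeout;
    do {
        Map<String, List<PartitionInfo>> topics = tryFetchTopicMetadata(); // one attempt
        if (topics != null)
            return topics;
        // If retryBackoffMs >= timeout, this sleep exhausts the budget and
        // the loop exits after a single attempt.
        Thread.sleep(Math.min(retryBackoffMs, remaining));
        remaining = timeout - (System.currentTimeMillis() - start);
    } while (remaining > 0);
    return Collections.emptyMap();
}
{code}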



core/src/test/scala/integration/kafka/api/ConsumerTest.scala (line 198)
https://reviews.apache.org/r/36681/#comment146930

nit: really *really* really cosmetic stuff, but ``topicPartsMap`` makes 
more sense, no? :)


- Edward Ribeiro


On July 22, 2015, 11:09 p.m., Ashish Singh wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/36681/
 ---
 
 (Updated July 22, 2015, 11:09 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-2275
 https://issues.apache.org/jira/browse/KAFKA-2275
 
 
 Repository: kafka
 
 
 Description
 ---
 
  Move getAllTopics from ConsumerNetworkClient to Fetcher. Add a unit test for 
  Fetcher.getAllTopics. Do not ignore the __consumer_offsets topic while getting all 
  topics. Remove braces for single-line if.
 
 
 KAFKA-2275: Add a ListTopics() API to the new consumer
 
 
 Diffs
 -
 
   clients/src/main/java/org/apache/kafka/clients/ClientRequest.java 
 ed4c0d98596cc294757f35df8c8cbc8e36ff42de 
   clients/src/main/java/org/apache/kafka/clients/NetworkClient.java 
 48fe7961e2215372d8033ece4af739ea06c6457b 
   clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java 
 252b759c0801f392e3526b0f31503b4b8fbf1c8a 
   clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
 bea3d737c51be77d5b5293cdd944d33b905422ba 
   clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java 
 c14eed1e95f2e682a235159a366046f00d1d90d6 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
  d595c1cb07909e21946532501f648be3033603d9 
   
 clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
  7a4e586c1a0c52931a8ec55b6e1d5b67c67c28ea 
   core/src/test/scala/integration/kafka/api/ConsumerTest.scala 
 3eb5f95731a3f06f662b334ab2b3d0ad7fa9e1ca 
 
 Diff: https://reviews.apache.org/r/36681/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Ashish Singh
 




Re: Review Request 36664: Patch for KAFKA-2353

2015-07-22 Thread Jiangjie Qin


 On July 22, 2015, 10:40 a.m., Ismael Juma wrote:
  core/src/main/scala/kafka/network/SocketServer.scala, line 401
  https://reviews.apache.org/r/36664/diff/2/?file=1018375#file1018375line401
 
  Is it intentional to ignore `java.lang.Error` too?

I think java.lang.Error is a subclass of Throwable.
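
For reference, java.lang.Error extends java.lang.Throwable directly (it is a 
sibling of Exception), so a broad catch clause also swallows Errors:

{code}
try {
    doWork();                     // hypothetical
} catch (Throwable t) {
    // Catches both Exceptions and Errors (OutOfMemoryError,
    // StackOverflowError, ...), which is why catching Throwable
    // broadly is usually discouraged.
    log.error("Uncaught throwable", t);
}
{code}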


- Jiangjie


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36664/#review92574
---


On July 23, 2015, 12:51 a.m., Jiangjie Qin wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/36664/
 ---
 
 (Updated July 23, 2015, 12:51 a.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-2353
 https://issues.apache.org/jira/browse/KAFKA-2353
 
 
 Repository: kafka
 
 
 Description
 ---
 
 Addressed Gwen's comments
 
 
 Addressed Gwen's comments.
 
 
 Diffs
 -
 
   core/src/main/scala/kafka/network/SocketServer.scala 
 91319fa010b140cca632e5fa8050509bd2295fc9 
 
 Diff: https://reviews.apache.org/r/36664/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Jiangjie Qin
 




[jira] [Commented] (KAFKA-2351) Brokers are having a problem shutting down correctly

2015-07-22 Thread Mayuresh Gharat (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2351?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14638076#comment-14638076
 ] 

Mayuresh Gharat commented on KAFKA-2351:


[~junrao], [~guozhang] can you take a look at this? 

 Brokers are having a problem shutting down correctly
 

 Key: KAFKA-2351
 URL: https://issues.apache.org/jira/browse/KAFKA-2351
 Project: Kafka
  Issue Type: Bug
Reporter: Mayuresh Gharat
Assignee: Mayuresh Gharat
 Attachments: KAFKA-2351.patch, KAFKA-2351_2015-07-21_14:58:13.patch


 During shutdown, run() in Acceptor might throw an exception that is not 
 caught, so it never reaches shutdownComplete; the latch is then not counted 
 down and the broker will not be able to shut down.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Round Robin Among Consumers

2015-07-22 Thread J A
Why have partitions at all if I don't need to scale the topic? Coupling topic
scalability with consumer scalability goes against a messaging system's core
principle of decoupling consumers and producers.

On Wednesday, July 22, 2015, Aditya Auradkar aaurad...@linkedin.com.invalid
wrote:

 Hi,

 Why not simply have as many partitions as the set of consumers you want to
 round robin across?

 Aditya

 On Wed, Jul 22, 2015 at 2:37 PM, Ashish Singh asi...@cloudera.com wrote:

  Hey, don't you think that would be against the basic ordering guarantees
  Kafka provides?
 
  On Wed, Jul 22, 2015 at 2:14 PM, J A mbatth...@gmail.com wrote:
 
   Hi, This is reference to stackoverflow question 
  
  
 
 http://stackoverflow.com/questions/31547216/kafka-log-deletion-and-load-balancing-across-consumers
   
   Since Kafka 0.8 already maintains a client offset, I would like to
  request
   a feature, where a single partition consumption can be round robin
  across a
   set of consumers. The message delivery strategy should be an option
  chosen
   by the consumer.
  
 
 
 
  --
 
  Regards,
  Ashish
 



Re: Review Request 36664: Patch for KAFKA-2353

2015-07-22 Thread Jiangjie Qin


 On July 22, 2015, 5:13 p.m., Guozhang Wang wrote:
  LGTM overall. Could you address Ismael's comments as well before check-in?

Thanks, Guozhang. I updated patch to address Ismael's comments.


- Jiangjie


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36664/#review92607
---


On July 23, 2015, 12:51 a.m., Jiangjie Qin wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/36664/
 ---
 
 (Updated July 23, 2015, 12:51 a.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-2353
 https://issues.apache.org/jira/browse/KAFKA-2353
 
 
 Repository: kafka
 
 
 Description
 ---
 
 Addressed Gwen's comments
 
 
 Addressed Gwen's comments.
 
 
 Diffs
 -
 
   core/src/main/scala/kafka/network/SocketServer.scala 
 91319fa010b140cca632e5fa8050509bd2295fc9 
 
 Diff: https://reviews.apache.org/r/36664/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Jiangjie Qin
 




[jira] [Commented] (KAFKA-2350) Add KafkaConsumer pause capability

2015-07-22 Thread Jay Kreps (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14638139#comment-14638139
 ] 

Jay Kreps commented on KAFKA-2350:
--

[~hachikuji] Yeah, I vote for pause/resume (or unpause).

I think the challenge with making group management explicit is that then it 
will conflict with the api usage. I.e. the user will say 
client.subscribe(mytopic) and something terrible and bad will happen and they 
won't know why and we will say uh uh uh you forgot to set the magic 
enable.consumer.coordinator=true flag. This was kind of what I liked about the 
implicit approach--the natural usage of just subscribing to a partition or 
topic does what you would expect. I would anticipate that making this a flag 
would turn into a source of confusion, because in the early stages of usage most 
people won't know what a consumer coordinator is.

 Add KafkaConsumer pause capability
 --

 Key: KAFKA-2350
 URL: https://issues.apache.org/jira/browse/KAFKA-2350
 Project: Kafka
  Issue Type: Improvement
Reporter: Jason Gustafson
Assignee: Jason Gustafson

 There are some use cases in stream processing where it is helpful to be able 
 to pause consumption of a topic. For example, when joining two topics, you 
 may need to delay processing of one topic while you wait for the consumer of 
 the other topic to catch up. The new consumer currently doesn't provide a 
 nice way to do this. If you skip poll() or if you unsubscribe, then a 
 rebalance will be triggered and your partitions will be reassigned.
 One way to achieve this would be to add two new methods to KafkaConsumer:
 {code}
 void pause(String... topics);
 void unpause(String... topics);
 {code}
 When a topic is paused, a call to KafkaConsumer.poll will not initiate any 
 new fetches for that topic. After it is unpaused, fetches will begin again.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 33620: Patch for KAFKA-1690

2015-07-22 Thread Jun Rao

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/33620/#review92711
---



clients/src/main/java/org/apache/kafka/common/network/ByteBufferSend.java (line 
47)
https://reviews.apache.org/r/33620/#comment146946

It seems that we will need to add similar logic in other Sends: MultiSend, 
FetchResponseSend, TopicDataSend and PartitionDataSend.


- Jun Rao


On July 20, 2015, 7 p.m., Sriharsha Chintalapani wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/33620/
 ---
 
 (Updated July 20, 2015, 7 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1690
 https://issues.apache.org/jira/browse/KAFKA-1690
 
 
 Repository: kafka
 
 
 Description
 ---
 
 KAFKA-1690. new java producer needs ssl support as a client.
 
 
 KAFKA-1690. new java producer needs ssl support as a client.
 
 
 KAFKA-1690. new java producer needs ssl support as a client.
 
 
 KAFKA-1690. new java producer needs ssl support as a client. SSLFactory tests.
 
 
 KAFKA-1690. new java producer needs ssl support as a client. Added 
 PrincipalBuilder.
 
 
 KAFKA-1690. new java producer needs ssl support as a client. Addressing 
 reviews.
 
 
 KAFKA-1690. new java producer needs ssl support as a client. Addressing 
 reviews.
 
 
 KAFKA-1690. new java producer needs ssl support as a client. Addressing 
 reviews.
 
 
 KAFKA-1690. new java producer needs ssl support as a client. Fixed minor 
 issues with the patch.
 
 
 KAFKA-1690. new java producer needs ssl support as a client. Fixed minor 
 issues with the patch.
 
 
 KAFKA-1690. new java producer needs ssl support as a client.
 
 
 KAFKA-1690. new java producer needs ssl support as a client.
 
 
 Merge remote-tracking branch 'refs/remotes/origin/trunk' into KAFKA-1690-V1
 
 
 KAFKA-1690. Broker side ssl changes.
 
 
 KAFKA-1684. SSL for socketServer.
 
 
 KAFKA-1690. Added SSLProducerSendTest and fixes to get right port for SSL.
 
 
 Merge branch 'trunk' into KAFKA-1690-V1
 
 
 KAFKA-1690. Post merge fixes.
 
 
 KAFKA-1690. Added SSLProducerSendTest.
 
 
 KAFKA-1690. Minor fixes based on patch review comments.
 
 
 Merge commit
 
 
 KAFKA-1690. Added SSL Consumer Test.
 
 
 KAFKA-1690. SSL Support.
 
 
 KAFKA-1690. Addressing reviews.
 
 
 Merge branch 'trunk' into KAFKA-1690-V1
 
 
 Merge branch 'trunk' into KAFKA-1690-V1
 
 
 KAFKA-1690. Addressing reviews. Removed interestOps from SSLTransportLayer.
 
 
 KAFKA-1690. Addressing reviews.
 
 
 Diffs
 -
 
   build.gradle fb9084307ae41bbb62e32720ccd7b94f26e910c8 
   checkstyle/import-control.xml 19e0659ef9385433d9f94dee43cd70a52b18c9e5 
   clients/src/main/java/org/apache/kafka/clients/ClientUtils.java 
 0d68bf1e1e90fe9d5d4397ddf817b9a9af8d9f7a 
   clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java 
 2c421f42ed3fc5d61cf9c87a7eaa7bb23e26f63b 
   clients/src/main/java/org/apache/kafka/clients/NetworkClient.java 
 48fe7961e2215372d8033ece4af739ea06c6457b 
   clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java 
 70377ae2fa46deb381139d28590ce6d4115e1adc 
   clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
 bea3d737c51be77d5b5293cdd944d33b905422ba 
   clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
 03b8dd23df63a8d8a117f02eabcce4a2d48c44f7 
   clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java 
 aa264202f2724907924985a5ecbe74afc4c6c04b 
   clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java 
 bae528d31516679bed88ee61b408f209f185a8cc 
   clients/src/main/java/org/apache/kafka/common/config/SSLConfigs.java 
 PRE-CREATION 
   clients/src/main/java/org/apache/kafka/common/network/Authenticator.java 
 PRE-CREATION 
   clients/src/main/java/org/apache/kafka/common/network/ByteBufferSend.java 
 df0e6d5105ca97b7e1cb4d334ffb7b443506bd0b 
   clients/src/main/java/org/apache/kafka/common/network/ChannelBuilder.java 
 PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/common/network/DefaultAuthenticator.java
  PRE-CREATION 
   clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java 
 PRE-CREATION 
   clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java 
 3ca0098b8ec8cfdf81158465b2d40afc47eb6f80 
   
 clients/src/main/java/org/apache/kafka/common/network/PlainTextChannelBuilder.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/common/network/PlainTextTransportLayer.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/common/network/SSLChannelBuilder.java 
 PRE-CREATION 
   clients/src/main/java/org/apache/kafka/common/network/SSLFactory.java 
 PRE-CREATION 
   
 

[jira] [Commented] (KAFKA-2350) Add KafkaConsumer pause capability

2015-07-22 Thread Jiangjie Qin (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14638182#comment-14638182
 ] 

Jiangjie Qin commented on KAFKA-2350:
-

[~yasuhiro.matsuda] [~jkreps] [~hachikuji] [~gwenshap] If we are using 
pause/unpause, does that mean that when users are not using the consumer 
coordinator, these calls are equivalent to 
subscribe(partitions)/unsubscribe(partitions)?

I still don't understand why this is an overloading of the current 
subscribe/unsubscribe API. The way I see it, the KafkaConsumer API works by 
having different methods to set different fields of a fetch request (offsets, 
topics, partitions), and then doing a poll() using those settings. 

To me the definition of all the subscribe/unsubscribe methods stays unchanged:
* Subscribe/Unsubscribe to a TOPIC means
** Involve the consumer coordinator to do partition assignment
** Consumer rebalance will be triggered
* Subscribe/Unsubscribe to a PARTITION means
** Do not involve the consumer coordinator
** Consumer rebalance will not be triggered

The only change is that instead of naively rejecting a PARTITION sub/unsub when 
the consumer coordinator is involved, we allow users to decide whether they want 
to change the settings for their next poll() to exclude some topic partitions 
that have been assigned to this consumer.

Therefore I don't see why using subscribe(partitions)/unsubscribe(partitions) 
for pausing and unpausing consumption is a behavior re-definition. It looks to 
me like pause/unpause does the exact same thing as partition-level 
subscribe/unsubscribe, but we are adding them simply because we think users are 
using them for a different use case. If so, does that mean we need to add yet 
another pair of interfaces whenever people subscribe/unsubscribe partitions for 
some other use case? Then we are going to end up with a bunch of interfaces 
doing very similar or even exactly the same things but with different names 
based on the use case. 

If the reason we don't like to use sub/unsub is that their names sound 
purpose-oriented and indicate a particular use case, we can change the names to 
something like addTopicPartition()/addTopic() (I know I am a terrible name 
picker, but hopefully you get what I wanted to say).

 Add KafkaConsumer pause capability
 --

 Key: KAFKA-2350
 URL: https://issues.apache.org/jira/browse/KAFKA-2350
 Project: Kafka
  Issue Type: Improvement
Reporter: Jason Gustafson
Assignee: Jason Gustafson

 There are some use cases in stream processing where it is helpful to be able 
 to pause consumption of a topic. For example, when joining two topics, you 
 may need to delay processing of one topic while you wait for the consumer of 
 the other topic to catch up. The new consumer currently doesn't provide a 
 nice way to do this. If you skip poll() or if you unsubscribe, then a 
 rebalance will be triggered and your partitions will be reassigned.
 One way to achieve this would be to add two new methods to KafkaConsumer:
 {code}
 void pause(String... topics);
 void unpause(String... topics);
 {code}
 When a topic is paused, a call to KafkaConsumer.poll will not initiate any 
 new fetches for that topic. After it is unpaused, fetches will begin again.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Round Robin Among Consumers

2015-07-22 Thread J A
Hi, This is reference to stackoverflow question 
http://stackoverflow.com/questions/31547216/kafka-log-deletion-and-load-balancing-across-consumers

Since Kafka 0.8 already maintains a client offset, I would like to request
a feature, where a single partition consumption can be round robin across a
set of consumers. The message delivery strategy should be an option chosen
by the consumer.


Re: Review Request 36681: Patch for KAFKA-2275

2015-07-22 Thread Guozhang Wang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36681/#review92644
---

Ship it!


LGTM. I think we can checkin after Jason's comments get addressed.

- Guozhang Wang


On July 22, 2015, 6:32 a.m., Ashish Singh wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/36681/
 ---
 
 (Updated July 22, 2015, 6:32 a.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-2275
 https://issues.apache.org/jira/browse/KAFKA-2275
 
 
 Repository: kafka
 
 
 Description
 ---
 
 KAFKA-2275: Add a ListTopics() API to the new consumer
 
 
 Diffs
 -
 
   clients/src/main/java/org/apache/kafka/clients/ClientRequest.java 
 ed4c0d98596cc294757f35df8c8cbc8e36ff42de 
   clients/src/main/java/org/apache/kafka/clients/NetworkClient.java 
 48fe7961e2215372d8033ece4af739ea06c6457b 
   clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java 
 252b759c0801f392e3526b0f31503b4b8fbf1c8a 
   clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
 bea3d737c51be77d5b5293cdd944d33b905422ba 
   clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java 
 c14eed1e95f2e682a235159a366046f00d1d90d6 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
  9517d9d0cd480d5ba1d12f1fde7963e60528d2f8 
   core/src/test/scala/integration/kafka/api/ConsumerTest.scala 
 3eb5f95731a3f06f662b334ab2b3d0ad7fa9e1ca 
 
 Diff: https://reviews.apache.org/r/36681/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Ashish Singh
 




Re: Review Request 36664: Patch for KAFKA-2353

2015-07-22 Thread Gwen Shapira


 On July 21, 2015, 11:15 p.m., Gwen Shapira wrote:
  core/src/main/scala/kafka/network/SocketServer.scala, line 465
  https://reviews.apache.org/r/36664/diff/1/?file=1018238#file1018238line465
 
  Turns out that catching Throwable is a really bad idea: 
  https://www.sumologic.com/2014/05/05/why-you-should-never-catch-throwable-in-scala/
 
 Jiangjie Qin wrote:
 Ah... Didn't know that before. I explicitly listed the exceptions.
 
 Guozhang Wang wrote:
 Searching for : Throwable gives me 180+ cases in the code base :P Though many 
 of them are from unit tests (which, arguably, may be OK), there are still a lot 
 in the core package. I agree that we should avoid catching Throwable whenever 
 possible, which will also push developers to think about possible checked 
 exceptions along the calling trace.
 
 Gwen Shapira wrote:
 I know :(
 I'm not sure if going over and converting everything is worth the effort. 
 Although it can be a nice newbie jira.
 
 Jiangjie Qin wrote:
 Maybe we can simply change that to catch Throwables except 
 ControlThrowables? That might be a simple search and replace.

possible. definitely not in this JIRA though :)
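
A hedged Java illustration of the pattern being debated (SocketServer itself is 
Scala, where the equivalent is rethrowing scala.util.control.ControlThrowable 
and catching everything else; isRunning and the two method calls are 
placeholders):

{code}
while (isRunning) {
    try {
        configureNewConnections();
        processRequests();
    } catch (VirtualMachineError e) {
        throw e; // fatal errors should propagate, not be swallowed
    } catch (Throwable t) {
        // Per-iteration failures are logged so that one bad connection or
        // request does not kill the processor for all other clients.
        log.error("Processor error; continuing", t);
    }
}
{code}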


 On July 21, 2015, 11:15 p.m., Gwen Shapira wrote:
  core/src/main/scala/kafka/network/SocketServer.scala, line 400
  https://reviews.apache.org/r/36664/diff/1/?file=1018238#file1018238line400
 
  So in the case of an unexpected exception, we log an error and keep running?
  
  Isn't it better to kill the processor, since we don't know what the 
  state of the system is? If the acceptor keeps placing messages in the queue 
  for a dead processor, isn't that a separate issue?
 
 Jiangjie Qin wrote:
 I'm not quite sure about this part. I am not very experienced with error 
 handling in such cases, so please correct me if I missed something.
 Here is what I thought.
 
 The way it currently works is that the acceptor will 
 1. accept new connection requests and create new socket channels
 2. choose a processor and put each socket channel into the processor's new 
 connection queue
 
 The processor will just take the socket channels from the queue and 
 register them with the selector.
 If the processor hits an uncaught exception while running, there are several 
 possibilities. 
 Case 1: The exception came from one socket channel. 
 Case 2: The exception was associated with a bad request. 
 In case 1, ideally we should just disconnect that socket channel without 
 affecting the other socket channels.
 In case 2, I think we should log the error and skip the message - 
 assuming the client will retry sending data if no response is received for a 
 given period of time.
 
 I am not sure letting the processor exit is a good idea, because that would 
 let a badly behaving client take down the entire cluster - it might kill the 
 processors one by one. Compared with that, I lean towards keeping the 
 processor running and serving other normal TCP connections if possible, but 
 logging the error so a monitoring system can detect it and see if human 
 intervention is needed.
 
 Also, I don't know what to do here to prevent the thread from exiting 
 without catching all the throwables.
 According to this blog 
 http://www.tzavellas.com/techblog/2010/09/20/catching-throwable-in-scala/
 I guess I can rethrow all the ControlThrowables, but intercept the rest?
 
 Guozhang Wang wrote:
 I would also prefer not to close the Processor thread upon exceptions, 
 mainly to avoid one bad client killing a shared Kafka cluster. In the past 
 we have seen issues like a DDoS of MetadataRequests killing the cluster, with 
 all other clients affected, and the quota work is aimed at preventing that. 
 Since Processor threads are shared (8 by default on a broker), a processor 
 should not be closed because of a single socket / bad client request.
 
 Gwen Shapira wrote:
 I like your thinking around cases #1 and #2. I think this should go as a 
 code comment somewhere, so when people improve / extend SocketServer they 
 will keep this logic in mind. Maybe even specify in specific catch clauses if 
 they are handling possible errors in request level or channel level.
 
 My concern is with possible case #3: Each processor has an 
 o.a.k.common.network.Selector. I'm concerned about the possibility of 
 something going wrong in the state of the selector, which will possibly be an 
 issue for all channels. For example, a failure to register could be an issue 
 with the channel.register call, but also perhaps an issue with keys.put (just 
 an example - I'm not sure something can actually break the keys table). 
 
 I'd like to be able to identify cases where the Selector state may have 
 gone wrong and close the processor in that case. Does that make any sense? Or 
 am I being too paranoid?
 
 Jiangjie Qin wrote:
 Hi Gwen, I think what you said makes sense. Maybe I see this more from a 
 failure boundary point of 

Re: Round Robin Among Consumers

2015-07-22 Thread Aditya Auradkar
Hi,

Why not simply have as many partitions as the set of consumers you want to
round robin across?

Aditya

On Wed, Jul 22, 2015 at 2:37 PM, Ashish Singh asi...@cloudera.com wrote:

 Hey, don't you think that would be against the basic ordering guarantees
 Kafka provides?

 On Wed, Jul 22, 2015 at 2:14 PM, J A mbatth...@gmail.com wrote:

  Hi, This is reference to stackoverflow question 
 
 
 http://stackoverflow.com/questions/31547216/kafka-log-deletion-and-load-balancing-across-consumers
  
  Since Kafka 0.8 already maintains a client offset, I would like to
 request
  a feature, where a single partition consumption can be round robin
 across a
  set of consumers. The message delivery strategy should be an option
 chosen
  by the consumer.
 



 --

 Regards,
 Ashish



Re: Round Robin Among Consumers

2015-07-22 Thread Ashish Singh
Hey, don't you think that would be against the basic ordering guarantees
Kafka provides?

On Wed, Jul 22, 2015 at 2:14 PM, J A mbatth...@gmail.com wrote:

 Hi, This is reference to stackoverflow question 

 http://stackoverflow.com/questions/31547216/kafka-log-deletion-and-load-balancing-across-consumers
 
 Since Kafka 0.8 already maintains a client offset, I would like to request
 a feature, where a single partition consumption can be round robin across a
 set of consumers. The message delivery strategy should be an option chosen
 by the consumer.




-- 

Regards,
Ashish


[jira] [Commented] (KAFKA-2004) Write Kafka messages directly to HDFS

2015-07-22 Thread Ashish K Singh (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2004?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14637745#comment-14637745
 ] 

Ashish K Singh commented on KAFKA-2004:
---

Spark Streaming supports reading from Kafka. Once you have read from Kafka, it 
is up to you what you want to do with the data. Here is an example: 
https://github.com/SinghAsDev/pankh.
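
A minimal sketch of that path using the 0.8-era spark-streaming-kafka 
integration (the exact API varies by Spark version; topic, path, and group 
names here are illustrative):

{code}
SparkConf conf = new SparkConf().setAppName("kafka-to-hdfs");
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(10));
Map<String, Integer> topics = Collections.singletonMap("logs", 1);
JavaPairReceiverInputDStream<String, String> stream =
    KafkaUtils.createStream(jssc, "zk1:2181", "hdfs-writer", topics);
// Write the message values of each batch out to HDFS.
stream.map(t -> t._2()).dstream().saveAsTextFiles("hdfs:///kafka/logs", "txt");
jssc.start();
jssc.awaitTermination();
{code}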

 Write Kafka messages directly to HDFS
 -

 Key: KAFKA-2004
 URL: https://issues.apache.org/jira/browse/KAFKA-2004
 Project: Kafka
  Issue Type: Bug
  Components: consumer, core, producer 
Affects Versions: 0.8.1.1
Reporter: sutanu das
Assignee: Neha Narkhede
Priority: Critical

 1. Is there a way to write Kafka messages directly to HDFS without writing 
 any consumer code? 
 2. Is there any way to integrate Kafka with Storm or Spark so messages go 
 directly from Kafka consumers to an HDFS sink?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-2275) Add a ListTopics() API to the new consumer

2015-07-22 Thread Ashish K Singh (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2275?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ashish K Singh updated KAFKA-2275:
--
Attachment: KAFKA-2275_2015-07-22_16:09:34.patch

 Add a ListTopics() API to the new consumer
 --

 Key: KAFKA-2275
 URL: https://issues.apache.org/jira/browse/KAFKA-2275
 Project: Kafka
  Issue Type: Sub-task
  Components: consumer
Reporter: Guozhang Wang
Assignee: Ashish K Singh
Priority: Critical
 Fix For: 0.8.3

 Attachments: KAFKA-2275.patch, KAFKA-2275.patch, 
 KAFKA-2275_2015-07-17_21:39:27.patch, KAFKA-2275_2015-07-20_10:44:19.patch, 
 KAFKA-2275_2015-07-22_16:09:34.patch


 With regex subscription like
 {code}
 consumer.subscribe(topic*)
 {code}
 The partition assignment is automatically done on the Kafka side, while there 
 are some use cases where consumers want regex subscriptions but not 
 Kafka-side partition assignment, preferring their own specific partition 
 assignment. With ListTopics() they can periodically check for topic list 
 changes and specifically subscribe to the partitions of the new topics.
 For implementation, it involves sending a TopicMetadataRequest to a random 
 broker and parsing the response.
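 
 A hedged sketch of that workflow, assuming the listTopics() method added here 
 returns topic-to-partitions metadata (the regex and recheck interval are 
 illustrative):
 
 {code}
 void watchTopics(KafkaConsumer<byte[], byte[]> consumer) throws InterruptedException {
     Set<String> known = new HashSet<>();
     while (true) {
         for (Map.Entry<String, List<PartitionInfo>> e : consumer.listTopics().entrySet()) {
             if (e.getKey().matches("metrics\\..*") && known.add(e.getKey())) {
                 // New matching topic: subscribe to its partitions explicitly,
                 // keeping partition assignment on the application side.
                 for (PartitionInfo p : e.getValue())
                     consumer.subscribe(new TopicPartition(p.topic(), p.partition()));
             }
         }
         Thread.sleep(60000); // recheck interval
     }
 }
 {code}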



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-2350) Add KafkaConsumer pause capability

2015-07-22 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14637890#comment-14637890
 ] 

Guozhang Wang commented on KAFKA-2350:
--

[~becket_qin] OK, I understand now. The subscribe(partition) call will check 
whether subscribe(topic) has been called before, hence assignment() is not null. 
I think this behavior is a bit confusing since it depends on the call history of 
subscribe.

 Add KafkaConsumer pause capability
 --

 Key: KAFKA-2350
 URL: https://issues.apache.org/jira/browse/KAFKA-2350
 Project: Kafka
  Issue Type: Improvement
Reporter: Jason Gustafson
Assignee: Jason Gustafson

 There are some use cases in stream processing where it is helpful to be able 
 to pause consumption of a topic. For example, when joining two topics, you 
 may need to delay processing of one topic while you wait for the consumer of 
 the other topic to catch up. The new consumer currently doesn't provide a 
 nice way to do this. If you skip poll() or if you unsubscribe, then a 
 rebalance will be triggered and your partitions will be reassigned.
 One way to achieve this would be to add two new methods to KafkaConsumer:
 {code}
 void pause(String... topics);
 void unpause(String... topics);
 {code}
 When a topic is paused, a call to KafkaConsumer.poll will not initiate any 
 new fetches for that topic. After it is unpaused, fetches will begin again.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-2275) Add a ListTopics() API to the new consumer

2015-07-22 Thread Ashish K Singh (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2275?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14637889#comment-14637889
 ] 

Ashish K Singh commented on KAFKA-2275:
---

Updated reviewboard https://reviews.apache.org/r/36681/
 against branch trunk

 Add a ListTopics() API to the new consumer
 --

 Key: KAFKA-2275
 URL: https://issues.apache.org/jira/browse/KAFKA-2275
 Project: Kafka
  Issue Type: Sub-task
  Components: consumer
Reporter: Guozhang Wang
Assignee: Ashish K Singh
Priority: Critical
 Fix For: 0.8.3

 Attachments: KAFKA-2275.patch, KAFKA-2275.patch, 
 KAFKA-2275_2015-07-17_21:39:27.patch, KAFKA-2275_2015-07-20_10:44:19.patch, 
 KAFKA-2275_2015-07-22_16:09:34.patch


 With regex subscription like
 {code}
 consumer.subscribe(topic*)
 {code}
 The partition assignment is automatically done on the Kafka side, while there 
 are some use cases where consumers want regex subscriptions but not 
 Kafka-side partition assignment, preferring their own specific partition 
 assignment. With ListTopics() they can periodically check for topic list 
 changes and specifically subscribe to the partitions of the new topics.
 For implementation, it involves sending a TopicMetadataRequest to a random 
 broker and parsing the response.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 36681: Patch for KAFKA-2275

2015-07-22 Thread Ashish Singh

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36681/
---

(Updated July 22, 2015, 11:09 p.m.)


Review request for kafka.


Bugs: KAFKA-2275
https://issues.apache.org/jira/browse/KAFKA-2275


Repository: kafka


Description (updated)
---

Move getAllTopics from ConsumerNetworkClient to Fetcher. Add a unit test for 
Fetcher.getAllTopics. Do not ignore the __consumer_offsets topic while getting all 
topics. Remove braces for single-line if.


KAFKA-2275: Add a ListTopics() API to the new consumer


Diffs (updated)
-

  clients/src/main/java/org/apache/kafka/clients/ClientRequest.java 
ed4c0d98596cc294757f35df8c8cbc8e36ff42de 
  clients/src/main/java/org/apache/kafka/clients/NetworkClient.java 
48fe7961e2215372d8033ece4af739ea06c6457b 
  clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java 
252b759c0801f392e3526b0f31503b4b8fbf1c8a 
  clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
bea3d737c51be77d5b5293cdd944d33b905422ba 
  clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java 
c14eed1e95f2e682a235159a366046f00d1d90d6 
  
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java 
d595c1cb07909e21946532501f648be3033603d9 
  
clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
 7a4e586c1a0c52931a8ec55b6e1d5b67c67c28ea 
  core/src/test/scala/integration/kafka/api/ConsumerTest.scala 
3eb5f95731a3f06f662b334ab2b3d0ad7fa9e1ca 

Diff: https://reviews.apache.org/r/36681/diff/


Testing
---


Thanks,

Ashish Singh



[jira] [Updated] (KAFKA-2353) SocketServer.Processor should catch exception and close the socket properly in configureNewConnections.

2015-07-22 Thread Jiangjie Qin (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2353?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiangjie Qin updated KAFKA-2353:

Attachment: KAFKA-2353_2015-07-22_17:51:42.patch

 SocketServer.Processor should catch exception and close the socket properly 
 in configureNewConnections.
 ---

 Key: KAFKA-2353
 URL: https://issues.apache.org/jira/browse/KAFKA-2353
 Project: Kafka
  Issue Type: Bug
Reporter: Jiangjie Qin
Assignee: Jiangjie Qin
 Attachments: KAFKA-2353.patch, KAFKA-2353_2015-07-21_22:02:24.patch, 
 KAFKA-2353_2015-07-22_17:51:42.patch


 We see an increasing number of sockets in CLOSE_WAIT status in our production 
 environment in the last couple of days. From the thread dump it seems one of 
 the Processor threads has died, but the acceptor was still putting many new 
 connections into its new connection queue.
 The dead Processor thread was caused by our not catching all the exceptions 
 in the Processor thread. In our case, it seems to have been an exception 
 thrown in configureNewConnections().



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 36681: Patch for KAFKA-2275

2015-07-22 Thread Ashish Singh


 On July 22, 2015, 4:43 p.m., Jason Gustafson wrote:
  Hey Ashish, this looks pretty good to me. Just some minor comments.

Thanks for the review! Addressed your concerns.


 On July 22, 2015, 4:43 p.m., Jason Gustafson wrote:
  clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java,
   line 298
  https://reviews.apache.org/r/36681/diff/1/?file=1018418#file1018418line298
 
  Any reason not to put this method in Fetcher instead of here? I don't 
  have a strong feeling, but it was kind of nice keeping 
  ConsumerNetworkClient largely free of application logic.
  
  Also, it might be nice to have a unit test.

Moved and added unit test for Fetcher.getAllTopics.


 On July 22, 2015, 4:43 p.m., Jason Gustafson wrote:
  clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java,
   lines 314-315
  https://reviews.apache.org/r/36681/diff/1/?file=1018418#file1018418line314
 
  I think I asked this before, but is there any harm in returning this 
  topic to the user? I ask because we don't actually prevent them from 
  calling partitionsFor with the same topic.

Removed the exclusion.


- Ashish


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36681/#review92604
---


On July 22, 2015, 11:09 p.m., Ashish Singh wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/36681/
 ---
 
 (Updated July 22, 2015, 11:09 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-2275
 https://issues.apache.org/jira/browse/KAFKA-2275
 
 
 Repository: kafka
 
 
 Description
 ---
 
 Move getAllTopics from ConsumerNetworkClient to Fetcher. Add a unit test for 
 Fetcher.getAllTopics. Do not ignore the __consumer_offsets topic while getting all 
 topics. Remove braces for single-line if.
 
 
 KAFKA-2275: Add a ListTopics() API to the new consumer
 
 
 Diffs
 -
 
   clients/src/main/java/org/apache/kafka/clients/ClientRequest.java 
 ed4c0d98596cc294757f35df8c8cbc8e36ff42de 
   clients/src/main/java/org/apache/kafka/clients/NetworkClient.java 
 48fe7961e2215372d8033ece4af739ea06c6457b 
   clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java 
 252b759c0801f392e3526b0f31503b4b8fbf1c8a 
   clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
 bea3d737c51be77d5b5293cdd944d33b905422ba 
   clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java 
 c14eed1e95f2e682a235159a366046f00d1d90d6 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
  d595c1cb07909e21946532501f648be3033603d9 
   
 clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
  7a4e586c1a0c52931a8ec55b6e1d5b67c67c28ea 
   core/src/test/scala/integration/kafka/api/ConsumerTest.scala 
 3eb5f95731a3f06f662b334ab2b3d0ad7fa9e1ca 
 
 Diff: https://reviews.apache.org/r/36681/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Ashish Singh
 




Re: Review Request 36670: Patch for KAFKA-2355

2015-07-22 Thread Edward Ribeiro

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36670/
---

(Updated July 23, 2015, 12:38 a.m.)


Review request for kafka.


Bugs: KAFKA-2355
https://issues.apache.org/jira/browse/KAFKA-2355


Repository: kafka


Description
---

KAFKA-2355 Add an unit test to validate the deletion of a partition marked as 
deleted


Diffs (updated)
-

  core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala 
fa8ce259a2832ab86f9dda8c1d409b2c42d43ae9 

Diff: https://reviews.apache.org/r/36670/diff/


Testing
---


Thanks,

Edward Ribeiro



[jira] [Updated] (KAFKA-2355) Add an unit test to validate the deletion of a partition marked as deleted

2015-07-22 Thread Edward Ribeiro (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2355?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Edward Ribeiro updated KAFKA-2355:
--
Attachment: KAFKA-2355_2015-07-22_21:37:51.patch

 Add an unit test to validate the deletion of a partition marked as deleted
 --

 Key: KAFKA-2355
 URL: https://issues.apache.org/jira/browse/KAFKA-2355
 Project: Kafka
  Issue Type: Test
Affects Versions: 0.8.2.1
Reporter: Edward Ribeiro
Priority: Minor
 Fix For: 0.8.3

 Attachments: KAFKA-2355.patch, KAFKA-2355_2015-07-22_21:37:51.patch


 Trying to delete a partition marked as deleted throws 
 {{TopicAlreadyMarkedForDeletionException}}, so this ticket adds a unit test to 
 validate this behaviour. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-2355) Add an unit test to validate the deletion of a partition marked as deleted

2015-07-22 Thread Edward Ribeiro (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2355?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14637984#comment-14637984
 ] 

Edward Ribeiro commented on KAFKA-2355:
---

Updated reviewboard https://reviews.apache.org/r/36670/diff/
 against branch origin/trunk

 Add an unit test to validate the deletion of a partition marked as deleted
 --

 Key: KAFKA-2355
 URL: https://issues.apache.org/jira/browse/KAFKA-2355
 Project: Kafka
  Issue Type: Test
Affects Versions: 0.8.2.1
Reporter: Edward Ribeiro
Priority: Minor
 Fix For: 0.8.3

 Attachments: KAFKA-2355.patch, KAFKA-2355_2015-07-22_21:37:51.patch


 Trying to delete a partition marked as deleted throws 
 {{TopicAlreadyMarkedForDeletionException}}, so this ticket adds a unit test to 
 validate this behaviour. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1893) Allow regex subscriptions in the new consumer

2015-07-22 Thread Ashish K Singh (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1893?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14637894#comment-14637894
 ] 

Ashish K Singh commented on KAFKA-1893:
---

[~hachikuji] I think after working on KAFKA-2275, I got even more interested in 
working on this. In case you are not working on it yet and you think it is OK 
for me to take a crack at it, I would like to pick it up.

 Allow regex subscriptions in the new consumer
 -

 Key: KAFKA-1893
 URL: https://issues.apache.org/jira/browse/KAFKA-1893
 Project: Kafka
  Issue Type: Sub-task
  Components: consumer
Reporter: Jay Kreps
Assignee: Jason Gustafson
Priority: Critical
 Fix For: 0.8.3


 The consumer needs to handle subscribing to regular expressions. Presumably 
 this would be done as a new api,
 {code}
   void subscribe(java.util.regex.Pattern pattern);
 {code}
 Some questions/thoughts to work out:
  - It should not be possible to mix pattern subscription with partition 
 subscription.
  - Is it allowable to mix this with normal topic subscriptions? Logically 
 this is okay but a bit complex to implement.
  - We need to ensure we regularly update the metadata and recheck our regexes 
 against the metadata to update subscriptions for new topics that are created 
 or old topics that are deleted.
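
 For illustration, a minimal usage sketch of the proposed overload (the 
 {{subscribe(Pattern)}} signature is only the proposal above, and all config 
 values are placeholders):
 {code}
 import java.util.Properties;
 import java.util.regex.Pattern;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 
 public class RegexSubscribeSketch {
     public static void main(String[] args) {
         Properties props = new Properties();
         props.put("bootstrap.servers", "localhost:9092");
         props.put("group.id", "regex-demo");
         props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
         props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
         KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props);
         // Proposed API from this ticket -- not in the consumer yet.
         consumer.subscribe(Pattern.compile("metrics\\..*"));
         while (true) {
             // The subscription set should be rechecked against refreshed
             // metadata so new matching topics are picked up automatically.
             ConsumerRecords<byte[], byte[]> records = consumer.poll(100);
         }
     }
 }
 {code}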



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Should 0.8.3 consumers correctly function with 0.8.2 brokers?

2015-07-22 Thread Stevo Slavić
Jiangjie,

Seems I was misunderstood: KafkaConsumer.poll after subscribing to a topic
via KafkaConsumer.subscribe(String... topics) does not work; it fails with an
Unknown api code 11 error, even with both client and broker on latest
0.8.3/trunk. I will try to create a failing test and open a bug report.

Kind regards,
Stevo Slavic.

On Wed, Jul 22, 2015 at 8:36 PM, Jiangjie Qin j...@linkedin.com.invalid
wrote:

 I don't think we have a consumer coordinator in 0.8.2 brokers, so the
 KafkaConsumer in 0.8.3 will only be able to subscribe to partitions
 explicitly. Subscribing to a topic won't work with 0.8.2 brokers.
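
 A minimal sketch of the distinction, as of the trunk API at the time (method
 shapes may change; treat the exact signatures as assumptions):
 {code}
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.TopicPartition;
 
 class SubscribeAgainst082Sketch {
     // Works on 0.8.2: explicit partition subscription, no coordinator involved.
     static void assignExplicitly(KafkaConsumer<byte[], byte[]> consumer) {
         consumer.subscribe(new TopicPartition("my-topic", 0));
     }
 
     // Fails on 0.8.2: topic subscription needs the consumer coordinator,
     // which only 0.8.3+ brokers implement.
     static void subscribeByTopic(KafkaConsumer<byte[], byte[]> consumer) {
         consumer.subscribe("my-topic");
     }
 }
 {code}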

 Jiangjie (Becket) Qin

 On Wed, Jul 22, 2015 at 4:26 AM, Stevo Slavić ssla...@gmail.com wrote:

  I'm getting Unknown api code 11 even when both client and server are
  0.8.3/trunk, when KafkaConsumer.subscribe(String... topics) is used.
 
  Bug?
 
  Kind regards,
  Stevo Slavic.
 
  On Fri, Apr 24, 2015 at 7:13 PM, Neha Narkhede n...@confluent.io
 wrote:
 
   Yes, I was clearly confused :-)
  
   On Fri, Apr 24, 2015 at 9:37 AM, Sean Lydon lydon.s...@gmail.com
  wrote:
  
Thanks for the responses. Ewen is correct that I am referring to the
*new* consumer (org.apache.kafka.clients.consumer.KafkaConsumer).
   
I am extending the consumer to allow my applications more control over
committed offsets.  I really want to get away from zookeeper (so using
the offset storage), and re-balancing is something I haven't really
needed to tackle in an automated/seamless way.  Either way, I'll hold
off going further down this road until there is more interest.
   
@Gwen
I set up a single consumer without partition.assignment.strategy or
rebalance.callback.class.  I was unable to subscribe to just a topic
(Unknown api code 11 on broker), but I could subscribe to a
topic-partition.  This makes sense, as I would need to handle rebalance
outside the consumer.  Things functioned as expected (well, I have an
additional minor fix to the code from KAFKA-2121), and the only exceptions
on the broker were due to closing consumers (which I have become
accustomed to).  My tests are specific to my extended version of the
consumer, but they basically do a little writing and reading with
different serde classes and application-controlled commits (similar
to onSuccess and onFailure after each record, but with tolerance for
out-of-order acknowledgements).
   
If you are interested, here is the patch of the hack against trunk.
   
On Thu, Apr 23, 2015 at 10:27 PM, Ewen Cheslack-Postava
e...@confluent.io wrote:
 @Neha I think you're mixing up the 0.8.1/0.8.2 updates and the 0.8.2/0.8.3
 that's being discussed here?

 I think the original question was about using the *new* consumer (clients
 consumer) with 0.8.2. Gwen's right, it will use features not even
 implemented in the broker in trunk yet, let alone in 0.8.2.

 I don't think the enable.commit.downgrade type option, or supporting
 the old protocol with the new consumer at all, makes much sense. You'd
 end up with some weird hybrid of simple and high-level consumers -- you
 could use offset storage, but you'd have to manage rebalancing yourself
 since none of the coordinator support would be there.


 On Thu, Apr 23, 2015 at 9:22 PM, Neha Narkhede n...@confluent.io
wrote:

 My understanding is that ideally the 0.8.3 consumer should work with an
 0.8.2 broker if the offset commit config was set to zookeeper.

 The only thing that might not work is offset commit to Kafka, which makes
 sense since the 0.8.2 broker does not support Kafka-based offset
 management.

 If we broke all kinds of offset commits, then it seems like a regression,
 no?

 On Thu, Apr 23, 2015 at 7:26 PM, Gwen Shapira gshap...@cloudera.com
 wrote:

  I didn't think the 0.8.3 consumer would ever be able to talk to an 0.8.2
  broker... there are some essential pieces that are missing in 0.8.2
  (Coordinator, Heartbeat, etc).
  Maybe I'm missing something. It would be nice if this worked :)

  Mind sharing what / how you tested? Were there no errors in broker
  logs after your fix?

  On Thu, Apr 23, 2015 at 5:37 PM, Sean Lydon lydon.s...@gmail.com
 wrote:
   Currently the clients consumer (trunk) sends offset commit requests of
   version 2.  The 0.8.2 brokers fail to handle this particular request
   with a:

   java.lang.AssertionError: assertion failed: Version 2 is invalid for
   OffsetCommitRequest. Valid versions are 0 or 1.

   I was able to make this work via a forceful downgrade of this
   particular request, but I would like some feedback on whether an
   enable.commit.downgrade configuration would be a tolerable method to
   allow 0.8.3 consumers to interact 
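
For concreteness, a sketch of how the flag proposed above might be set
(`enable.commit.downgrade` is only a proposal in this thread, not a real
config; every name below is a placeholder):
{code}
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;

class DowngradeSketch {
    static KafkaConsumer<byte[], byte[]> buildConsumer() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker082:9092");
        props.put("group.id", "legacy-group");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        // Hypothetical flag: send OffsetCommitRequest v0/v1 so an 0.8.2
        // broker does not reject the commit with the assertion above.
        props.put("enable.commit.downgrade", "true");
        return new KafkaConsumer<>(props);
    }
}
{code}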

Re: Review Request 36670: Patch for KAFKA-2355

2015-07-22 Thread Edward Ribeiro

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36670/#review92688
---



core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala (line 267)
https://reviews.apache.org/r/36670/#comment146915

    I am ambivalent about catching `_` (a catch-all), so I decided to let the error propagate.



core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala (line 269)
https://reviews.apache.org/r/36670/#comment146916

I disagree, because we are issuing two `deleteTopic` operations: the first 
one should succeed and the second one should fail with a specific exception, 
but in spite of this the topic should have been deleted by the first call, even 
if `fail` is called. That said, I am okay with removing the finally block and 
putting that code after the catch.
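
For illustration, a Java-flavored sketch of the test shape being discussed (the
real test is Scala, in DeleteTopicTest.scala, and the helpers here are
hypothetical):
{code}
import static org.junit.Assert.fail;

import org.junit.Test;

import kafka.common.TopicAlreadyMarkedForDeletionException;

public class DeleteTwiceSketch {
    @Test
    public void secondDeleteFailsButTopicIsStillDeleted() {
        createTestTopic("test");     // hypothetical helper
        deleteTopic("test");         // first call: succeeds, marks for deletion
        try {
            deleteTopic("test");     // second call: must fail
            fail("Expected TopicAlreadyMarkedForDeletionException");
        } catch (TopicAlreadyMarkedForDeletionException e) {
            // expected: the topic was already marked by the first call
        }
        verifyTopicDeletion("test"); // hypothetical: deletion still completes
    }

    private void createTestTopic(String topic) { /* omitted */ }
    private void deleteTopic(String topic) { /* omitted */ }
    private void verifyTopicDeletion(String topic) { /* omitted */ }
}
{code}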


- Edward Ribeiro


On July 22, 2015, 1:46 a.m., Edward Ribeiro wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/36670/
 ---
 
 (Updated July 22, 2015, 1:46 a.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-2355
 https://issues.apache.org/jira/browse/KAFKA-2355
 
 
 Repository: kafka
 
 
 Description
 ---
 
 KAFKA-2355 Add a unit test to validate the deletion of a partition marked as 
 deleted
 
 
 Diffs
 -
 
   core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala 
 fa8ce259a2832ab86f9dda8c1d409b2c42d43ae9 
 
 Diff: https://reviews.apache.org/r/36670/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Edward Ribeiro
 




[jira] [Commented] (KAFKA-2355) Add a unit test to validate the deletion of a partition marked as deleted

2015-07-22 Thread Edward Ribeiro (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2355?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14637996#comment-14637996
 ] 

Edward Ribeiro commented on KAFKA-2355:
---

Hi [~ijuma] and [~granthenke], I have updated the patch as requested in the 
latest review. Please let me know if something is missing or if I 
misinterpreted anything. :)

ps: could one of you assign this little ticket to me? I asked yesterday for 
inclusion in the collaborators' list so that I could assign issues to myself, 
but it has not been granted yet. :-P Cheers!

 Add a unit test to validate the deletion of a partition marked as deleted
 --

 Key: KAFKA-2355
 URL: https://issues.apache.org/jira/browse/KAFKA-2355
 Project: Kafka
  Issue Type: Test
Affects Versions: 0.8.2.1
Reporter: Edward Ribeiro
Priority: Minor
 Fix For: 0.8.3

 Attachments: KAFKA-2355.patch, KAFKA-2355_2015-07-22_21:37:51.patch


 Trying to delete a partition marked as deleted throws 
 {{TopicAlreadyMarkedForDeletionException}}, so this ticket adds a unit test to 
 validate this behaviour. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 36664: Patch for KAFKA-2353

2015-07-22 Thread Jiangjie Qin

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36664/
---

(Updated July 23, 2015, 12:51 a.m.)


Review request for kafka.


Bugs: KAFKA-2353
https://issues.apache.org/jira/browse/KAFKA-2353


Repository: kafka


Description (updated)
---

Addressed Gwen's comments


Addressed Gwen's comments.


Diffs (updated)
-

  core/src/main/scala/kafka/network/SocketServer.scala 
91319fa010b140cca632e5fa8050509bd2295fc9 

Diff: https://reviews.apache.org/r/36664/diff/


Testing
---


Thanks,

Jiangjie Qin



[jira] [Commented] (KAFKA-2353) SocketServer.Processor should catch exceptions and close the socket properly in configureNewConnections.

2015-07-22 Thread Jiangjie Qin (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2353?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14638008#comment-14638008
 ] 

Jiangjie Qin commented on KAFKA-2353:
-

Updated reviewboard https://reviews.apache.org/r/36664/diff/
 against branch origin/trunk

 SocketServer.Processor should catch exceptions and close the socket properly 
 in configureNewConnections.
 ---

 Key: KAFKA-2353
 URL: https://issues.apache.org/jira/browse/KAFKA-2353
 Project: Kafka
  Issue Type: Bug
Reporter: Jiangjie Qin
Assignee: Jiangjie Qin
 Attachments: KAFKA-2353.patch, KAFKA-2353_2015-07-21_22:02:24.patch, 
 KAFKA-2353_2015-07-22_17:51:42.patch


 We see an increasing number of sockets in CLOSE_WAIT status in our production 
 environment in the last couple of days. From the thread dump it seems one of 
 the Processor threads has died but the acceptor was still putting many new 
 connections into its new-connection queue.
 The Processor thread died because we are not catching all the 
 exceptions in the Processor thread. For example, in our case it seems to be 
 an exception thrown in configureNewConnections().
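
 For illustration, a Java-flavored sketch of the pattern such a fix applies 
 (the real code is Scala, in SocketServer.scala; names here are approximations):
 {code}
 // A failure configuring one new connection should close that socket only,
 // not kill the processor thread -- a dead processor leaves the acceptor
 // queueing connections that end up stuck in CLOSE_WAIT.
 private void configureNewConnections() {
     while (!newConnections.isEmpty()) {
         SocketChannel channel = newConnections.poll();
         try {
             channel.register(selector, SelectionKey.OP_READ);
         } catch (Exception e) {
             log.error("Error configuring connection, closing it", e);
             try { channel.close(); } catch (IOException ignored) { }
         }
     }
 }
 {code}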



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 33620: Patch for KAFKA-1690

2015-07-22 Thread Jun Rao

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/33620/#review92314
---


Thanks for the patch. Looks good overall. A few comments below.

There are a few places where we still wrap single-line statements with {}.
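
For example, the convention reads:
{code}
// preferred
if (closed)
    return;

// instead of
if (closed) {
    return;
}
{code}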


build.gradle (lines 247 - 255)
https://reviews.apache.org/r/33620/#comment146726

It seems that you reverted the changes from KAFKA-2323.



checkstyle/import-control.xml (line 56)
https://reviews.apache.org/r/33620/#comment146733

Is this needed? It doesn't seem that `network` needs to use anything from `protocol`.



clients/src/main/java/org/apache/kafka/common/network/PlainTextTransportLayer.java
 (line 47)
https://reviews.apache.org/r/33620/#comment146402

Could we add the @Override annotation?



clients/src/main/java/org/apache/kafka/common/network/PlainTextTransportLayer.java
 (line 96)
https://reviews.apache.org/r/33620/#comment146404

Could we add the @Override annotation? There are a few other methods in this 
class like that.



clients/src/main/java/org/apache/kafka/common/network/PlainTextTransportLayer.java
 (line 150)
https://reviews.apache.org/r/33620/#comment146408

Typo: `src` should be `srcs`.



clients/src/main/java/org/apache/kafka/common/network/PlainTextTransportLayer.java
 (line 183)
https://reviews.apache.org/r/33620/#comment146409

Typo: Rerturns.



clients/src/main/java/org/apache/kafka/common/network/PlainTextTransportLayer.java
 (line 191)
https://reviews.apache.org/r/33620/#comment146410

The param is not a SelectionKey.



clients/src/main/java/org/apache/kafka/common/network/PlainTextTransportLayer.java
 (line 200)
https://reviews.apache.org/r/33620/#comment146411

The param is not a SelectionKey.



clients/src/main/java/org/apache/kafka/common/network/SSLTransportLayer.java 
(line 87)
https://reviews.apache.org/r/33620/#comment146414

Could we add the @Override annotation to this and a few other methods in 
this class?



clients/src/main/java/org/apache/kafka/common/network/SSLTransportLayer.java 
(lines 231 - 234)
https://reviews.apache.org/r/33620/#comment146446

Still some questions on this.

1. When handshakeStatus is not NEED_UNWRAP (e.g. FINISHED), we could have 
flushed all the bytes. In this case, we should turn off the write interestOps 
in the socket key, right?
2. When handshakeStatus is NEED_UNWRAP and write is true, we will move on 
to the NEED_UNWRAP case. However, in this case, there may still be unflushed 
bytes in netWriteBuffer.
3. When handshakeStatus transitions to FINISHED, we return to the callers. 
Doesn't that delay the completion of the handshake since this key may no longer 
be selected?
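
A sketch of the invariant behind point 1 (helper and field names are
assumptions, not the patch's actual code):
{code}
// Keep OP_WRITE registered only while netWriteBuffer still holds unflushed
// bytes; once a flush drains it, turn the write interest off again.
boolean fullyFlushed = flush(netWriteBuffer);   // hypothetical helper
if (fullyFlushed)
    key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
else
    key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
{code}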



clients/src/main/java/org/apache/kafka/common/network/SSLTransportLayer.java 
(lines 245 - 246)
https://reviews.apache.org/r/33620/#comment146594

Since we are not making use of appReadBuffer during handshake, could 
OVERFLOW ever happen? If not, perhaps we can add a comment.



clients/src/main/java/org/apache/kafka/common/network/SSLTransportLayer.java 
(lines 361 - 374)
https://reviews.apache.org/r/33620/#comment146593

Hmm, if status is OK and handshakeStatus is NEED_UNWRAP, could we get into an 
infinite loop here?



clients/src/main/java/org/apache/kafka/common/network/SSLTransportLayer.java 
(lines 394 - 397)
https://reviews.apache.org/r/33620/#comment146694

We talked about that before. It's possible that we read all the bytes for 
multiple receives into the appReadBuffer. Selector only reads off one receive 
at a time. If there are no more bytes from the incoming network, this key may 
not be selected to read off the next receive in the appReadBuffer. How do we 
resolve that issue?
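
One common pattern for this (an assumption here, not necessarily how the patch
resolves it) is for the selector to also service channels that still hold
buffered, undelivered data after each poll:
{code}
// Sketch: a key whose bytes are already decrypted into appReadBuffer may never
// be selected again, so track such channels and drain them without an event.
selector.select(timeout);
Set<Channel> toService = new HashSet<>(readyChannels());   // hypothetical
toService.addAll(channelsWithBufferedReads());             // hypothetical
for (Channel ch : toService)
    readFrom(ch);                                          // hypothetical
{code}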



clients/src/main/java/org/apache/kafka/common/network/SSLTransportLayer.java 
(line 411)
https://reviews.apache.org/r/33620/#comment146696

Is renegotiation actually supported? It seems that renegotiation can happen 
in the middle of a regular send/receive.



clients/src/main/java/org/apache/kafka/common/network/SSLTransportLayer.java 
(lines 601 - 606)
https://reviews.apache.org/r/33620/#comment146697

It seems that, in general, we expect addInterestOps to be called only after the 
handshake has completed. Perhaps we can just throw an IllegalStateException if 
the handshake hasn't completed? Ditto for removeInterestOps.
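
A minimal sketch of that suggestion (field and method shapes assumed):
{code}
public void addInterestOps(int ops) {
    if (!handshakeComplete)
        throw new IllegalStateException("handshake is not completed");
    key.interestOps(key.interestOps() | ops);
}
{code}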



clients/src/main/java/org/apache/kafka/common/network/Selector.java (lines 218 
- 220)
https://reviews.apache.org/r/33620/#comment146390

Coding convention: no need to wrap single line statement with {}.



clients/src/main/java/org/apache/kafka/common/network/Selector.java (lines 274 
- 276)
https://reviews.apache.org/r/33620/#comment146391

Coding convention: no need to wrap single line statement with {}.



clients/src/main/java/org/apache/kafka/common/network/Selector.java (lines 434 
- 435)
https://reviews.apache.org/r/33620/#comment146388

We will probably need to 

[jira] [Updated] (KAFKA-2345) Attempt to delete a topic already marked for deletion throws ZkNodeExistsException

2015-07-22 Thread Stevo Slavic (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2345?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stevo Slavic updated KAFKA-2345:

Affects Version/s: 0.8.2.0

 Attempt to delete a topic already marked for deletion throws 
 ZkNodeExistsException
 --

 Key: KAFKA-2345
 URL: https://issues.apache.org/jira/browse/KAFKA-2345
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8.2.0
Reporter: Ashish K Singh
Assignee: Ashish K Singh
 Fix For: 0.8.3

 Attachments: KAFKA-2345.patch, KAFKA-2345_2015-07-17_10:20:55.patch


 Throwing a TopicAlreadyMarkedForDeletionException makes much more sense. 
 A user does not necessarily have to know about the involvement of ZooKeeper 
 in the process.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)