Re: Review Request 33620: Patch for KAFKA-1690
---
This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/33620/#review92599
---

clients/src/main/java/org/apache/kafka/common/network/SSLTransportLayer.java (lines 46 - 57)
https://reviews.apache.org/r/33620/#comment146806
Some of these fields can be `final`.

- Ismael Juma

On July 20, 2015, 7 p.m., Sriharsha Chintalapani wrote:
(Updated July 20, 2015, 7 p.m.)

Review request for kafka.

Bugs: KAFKA-1690
    https://issues.apache.org/jira/browse/KAFKA-1690

Repository: kafka

Description
---
KAFKA-1690. new java producer needs ssl support as a client.
KAFKA-1690. new java producer needs ssl support as a client.
KAFKA-1690. new java producer needs ssl support as a client.
KAFKA-1690. new java producer needs ssl support as a client. SSLFactory tests.
KAFKA-1690. new java producer needs ssl support as a client. Added PrincipalBuilder.
KAFKA-1690. new java producer needs ssl support as a client. Addressing reviews.
KAFKA-1690. new java producer needs ssl support as a client. Addressing reviews.
KAFKA-1690. new java producer needs ssl support as a client. Addressing reviews.
KAFKA-1690. new java producer needs ssl support as a client. Fixed minor issues with the patch.
KAFKA-1690. new java producer needs ssl support as a client. Fixed minor issues with the patch.
KAFKA-1690. new java producer needs ssl support as a client.
KAFKA-1690. new java producer needs ssl support as a client.
Merge remote-tracking branch 'refs/remotes/origin/trunk' into KAFKA-1690-V1
KAFKA-1690. Broker side ssl changes.
KAFKA-1684. SSL for socketServer.
KAFKA-1690. Added SSLProducerSendTest and fixes to get right port for SSL.
Merge branch 'trunk' into KAFKA-1690-V1
KAFKA-1690. Post merge fixes.
KAFKA-1690. Added SSLProducerSendTest.
KAFKA-1690. Minor fixes based on patch review comments.
Merge commit
KAFKA-1690. Added SSL Consumer Test.
KAFKA-1690. SSL Support.
KAFKA-1690. Addressing reviews.
Merge branch 'trunk' into KAFKA-1690-V1
Merge branch 'trunk' into KAFKA-1690-V1
KAFKA-1690. Addressing reviews. Removed interestOps from SSLTransportLayer.
KAFKA-1690. Addressing reviews.

Diffs
---
build.gradle fb9084307ae41bbb62e32720ccd7b94f26e910c8
checkstyle/import-control.xml 19e0659ef9385433d9f94dee43cd70a52b18c9e5
clients/src/main/java/org/apache/kafka/clients/ClientUtils.java 0d68bf1e1e90fe9d5d4397ddf817b9a9af8d9f7a
clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java 2c421f42ed3fc5d61cf9c87a7eaa7bb23e26f63b
clients/src/main/java/org/apache/kafka/clients/NetworkClient.java 48fe7961e2215372d8033ece4af739ea06c6457b
clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java 70377ae2fa46deb381139d28590ce6d4115e1adc
clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java bea3d737c51be77d5b5293cdd944d33b905422ba
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 03b8dd23df63a8d8a117f02eabcce4a2d48c44f7
clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java aa264202f2724907924985a5ecbe74afc4c6c04b
clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java bae528d31516679bed88ee61b408f209f185a8cc
clients/src/main/java/org/apache/kafka/common/config/SSLConfigs.java PRE-CREATION
clients/src/main/java/org/apache/kafka/common/network/Authenticator.java PRE-CREATION
clients/src/main/java/org/apache/kafka/common/network/ByteBufferSend.java df0e6d5105ca97b7e1cb4d334ffb7b443506bd0b
clients/src/main/java/org/apache/kafka/common/network/ChannelBuilder.java PRE-CREATION
clients/src/main/java/org/apache/kafka/common/network/DefaultAuthenticator.java PRE-CREATION
clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java PRE-CREATION
clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java 3ca0098b8ec8cfdf81158465b2d40afc47eb6f80
clients/src/main/java/org/apache/kafka/common/network/PlainTextChannelBuilder.java PRE-CREATION
clients/src/main/java/org/apache/kafka/common/network/PlainTextTransportLayer.java PRE-CREATION
clients/src/main/java/org/apache/kafka/common/network/SSLChannelBuilder.java PRE-CREATION
clients/src/main/java/org/apache/kafka/common/network/SSLFactory.java PRE-CREATION
clients/src/main/java/org/apache/kafka/common/network/SSLTransportLayer.java PRE-CREATION
Re: Review Request 33620: Patch for KAFKA-1690
On July 22, 2015, 3 p.m., Jun Rao wrote:

clients/src/main/java/org/apache/kafka/common/network/SSLTransportLayer.java, lines 231-234
https://reviews.apache.org/r/33620/diff/12/?file=1017027#file1017027line231

Still some questions on this.
1. When handshakeStatus is not NEED_UNWRAP (e.g. FINISHED), we could have flushed all the bytes. In this case, we should turn off the write interestOps in the socket key, right?
2. When handshakeStatus is NEED_UNWRAP and write is true, we will move on to the NEED_UNWRAP case. However, in this case, there may still be unflushed bytes in netWriteBuffer.
3. When handshakeStatus transitions to FINISHED, we return to the callers. Doesn't that delay the completion of the handshake, since this key may no longer be selected?

1. handshakeStatus cannot go from NEED_WRAP to FINISHED; it can only go to NEED_UNWRAP. Either the client or the server needs to get into NEED_UNWRAP before it can reach FINISHED status.
2. This is not true. When the handshake status is NEED_UNWRAP, we check whether flush() returned true; if it didn't, that means there are unflushed bytes in netWriteBuffer, so we set the interestOps to WRITE and break.
3. I am not sure I follow. Once the handshake is finished, the read bit is still on. Either the client starts sending data and sets the write bit at that point (for example, a metadata update request), or the server starts sending responses to the client. The handshake happens when the connection is established; in practice, once the client is done with the handshake it starts sending requests, whether it is a producer or a consumer.

- Sriharsha

---
This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/33620/#review92314
---

On July 20, 2015, 7 p.m., Sriharsha Chintalapani wrote:
(Updated July 20, 2015, 7 p.m.)

Review request for kafka.
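Sriharsha's point 2 can be sketched as a small model of the NEED_WRAP branch. This is a hypothetical illustration, not code from the patch; the method name `interestOpsAfterWrap` and its inputs (the post-wrap handshake status and whether flush() drained netWriteBuffer) are made up for the example:

```java
import java.nio.channels.SelectionKey;
import javax.net.ssl.SSLEngineResult.HandshakeStatus;

public class HandshakeOps {
    // After a wrap(), decide which interestOps to register. If flush()
    // drained netWriteBuffer and the engine now wants to unwrap, we wait
    // for the peer's reply (READ). Otherwise unflushed bytes remain (or
    // more wrapping is needed), so we register for WRITE and break.
    static int interestOpsAfterWrap(HandshakeStatus status, boolean flushed) {
        if (status == HandshakeStatus.NEED_UNWRAP && flushed) {
            return SelectionKey.OP_READ;
        }
        return SelectionKey.OP_WRITE;
    }
}
```

This only models the decision Sriharsha describes; the real SSLTransportLayer drives an actual SSLEngine through these states.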
[jira] [Commented] (KAFKA-2350) Add KafkaConsumer pause capability
[ https://issues.apache.org/jira/browse/KAFKA-2350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14637126#comment-14637126 ]

Jay Kreps commented on KAFKA-2350:
--

[~becket_qin] Cool, we're on the same page; that was how I interpreted what you said. There is definitely a sense in which this is more elegant, but there is a little complexity, since you need to keep a list of paused partitions and populate it when unsubscribe(partition) is called if that topic is subscribed to.

Add KafkaConsumer pause capability
--
Key: KAFKA-2350
URL: https://issues.apache.org/jira/browse/KAFKA-2350
Project: Kafka
Issue Type: Improvement
Reporter: Jason Gustafson
Assignee: Jason Gustafson

There are some use cases in stream processing where it is helpful to be able to pause consumption of a topic. For example, when joining two topics, you may need to delay processing of one topic while you wait for the consumer of the other topic to catch up. The new consumer currently doesn't provide a nice way to do this. If you skip poll() or if you unsubscribe, then a rebalance will be triggered and your partitions will be reassigned.

One way to achieve this would be to add two new methods to KafkaConsumer:

{code}
void pause(String... topics);
void unpause(String... topics);
{code}

When a topic is paused, a call to KafkaConsumer.poll will not initiate any new fetches for that topic. After it is unpaused, fetches will begin again.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
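The proposed semantics amount to tracking a paused set that poll() consults before initiating fetches. The following is a simplified, hypothetical model of that bookkeeping (the `PauseTracker` class and `fetchable` method are invented for illustration; they are not the eventual consumer implementation):

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Hypothetical model of the proposed pause()/unpause() semantics:
// poll() would skip initiating fetches for any paused topic.
public class PauseTracker {
    private final Set<String> paused = new HashSet<>();

    public void pause(String... topics) {
        Collections.addAll(paused, topics);
    }

    public void unpause(String... topics) {
        for (String topic : topics)
            paused.remove(topic);
    }

    // Whether poll() should initiate a new fetch for this topic.
    public boolean fetchable(String topic) {
        return !paused.contains(topic);
    }
}
```

Because the subscription itself never changes, no rebalance is triggered while a topic is paused, which is the whole point of the proposal.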
Re: Review Request 33620: Patch for KAFKA-1690
On July 22, 2015, 3 p.m., Jun Rao wrote:

clients/src/main/java/org/apache/kafka/common/network/SSLTransportLayer.java, lines 361-374
https://reviews.apache.org/r/33620/diff/12/?file=1017027#file1017027line361

Hmm, if status is ok and handshakeStatus is NEED_UNWRAP, could we get into an infinite loop here?

Sorry, I thought I moved the socketChannel read inside the loop. I missed it.

On July 22, 2015, 3 p.m., Jun Rao wrote:

clients/src/main/java/org/apache/kafka/common/network/SSLTransportLayer.java, lines 394-397
https://reviews.apache.org/r/33620/diff/12/?file=1017027#file1017027line394

We talked about that before. It's possible that we read all the bytes for multiple receives into the appReadBuffer. The Selector only reads off one receive at a time. If there are no more bytes from the incoming network, this key may not be selected to read off the next receive in the appReadBuffer. How do we resolve that issue?

Yes, this is an issue, which I am trying to address by adding stagedReceives on the Selector side: read as many available receives as possible in a single poll, push them into a stagedReceives queue, and pop one out during that poll to add to completedReceives. In later polls, if the selectionKey is not selected, is not muted, and there is a stagedReceive waiting, we poll that and add it to completedReceives. But this logic is not working on the server side, as I get an IllegalStateException from selector.send. I am still looking into it and will update the patch once I have a solution. Please do let me know if the above description sounds right to you.

- Sriharsha

---
This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/33620/#review92314
---

On July 20, 2015, 7 p.m., Sriharsha Chintalapani wrote:
(Updated July 20, 2015, 7 p.m.)

Review request for kafka.
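The stagedReceives idea Sriharsha describes can be sketched as follows. This is a deliberately simplified, hypothetical model (the real Selector tracks a queue per channel, and the class and method names here are invented): drain every receive that has already been decrypted into appReadBuffer, but hand back only one per poll, whether or not the key was selected.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical sketch of the stagedReceives queue for one channel.
public class StagedReceives<R> {
    private final Deque<R> staged = new ArrayDeque<>();
    private final List<R> completed = new ArrayList<>();

    // Called when the key is selected: stage every receive that is
    // already buffered, even though the socket may yield no more bytes.
    public void stageAll(List<R> receives) {
        staged.addAll(receives);
    }

    // Called once per poll, selected or not: promote one staged receive
    // to completedReceives unless the channel is muted.
    public void maybeComplete(boolean muted) {
        if (!muted && !staged.isEmpty())
            completed.add(staged.poll());
    }

    public List<R> completedReceives() {
        return completed;
    }
}
```

The key property is that a receive buffered during an earlier poll is still delivered on a later poll even if the key never fires again, which is exactly the gap Jun points out.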
Re: Review Request 36681: Patch for KAFKA-2275
---
This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/36681/#review92604
---

Hey Ashish, this looks pretty good to me. Just some minor comments.

clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java (line 298)
https://reviews.apache.org/r/36681/#comment146810
Any reason not to put this method in Fetcher instead of here? I don't have a strong feeling, but it was kind of nice keeping ConsumerNetworkClient largely free of application logic. Also, it might be nice to have a unit test.

clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java (lines 314 - 315)
https://reviews.apache.org/r/36681/#comment146811
I think I asked this before, but is there any harm in returning this topic to the user? I ask because we don't actually prevent them from calling partitionsFor with the same topic.

clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java (line 320)
https://reviews.apache.org/r/36681/#comment146812
I think the convention is to leave off the braces on one-line if statements.

- Jason Gustafson

On July 22, 2015, 6:32 a.m., Ashish Singh wrote:
(Updated July 22, 2015, 6:32 a.m.)

Review request for kafka.
Bugs: KAFKA-2275
    https://issues.apache.org/jira/browse/KAFKA-2275

Repository: kafka

Description
---
KAFKA-2275: Add a ListTopics() API to the new consumer

Diffs
---
clients/src/main/java/org/apache/kafka/clients/ClientRequest.java ed4c0d98596cc294757f35df8c8cbc8e36ff42de
clients/src/main/java/org/apache/kafka/clients/NetworkClient.java 48fe7961e2215372d8033ece4af739ea06c6457b
clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java 252b759c0801f392e3526b0f31503b4b8fbf1c8a
clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java bea3d737c51be77d5b5293cdd944d33b905422ba
clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java c14eed1e95f2e682a235159a366046f00d1d90d6
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java 9517d9d0cd480d5ba1d12f1fde7963e60528d2f8
core/src/test/scala/integration/kafka/api/ConsumerTest.scala 3eb5f95731a3f06f662b334ab2b3d0ad7fa9e1ca

Diff: https://reviews.apache.org/r/36681/diff/

Testing
---

Thanks,
Ashish Singh
Re: Review Request 33620: Patch for KAFKA-1690
---
This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/33620/#review92405
---

clients/src/main/java/org/apache/kafka/common/network/ByteBufferSend.java (line 61)
https://reviews.apache.org/r/33620/#comment146575
Is there a JIRA ticket for removing `BlockingChannel`? If so, it may be worth mentioning the id here so that it's easy to find when the time comes for the removal to be done.

clients/src/main/java/org/apache/kafka/common/network/SSLFactory.java (line 199)
https://reviews.apache.org/r/33620/#comment146804
You can use try-with-resources now that we require Java 7.

- Ismael Juma

On July 20, 2015, 7 p.m., Sriharsha Chintalapani wrote:
(Updated July 20, 2015, 7 p.m.)

Review request for kafka.

Bugs: KAFKA-1690
    https://issues.apache.org/jira/browse/KAFKA-1690

Repository: kafka
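Ismael's suggestion refers to the Java 7 try-with-resources construct. A minimal illustration follows; the class and method names, and the simplified error handling, are made up for the example and are not from the SSLFactory code under review:

```java
import java.io.IOException;
import java.io.InputStream;

public class TryWithResources {
    // The resource declared in the try header is closed automatically
    // when the block exits, even if read() throws, replacing the
    // manual try/finally + close() pattern.
    static int firstByte(InputStream in) {
        try (InputStream managed = in) {
            return managed.read();
        } catch (IOException e) {
            return -1; // simplified handling for the example
        }
    }
}
```

In SSLFactory's case the resource would be the keystore input stream; the benefit is that the close happens on every exit path without a finally block.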
Re: Review Request 33620: Patch for KAFKA-1690
On July 22, 2015, 3 p.m., Jun Rao wrote:

clients/src/main/java/org/apache/kafka/common/network/SSLTransportLayer.java, line 411
https://reviews.apache.org/r/33620/diff/12/?file=1017027#file1017027line411

Is renegotiation actually supported? It seems that renegotiation can happen in the middle of a regular send/receive.

Yes, it can happen in the middle of a regular send/receive. But it needs to be initiated by the server, and we don't have any criteria for doing so, i.e. we are not checking for certificate expiration to trigger renegotiation. As we discussed, we are going to skip this for the initial version.

- Sriharsha

---
This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/33620/#review92314
---

On July 20, 2015, 7 p.m., Sriharsha Chintalapani wrote:
(Updated July 20, 2015, 7 p.m.)

Review request for kafka.

Bugs: KAFKA-1690
    https://issues.apache.org/jira/browse/KAFKA-1690

Repository: kafka
Re: Review Request 36664: Patch for KAFKA-2353
On July 21, 2015, 11:15 p.m., Gwen Shapira wrote:

core/src/main/scala/kafka/network/SocketServer.scala, line 400
https://reviews.apache.org/r/36664/diff/1/?file=1018238#file1018238line400

So in case of an unexpected exception, we log an error and keep running? Isn't it better to kill the processor, since we don't know the state of the system? If the acceptor keeps placing messages in the queue for a dead processor, isn't that a separate issue?

Jiangjie Qin wrote:
This part I'm not quite sure about. I am not very experienced with error handling in such cases, so please correct me if I missed something. Here is what I thought. The way it currently works is that the acceptor will:
1. accept a new connection request and create a new socket channel
2. choose a processor and put the socket channel into the processor's new-connection queue
The processor just takes the socket channels from the queue and registers them with the selector. If the processor hits an uncaught exception while running, there are several possibilities.
Case 1: The exception came from one socket channel.
Case 2: The exception was associated with a bad request.
In case 1, ideally we should just disconnect that socket channel without affecting the other socket channels. In case 2, I think we should log the error and skip the message, assuming the client will retry sending data if no response is received within a given period of time. I am not sure letting the processor exit is a good idea, because a badly behaving client could then take down the entire cluster; it might kill the processors one by one. Compared with that, I'm leaning towards keeping the processor running and serving other normal TCP connections if possible, while logging the error so a monitoring system can detect it and decide whether human intervention is needed. Also, I don't know how to prevent the thread from exiting here without catching all the throwables. According to this blog, http://www.tzavellas.com/techblog/2010/09/20/catching-throwable-in-scala/, I guess I can rethrow all the ControlThrowables but intercept the rest?

Guozhang Wang wrote:
I would also prefer not to close the Processor thread upon exceptions, mainly to avoid one bad client killing a shared Kafka cluster. In the past we have seen such issues, like a DDoS of MetadataRequests killing the cluster so that all the other clients get affected, and the quota work is aimed at preventing that. Since Processor threads are shared (8 by default on a broker), they should not be closed because of a single socket / bad client request.

On July 21, 2015, 11:15 p.m., Gwen Shapira wrote:

core/src/main/scala/kafka/network/SocketServer.scala, line 465
https://reviews.apache.org/r/36664/diff/1/?file=1018238#file1018238line465

Turns out that catching Throwable is a really bad idea: https://www.sumologic.com/2014/05/05/why-you-should-never-catch-throwable-in-scala/

Jiangjie Qin wrote:
Ah... I didn't know that before. I explicitly listed the exceptions.

Guozhang Wang wrote:
Searching for ": Throwable" gives me 180+ cases in the code base :P Though many of them are in unit tests (which, arguably, may be OK), there are still a lot in the core package. I agree that we should avoid catching Throwable whenever possible, which will also force developers to think about the possible checked exceptions in the calling trace.

- Guozhang

---
This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/36664/#review92496
---

On July 22, 2015, 5:02 a.m., Jiangjie Qin wrote:
(Updated July 22, 2015, 5:02 a.m.)

Review request for kafka.
Bugs: KAFKA-2353
    https://issues.apache.org/jira/browse/KAFKA-2353

Repository: kafka

Description
---
Addressed Gwen's comments

Diffs
---
core/src/main/scala/kafka/network/SocketServer.scala 91319fa010b140cca632e5fa8050509bd2295fc9

Diff: https://reviews.apache.org/r/36664/diff/

Testing
---

Thanks,
Jiangjie Qin
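The policy the reviewers converge on (catch per-request failures and keep serving, but never swallow fatal errors) might look like the following. This is a hypothetical Java sketch with invented names; the actual SocketServer is Scala, where rethrowing ControlThrowable and catching via NonFatal play the role of the Error/Exception split shown here:

```java
// Hypothetical sketch of a processor loop that survives per-request
// failures (cases 1 and 2 above) without swallowing fatal errors.
public class ProcessorLoop {
    static int processed = 0;

    static void runOnce(Runnable work) {
        try {
            work.run();
            processed++;
        } catch (RuntimeException e) {
            // A bad channel or bad request: log and keep serving the
            // other connections on this shared processor thread.
            System.err.println("Processor error, continuing: " + e);
        }
        // Errors (OutOfMemoryError etc.) are not caught and will
        // propagate, killing the thread, since the process state can
        // no longer be trusted.
    }
}
```

Gwen's case #3 (corrupted Selector state) would sit between these extremes: it is not fatal to the JVM, but it argues for closing this particular processor rather than continuing, which is why identifying Selector-level failures separately matters.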
Re: Review Request 36664: Patch for KAFKA-2353
On July 21, 2015, 11:15 p.m., Gwen Shapira wrote:

  core/src/main/scala/kafka/network/SocketServer.scala, line 465
  https://reviews.apache.org/r/36664/diff/1/?file=1018238#file1018238line465

  Turns out that catching Throwable is a really bad idea: https://www.sumologic.com/2014/05/05/why-you-should-never-catch-throwable-in-scala/

Jiangjie Qin wrote:

  Ah... I didn't know that before. I explicitly listed the exceptions.

Guozhang Wang wrote:

  Searching for ": Throwable" gives me 180+ cases in the code base :P Though many of them are from unit tests (which is arguably OK), there are still a lot in the core package. I agree that we should avoid catching Throwable whenever possible, which will also help force developers to think about the possible checked exceptions in the calling trace.

I know :( I'm not sure going over and converting everything is worth the effort, although it could be a nice newbie jira.

On July 21, 2015, 11:15 p.m., Gwen Shapira wrote:

  core/src/main/scala/kafka/network/SocketServer.scala, line 400
  https://reviews.apache.org/r/36664/diff/1/?file=1018238#file1018238line400

  So in case of an unexpected exception, we log an error and keep running? Isn't it better to kill the processor, since we don't know the state of the system? If the acceptor keeps placing messages in the queue for a dead processor, isn't that a separate issue?

Jiangjie Qin wrote:

  This part I'm not quite sure about. I am not very experienced with error handling in such cases, so please correct me if I missed something. Here is what I thought. The way it currently works is that the acceptor will:

  1. accept a new connection request and create a new socket channel
  2. choose a processor and put the socket channel into the processor's new connection queue

  The processor will just take the socket channels from the queue and register them with the selector. If the processor hits an uncaught exception, there are several possibilities.

  Case 1: The exception came from one socket channel.
  Case 2: The exception was associated with a bad request.

  In case 1, ideally we should just disconnect that socket channel without affecting the other socket channels. In case 2, I think we should log the error and skip the message, assuming the client will retry sending data if no response is received for a given period of time. I am not sure letting the processor exit is a good idea, because that would let a badly behaving client screw the entire cluster - it might take out the processors one by one. Compared with that, I'm leaning towards keeping the processor running and serving other normal TCP connections if possible, but logging the error so a monitoring system can detect it and see if human intervention is needed. Also, I don't know how to prevent the thread from exiting here without catching all the throwables. According to this blog http://www.tzavellas.com/techblog/2010/09/20/catching-throwable-in-scala/ I guess I can rethrow all the ControlThrowables, but intercept the rest?

Guozhang Wang wrote:

  I would also prefer not to close the Processor thread upon exceptions, mainly to avoid one bad client killing a shared Kafka cluster. In the past we have seen issues like a DDoS of MetadataRequests killing the cluster with all other clients affected, and the quota work is aimed at preventing that. Since Processor threads are shared (8 by default on a broker), a single socket / bad client request should not be able to close one.

I like your thinking around cases #1 and #2. I think this should go in a code comment somewhere, so that when people improve / extend SocketServer they will keep this logic in mind. Maybe even specify in individual catch clauses whether they are handling errors at the request level or the channel level. My concern is with a possible case #3: each processor has an o.a.k.common.network.Selector. I'm concerned about the possibility of something going wrong in the state of the selector itself, which would be an issue for all channels. For example, a failure to register could be an issue with the channel.register call, but also perhaps an issue with keys.put (just an example - I'm not sure something can actually break the keys table). I'd like to be able to identify cases where the Selector state may have gone wrong and close the processor in that case. Does that make sense? Or am I being too paranoid?

- Gwen

--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/36664/#review92496 ---

On July 22, 2015, 5:02 a.m., Jiangjie Qin wrote: --- This is an automatically generated e-mail. To reply, visit:
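The policy being discussed (tolerate connection-level and request-level failures so one bad client cannot kill a shared processor, but let anything fatal propagate and terminate the thread) can be illustrated with a small, self-contained loop. This is a hypothetical sketch, not Kafka's actual SocketServer code; the class and method names are invented for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical model of the processor error-handling policy: each task
// stands in for handling one connection/request. Request-level failures
// (RuntimeException) are logged and skipped so other connections keep
// being served; Error (and, in Scala, ControlThrowable) is deliberately
// NOT caught, so state-corrupting failures still kill the processor.
class ProcessorLoop {
    static List<String> run(List<Supplier<String>> tasks) {
        List<String> results = new ArrayList<>();
        for (Supplier<String> task : tasks) {
            try {
                results.add(task.get());
            } catch (RuntimeException e) {
                // Case 1/2: bad channel or bad request - drop it, keep serving.
                results.add("skipped: " + e.getMessage());
            }
            // Note: java.lang.Error intentionally propagates out of this loop.
        }
        return results;
    }
}
```

One failing task does not prevent the tasks after it from being processed, which mirrors the "keep the processor running" position in the thread.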
[jira] [Commented] (KAFKA-2350) Add KafkaConsumer pause capability
[ https://issues.apache.org/jira/browse/KAFKA-2350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14637219#comment-14637219 ]

Yasuhiro Matsuda commented on KAFKA-2350:
-
I think overloading subscribe/unsubscribe is very confusing. Subscribe/unsubscribe and pause/unpause are two very different behaviors, and overloading the same method names does not really simplify the API. I want pause/unpause to be pure flow control. It shouldn't be mixed up with subscription.

Add KafkaConsumer pause capability
--
Key: KAFKA-2350
URL: https://issues.apache.org/jira/browse/KAFKA-2350
Project: Kafka
Issue Type: Improvement
Reporter: Jason Gustafson
Assignee: Jason Gustafson

There are some use cases in stream processing where it is helpful to be able to pause consumption of a topic. For example, when joining two topics, you may need to delay processing of one topic while you wait for the consumer of the other topic to catch up. The new consumer currently doesn't provide a nice way to do this. If you skip poll() or if you unsubscribe, then a rebalance will be triggered and your partitions will be reassigned. One way to achieve this would be to add two new methods to KafkaConsumer:

{code}
void pause(String... topics);
void unpause(String... topics);
{code}

When a topic is paused, a call to KafkaConsumer.poll will not initiate any new fetches for that topic. After it is unpaused, fetches will begin again.

-- This message was sent by Atlassian JIRA (v6.3.4#6332)
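A minimal model of the proposed semantics may make the "pure flow control" distinction concrete. This is a hypothetical sketch of the proposal, not the KafkaConsumer API (which has no such methods at this point): pausing leaves the topic subscribed, so no rebalance is triggered, but it is excluded from fetching.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Hypothetical model of the proposed pause/unpause flow control:
// a paused topic remains subscribed (group membership is untouched),
// but poll() would not initiate new fetches for it.
class PausableSubscriptions {
    private final Set<String> subscribed = new HashSet<>();
    private final Set<String> paused = new HashSet<>();

    void subscribe(String topic) { subscribed.add(topic); }
    void pause(String... topics) { paused.addAll(Arrays.asList(topics)); }
    void unpause(String... topics) { paused.removeAll(Arrays.asList(topics)); }

    // The topics a poll() would actually fetch from.
    Set<String> fetchable() {
        Set<String> result = new HashSet<>(subscribed);
        result.removeAll(paused);
        return result;
    }
}
```

Because the subscription set never changes on pause, this cleanly separates flow control from group membership, which is the property the comment above is arguing for.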
[jira] [Commented] (KAFKA-2350) Add KafkaConsumer pause capability
[ https://issues.apache.org/jira/browse/KAFKA-2350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14637317#comment-14637317 ]

Jay Kreps commented on KAFKA-2350:
--
[~hachikuji] Yeah, I agree that would help this case. The nice thing about that proposal is it would make the group management explicit, which could be nice. I wonder if that might not add more things that can go wrong in the common case, though. I.e., right now, in the common case of just subscribing to a topic and letting the group management figure out the assignment, it is kind of hard to mess that up. All the cases where you either subscribe to individual partitions or pause partitions are kind of niche uses, so maybe it is less important to optimize for those cases?
[jira] [Comment Edited] (KAFKA-2350) Add KafkaConsumer pause capability
[ https://issues.apache.org/jira/browse/KAFKA-2350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14637187#comment-14637187 ]

Jason Gustafson edited comment on KAFKA-2350 at 7/22/15 5:01 PM:
-
[~becket_qin] [~jkreps] Currently automatic assignment is inferred based on which subscribe methods are invoked (e.g. if you subscribe to a topic, then we assume you want automatic assignment). I wonder if it might help to make that instead an explicit configuration parameter? Then that might free us a little bit to use subscribe/unsubscribe in the way [~becket_qin] is proposing since we don't have to guess whether the user is intending to actually subscribe to a partition or just to unpause it.

was (Author: hachikuji): [~becket_qin] [~jkreps] Currently automatic assignment is inferred based on which subscribe methods are invoked (e.g. if you subscribe to a topic, then we assume you want automatic assignment). I wonder if it might help to make that instead an explicit configuration parameter? Then that might free us a little bit to use subscribe/unsubscribe in the way [~becket_qin] is proposing since we don't have to guess whether the user is intending to actually subscribe to a partition or just to pause it.
Re: Review Request 36664: Patch for KAFKA-2353
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/36664/#review92607 --- LGTM overall. Could you address Ismael's comments as well before check-in? - Guozhang Wang On July 22, 2015, 5:02 a.m., Jiangjie Qin wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/36664/ --- (Updated July 22, 2015, 5:02 a.m.) Review request for kafka. Bugs: KAFKA-2353 https://issues.apache.org/jira/browse/KAFKA-2353 Repository: kafka Description --- Addressed Gwen's comments Diffs - core/src/main/scala/kafka/network/SocketServer.scala 91319fa010b140cca632e5fa8050509bd2295fc9 Diff: https://reviews.apache.org/r/36664/diff/ Testing --- Thanks, Jiangjie Qin
[jira] [Commented] (KAFKA-2352) Possible memory leak in MirrorMaker and/or new Producer
[ https://issues.apache.org/jira/browse/KAFKA-2352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14637208#comment-14637208 ]

Jiangjie Qin commented on KAFKA-2352:
-
Can you verify whether the messages you passed to the producer have been sent or not? In mirror maker, the default configuration is set to avoid data loss, so if the producer cannot send a message it will retry indefinitely. In that case, you might exhaust the producer's buffer if you keep appending messages to the producer.

Possible memory leak in MirrorMaker and/or new Producer
---
Key: KAFKA-2352
URL: https://issues.apache.org/jira/browse/KAFKA-2352
Project: Kafka
Issue Type: Bug
Affects Versions: 0.8.2.1
Reporter: Kostya Golikov
Attachments: consumer.conf, output.log, producer.conf

I've been playing around with Mirror Maker (version from trunk, dated July 7th) and got a few problems, the most noticeable of which is that MirrorMaker exhausts its memory pool, even though its size is set to the relatively huge value of 132 MB, and individual messages are around 2 KB. Batch size is set to just 2 messages (full configs are attached).

{code}
[2015-07-21 15:19:52,915] FATAL [mirrormaker-thread-1] Mirror maker thread failure due to (kafka.tools.MirrorMaker$MirrorMakerThread)
org.apache.kafka.clients.producer.BufferExhaustedException: You have exhausted the 134217728 bytes of memory you configured for the client and the client is configured to error rather than block when memory is exhausted.
	at org.apache.kafka.clients.producer.internals.BufferPool.allocate(BufferPool.java:124)
	at org.apache.kafka.clients.producer.internals.RecordAccumulator.append(RecordAccumulator.java:172)
	at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:388)
	at kafka.tools.MirrorMaker$MirrorMakerProducer.send(MirrorMaker.scala:380)
	at kafka.tools.MirrorMaker$MirrorMakerThread$$anonfun$run$3.apply(MirrorMaker.scala:311)
	at kafka.tools.MirrorMaker$MirrorMakerThread$$anonfun$run$3.apply(MirrorMaker.scala:311)
	at scala.collection.Iterator$class.foreach(Iterator.scala:727)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
	at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
	at kafka.tools.MirrorMaker$MirrorMakerThread.run(MirrorMaker.scala:311)
{code}

Am I doing something wrong? Any help in further diagnosing this problem would be handy.

-- This message was sent by Atlassian JIRA (v6.3.4#6332)
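The exception message says the client "is configured to error rather than block when memory is exhausted." If backpressure is acceptable, one option is to let the producer block when the buffer fills instead of throwing. This is a hedged sketch assuming the 0.8.2-era producer property names; verify against the attached producer.conf before relying on it.

{code}
# Hypothetical producer.properties tweaks (0.8.2-era setting names).
# Blocking on a full buffer applies backpressure to the mirror-maker
# threads instead of raising BufferExhaustedException.
buffer.memory=134217728
block.on.buffer.full=true
{code}

Note this only masks the symptom if the real problem is that sends never succeed and retries pile up, as suggested in the comment above.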
[jira] [Commented] (KAFKA-2350) Add KafkaConsumer pause capability
[ https://issues.apache.org/jira/browse/KAFKA-2350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14637338#comment-14637338 ]

Gwen Shapira commented on KAFKA-2350:
-
Agree with [~yasuhiro.matsuda]. The notion of subscribed vs. assigned is a bit challenging to grasp as it is ([~guozhang] demonstrated that nicely); adding flow-control mechanics into it will be messy and difficult for users to get right. I think we all hope that in most cases users will subscribe and unsubscribe to entire topics and let the coordinator handle the details (thereby reducing user errors, mailing-list cries for help, etc.). Adding APIs that will cause the opposite to happen is a step in the wrong direction.
Re: [DISCUSS] KIP-27 - Conditional Publish
Tangent: I think we should complete the move of Produce / Fetch RPC to the client libraries before we add more revisions to this protocol. On Wed, Jul 22, 2015 at 11:02 AM, Jiangjie Qin j...@linkedin.com.invalid wrote: I missed yesterday's KIP hangout. I'm currently working on another KIP for enriched metadata of messages. Guozhang has already created a wiki page before ( https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Enriched+Message+Metadata). We plan to fill in the relative offset in the offset field of the batch sent by the producer to avoid broker-side re-compression. The message offset would become batch base offset + relative offset. I guess maybe the expected offset in KIP-27 can only be set for the base offset? Would that affect certain use cases? For Jun's comments, I am not sure I completely get it. I think the producer only sends one batch per partition in a request, so either that batch is appended or not. Why would a batch be partially committed? Thanks, Jiangjie (Becket) Qin On Tue, Jul 21, 2015 at 10:42 AM, Ben Kirwin b...@kirw.in wrote: That's a fair point. I've added some imagined job logic to the KIP, so we can make sure the proposal stays in sync with the usages we're discussing. (The logic is just a quick sketch for now -- I expect I'll need to elaborate it as we get into more detail, or to address other concerns...) On Tue, Jul 21, 2015 at 11:45 AM, Jun Rao j...@confluent.io wrote: For 1, yes, when there is a transient leader change, it's guaranteed that a prefix of the messages in a request will be committed. However, it seems that the client needs to know what subset of messages are committed in order to resume the sending. Then the question is how. As Flavio indicated, for the use cases that you listed, it would be useful to figure out the exact logic by using this feature.
For example, in the partition K/V store example, when we fail over to a new writer to the commit log, the zombie writer can publish new messages to the log after the new writer takes over, but before it publishes any message. We probably need to outline how this case can be handled properly. Thanks, Jun On Mon, Jul 20, 2015 at 12:05 PM, Ben Kirwin b...@kirw.in wrote: Hi Jun, Thanks for the close reading! Responses inline. Thanks for the write-up. The single producer use case you mentioned makes sense. It would be useful to include that in the KIP wiki. Great -- I'll make sure that the wiki is clear about this. 1. What happens when the leader of the partition changes in the middle of a produce request? In this case, the producer client is not sure whether the request succeeds or not. If there is only a single message in the request, the producer can just resend the request. If it sees an OffsetMismatch error, it knows that the previous send actually succeeded and can proceed with the next write. This is nice since it not only allows the producer to proceed during transient failures in the broker, it also avoids duplicates during producer resend. One caveat is when there are multiple messages in the same partition in a produce request. The issue is that in our current replication protocol, it's possible for some, but not all messages in the request to be committed. This makes resend a bit harder to deal with since on receiving an OffsetMismatch error, it's not clear which messages have been committed. One possibility is to expect that compression is enabled, in which case multiple messages are compressed into a single message. I was thinking that another possibility is for the broker to return the current high watermark when sending an OffsetMismatch error. Based on this info, the producer can resend the subset of messages that have not been committed. However, this may not work in a compacted topic since there can be holes in the offset. 
This is an excellent question. It's my understanding that at least a *prefix* of messages will be committed (right?) -- which seems to be enough for many cases. I'll try and come up with a more concrete answer here. 2. Is this feature only intended to be used with ack = all? The client doesn't get the offset with ack = 0. With ack = 1, it's possible for a previously acked message to be lost during leader transition, which will make the client logic more complicated. It's true that acks = 0 doesn't seem to be particularly useful; in all the cases I've come across, the client eventually wants to know about the mismatch error. However, it seems like there are some cases where acks = 1 would be fine -- e.g. in a bulk load of a fixed dataset, losing messages during a leader transition just means you need to rewind / restart the load, which is not especially catastrophic. For many other interesting cases, acks = all is probably preferable. 3. How does the producer client know
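The conditional publish being discussed reduces to a check-and-set on the log end offset. A toy model of the behavior (a sketch of the KIP-27 proposal's semantics, not broker code; the class and method names are invented):

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of KIP-27 conditional publish: an append carries the offset
// the producer expects to receive, and succeeds only if that equals the
// current log end offset. A stale expectation models OffsetMismatch.
class ConditionalLog {
    private final List<String> records = new ArrayList<>();

    // Returns the offset assigned, or -1 to model an OffsetMismatch error.
    synchronized long appendIfEndOffset(long expectedOffset, String record) {
        if (expectedOffset != records.size()) {
            return -1L; // another writer got there first
        }
        records.add(record);
        return records.size() - 1;
    }

    synchronized long endOffset() { return records.size(); }
}
```

In the failover scenario above, the zombie writer's append carries the pre-failover expected offset, so it fails once the new writer has published anything.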
Re: [VOTE] Switch to GitHub pull requests for new contributions
+1 (non binding) Can we have a wiki page for the procedure and let people verify the steps? After that we can update the Apache project page. On Tue, Jul 21, 2015 at 5:38 PM, Edward Ribeiro edward.ribe...@gmail.com wrote: +1 (non binding) On Tue, Jul 21, 2015 at 7:36 PM, Jay Kreps j...@confluent.io wrote: +1 -Jay On Tue, Jul 21, 2015 at 4:28 AM, Ismael Juma ism...@juma.me.uk wrote: Hi all, I would like to start a vote on switching to GitHub pull requests for new contributions. To be precise, the vote is on whether we should:

* Update the documentation to tell users to use pull requests instead of patches and Review Board (i.e. merge KAFKA-2321 and KAFKA-2349)
* Use pull requests for new contributions

In a previous discussion[1], everyone that participated was in favour. It's also worth reading the Contributing Code Changes wiki page[2] (if you haven't already) to understand the flow. A number of pull requests have been merged in the last few weeks to test this flow and I believe it's working well enough. As usual, there is always room for improvement and I expect us to tweak things as time goes on. The main downside of using GitHub pull requests is that we don't have write access to https://github.com/apache/kafka. That means that we rely on commit hooks to close integrated pull requests (the merge script takes care of formatting the message so that this happens) and the PR creator or Apache Infra to close pull requests that are not integrated. Regarding existing contributions, I think it's up to the contributor to decide whether they want to resubmit it as a pull request or not. I expect that there will be a transition period where the old and new way will co-exist. But that can be discussed separately. The vote will run for 72 hours. +1 (non-binding) from me. Best, Ismael [1] http://search-hadoop.com/m/uyzND1N6CDH1DUc82 [2] https://cwiki.apache.org/confluence/display/KAFKA/Contributing+Code+Changes
[jira] [Commented] (KAFKA-2350) Add KafkaConsumer pause capability
[ https://issues.apache.org/jira/browse/KAFKA-2350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14637179#comment-14637179 ]

Guozhang Wang commented on KAFKA-2350:
--
Currently there is already a function for retrieving the subscribed topic partitions today:

{code}
public Set<TopicPartition> subscriptions() {
    acquire();
    try {
        return Collections.unmodifiableSet(this.subscriptions.assignedPartitions());
    } finally {
        release();
    }
}
{code}

which will, for example, remove the partition and hence change the returned values if consumer.unsubscribe(partition) is called. I actually think [~becket_qin]'s approach will not cause much confusion regarding the APIs. More explicitly, assuming we add another function assignment() that returns the assigned partitions, the semantics of the other APIs would be:

{code}
consumer.subscribe(topic); // will not throw any exception, but will update the assignment as well as the subscription in the next poll.
consumer.unsubscribe(topic); // will throw an exception if the topic is not subscribed; otherwise will update the assignment and the subscription in the next poll.
consumer.assignment(); // returns the assigned partitions
consumer.subscriptions(); // returns the subscribed partitions, which are the same as the assigned partitions most of the time
consumer.subscribe(partition1); // will throw an exception if the partition is not in assignment(), saying it is not assigned to you
consumer.unsubscribe(partition2); // will throw an exception if the partition is not in subscriptions(), saying it is not subscribed by yourself
{code}

What I am more concerned about with this approach is the client implementation. Since it allows a client to be both using Kafka partition assignment and not during its life cycle, this could make the client state more complicated to manage.
For example:

{code}
consumer.subscribe(topic1); // using kafka for assignment, say we are assigned topic1-partition1 and topic1-partition2
consumer.poll();
consumer.subscribe(topic2-partition1); // subscribe to another partition explicitly, without letting the kafka coordinator be aware of it.
consumer.unsubscribe(topic1-partition1); // now the subscription is topic1-partition2 and topic2-partition1, where the first is from Kafka assignment and the second is from explicit subscription.
{code}
Re: [DISCUSS] KIP-27 - Conditional Publish
I missed yesterday's KIP hangout. I'm currently working on another KIP for enriched metadata of messages. Guozhang has already created a wiki page before ( https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Enriched+Message+Metadata). We plan to fill in the relative offset in the offset field of the batch sent by the producer to avoid broker-side re-compression. The message offset would become batch base offset + relative offset. I guess maybe the expected offset in KIP-27 can only be set for the base offset? Would that affect certain use cases? For Jun's comments, I am not sure I completely get it. I think the producer only sends one batch per partition in a request, so either that batch is appended or not. Why would a batch be partially committed? Thanks, Jiangjie (Becket) Qin On Tue, Jul 21, 2015 at 10:42 AM, Ben Kirwin b...@kirw.in wrote: That's a fair point. I've added some imagined job logic to the KIP, so we can make sure the proposal stays in sync with the usages we're discussing. (The logic is just a quick sketch for now -- I expect I'll need to elaborate it as we get into more detail, or to address other concerns...) On Tue, Jul 21, 2015 at 11:45 AM, Jun Rao j...@confluent.io wrote: For 1, yes, when there is a transient leader change, it's guaranteed that a prefix of the messages in a request will be committed. However, it seems that the client needs to know what subset of messages are committed in order to resume the sending. Then the question is how. As Flavio indicated, for the use cases that you listed, it would be useful to figure out the exact logic by using this feature. For example, in the partition K/V store example, when we fail over to a new writer to the commit log, the zombie writer can publish new messages to the log after the new writer takes over, but before it publishes any message. We probably need to outline how this case can be handled properly.
Thanks, Jun On Mon, Jul 20, 2015 at 12:05 PM, Ben Kirwin b...@kirw.in wrote: Hi Jun, Thanks for the close reading! Responses inline. Thanks for the write-up. The single producer use case you mentioned makes sense. It would be useful to include that in the KIP wiki. Great -- I'll make sure that the wiki is clear about this. 1. What happens when the leader of the partition changes in the middle of a produce request? In this case, the producer client is not sure whether the request succeeds or not. If there is only a single message in the request, the producer can just resend the request. If it sees an OffsetMismatch error, it knows that the previous send actually succeeded and can proceed with the next write. This is nice since it not only allows the producer to proceed during transient failures in the broker, it also avoids duplicates during producer resend. One caveat is when there are multiple messages in the same partition in a produce request. The issue is that in our current replication protocol, it's possible for some, but not all messages in the request to be committed. This makes resend a bit harder to deal with since on receiving an OffsetMismatch error, it's not clear which messages have been committed. One possibility is to expect that compression is enabled, in which case multiple messages are compressed into a single message. I was thinking that another possibility is for the broker to return the current high watermark when sending an OffsetMismatch error. Based on this info, the producer can resend the subset of messages that have not been committed. However, this may not work in a compacted topic since there can be holes in the offset. This is an excellent question. It's my understanding that at least a *prefix* of messages will be committed (right?) -- which seems to be enough for many cases. I'll try and come up with a more concrete answer here. 2. Is this feature only intended to be used with ack = all?
The client doesn't get the offset with ack = 0. With ack = 1, it's possible for a previously acked message to be lost during leader transition, which will make the client logic more complicated. It's true that acks = 0 doesn't seem to be particularly useful; in all the cases I've come across, the client eventually wants to know about the mismatch error. However, it seems like there are some cases where acks = 1 would be fine -- eg. in a bulk load of a fixed dataset, losing messages during a leader transition just means you need to rewind / restart the load, which is not especially catastrophic. For many other interesting cases, acks = all is probably preferable. 3. How does the producer client know the offset to send the first message? Do we need to expose an API in producer to get the current high watermark? You're right, it might be irritating to have to go through the consumer API just for this. There are some
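For reference, the offset arithmetic in the enriched-metadata proposal quoted above is just base-plus-relative. A sketch of the scheme (not actual broker code; the class name is invented):

```java
// Sketch of the relative-offset scheme: the producer fills in relative
// offsets within a batch, and the absolute message offset is recovered
// as base + relative, so the broker can assign the base offset without
// rewriting (and thus re-compressing) the batch.
class RelativeOffsets {
    static long absoluteOffset(long batchBaseOffset, int relativeOffset) {
        return batchBaseOffset + relativeOffset;
    }
}
```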
Re: Review Request 36664: Patch for KAFKA-2353
Since I've been dealing with the fallout of this particular problem all week, I'll add a few thoughts... On Wed, Jul 22, 2015 at 10:51 AM, Gwen Shapira gshap...@cloudera.com wrote: On July 21, 2015, 11:15 p.m., Gwen Shapira wrote: core/src/main/scala/kafka/network/SocketServer.scala, line 465 https://reviews.apache.org/r/36664/diff/1/?file=1018238#file1018238line465 Turns out that catching Throwable is a really bad idea: https://www.sumologic.com/2014/05/05/why-you-should-never-catch-throwable-in-scala/ Jiangjie Qin wrote: Ah... Didn't know that before. I explicitly listed the exceptions. Guozhang Wang wrote: Searching : Throwable gives me 180+ cases in code base :P Though many of them are from unit tests (which, arguably maybe OK) there are still a lot in the core package. I agree that we should avoid catching Throwable whenever possible, which will also help enforcing the developers to think about possible checked exceptions in the calling trace. I know :( I'm not sure if going over and converting everything is worth the effort. Although it can be a nice newbie jira. On July 21, 2015, 11:15 p.m., Gwen Shapira wrote: core/src/main/scala/kafka/network/SocketServer.scala, line 400 https://reviews.apache.org/r/36664/diff/1/?file=1018238#file1018238line400 So in case of unexpected exception, we log an error and keep running? Isn't it better to kill the processor, since we don't know what's the state of the system? If the acceptor keeps placing messages in the queue for a dead processor, isn't it a separate issue? Jiangjie Qin wrote: This part I'm not quite sure. I am not very experienced in the error handling in such case, so please correct me if I missed something. Here is what I thought. The way it currently works is that the acceptor will 1. accept new connection request and create new socket channel 2. 
choose a processor and put the socket channel into the processor's new connection queue The processor will just take the socket channels from the queue and register it to the selector. If the processor runs and get an uncaught exception, there are several possibilities. Case 1: The exception was from one socket channel. Case 2: The exception was associated with a bad request. In case 1, ideally we should just disconnect that socket channel without affecting other socket channels. In case 2, I think we should log the error and skip the message - assuming client will retry sending data if no response was received for a given peoriod of time. I am not sure if letting processor exit is a good idea because this will lead to the result of a badly behaving client screw the entire cluster - it might screw processors one by one. Comparing with that, I kind of leaning towards keeping the processor running and serving other normal TCP connections if possible, but log the error so monitoring system can detect and see if human intervention is needed. Also, I don't know what to do here to prevent the thread from exiting without catching all the throwables. According to this blog http://www.tzavellas.com/techblog/2010/09/20/catching-throwable-in-scala/ I guess I can rethrow all the ControlThrowables, but intercept the rests? Guozhang Wang wrote: I would also prefer not to close the Processor thread upon exceptions, mainly for avoid one bad client killing a shared Kafka cluster. In the past we have seen such issues like DDoS MetadataRequest killing the cluster and all other clients gets affected, etc, and the quota work is towards preventing it. Since Processor threads are shared (8 by default on a broker), it should not be closed by a single socket / bad client request. I like your thinking around cases #1 and #2. I think this should go as a code comment somewhere, so when people improve / extend SocketServer they will keep this logic in mind. 
Maybe even note in specific catch clauses whether they are handling possible errors at the request level or the channel level. My concern is with possible case #3: Each processor has an o.a.k.common.network.Selector. I'm concerned about the possibility of something going wrong in the state of the selector, which would possibly be an issue for all channels. For example, a failure to register could be an issue with the channel.register call, but also perhaps an issue with keys.put (just an example - I'm not sure something can actually break the keys table). I'd like to be able to identify cases where the Selector state may have gone wrong and close the processor in that case. Does that make any sense? Or am I being too paranoid? If there are error cases that are not associated with a specific connection or request, then I agree that we should handle those a little differently. What we need to keep in mind is that closing the processor actually terminates the
Re: Should 0.8.3 consumers correctly function with 0.8.2 brokers?
I don't think we have a consumer coordinator in 0.8.2 brokers. So KafkaConsumer in 0.8.3 will only be able to subscribe to partitions explicitly. Subscribing to a topic won't work with 0.8.2 brokers. Jiangjie (Becket) Qin On Wed, Jul 22, 2015 at 4:26 AM, Stevo Slavić ssla...@gmail.com wrote: I'm getting Unknown api code 11 even when both client and server are 0.8.3/trunk, when KafkaConsumer.subscribe(String... topics) is used. Bug? Kind regards, Stevo Slavic. On Fri, Apr 24, 2015 at 7:13 PM, Neha Narkhede n...@confluent.io wrote: Yes, I was clearly confused :-) On Fri, Apr 24, 2015 at 9:37 AM, Sean Lydon lydon.s...@gmail.com wrote: Thanks for the responses. Ewen is correct that I am referring to the *new* consumer (org.apache.kafka.clients.consumer.KafkaConsumer). I am extending the consumer to allow my applications more control over committed offsets. I really want to get away from zookeeper (so using the offset storage), and re-balancing is something I haven't really needed to tackle in an automated/seamless way. Either way, I'll hold off going further down this road until there is more interest. @Gwen I set up a single consumer without partition.assignment.strategy or rebalance.callback.class. I was unable to subscribe to just a topic (Unknown api code 11 on broker), but I could subscribe to a TopicPartition. This makes sense as I would need to handle rebalancing outside the consumer. Things functioned as expected (well, I have an additional minor fix to code from KAFKA-2121), and the only exceptions on the broker were due to closing consumers (which I have become accustomed to). My tests are specific to my extended version of the consumer, but they basically do a little writing and reading with different serde classes with application-controlled commits (similar to onSuccess and onFailure after each record, but with tolerance for out-of-order acknowledgements). If you are interested, here is the patch of the hack against trunk.
On Thu, Apr 23, 2015 at 10:27 PM, Ewen Cheslack-Postava e...@confluent.io wrote: @Neha I think you're mixing up the 0.8.1/0.8.2 updates and the 0.8.2/0.8.3 that's being discussed here? I think the original question was about using the *new* consumer (clients consumer) with 0.8.2. Gwen's right, it will use features not even implemented in the broker in trunk yet, let alone the 0.8.2. I don't think the enable.commit.downgrade type option, or supporting the old protocol with the new consumer at all, makes much sense. You'd end up with some weird hybrid of simple and high-level consumers -- you could use offset storage, but you'd have to manage rebalancing yourself since none of the coordinator support would be there. On Thu, Apr 23, 2015 at 9:22 PM, Neha Narkhede n...@confluent.io wrote: My understanding is that ideally the 0.8.3 consumer should work with an 0.8.2 broker if the offset commit config was set to zookeeper. The only thing that might not work is offset commit to Kafka, which makes sense since the 0.8.2 broker does not support Kafka based offset management. If we broke all kinds of offset commits, then it seems like a regression, no? On Thu, Apr 23, 2015 at 7:26 PM, Gwen Shapira gshap...@cloudera.com wrote: I didn't think 0.8.3 consumer will ever be able to talk to 0.8.2 broker... there are some essential pieces that are missing in 0.8.2 (Coordinator, Heartbeat, etc). Maybe I'm missing something. It will be nice if this will work :) Mind sharing what / how you tested? Were there no errors in broker logs after your fix? On Thu, Apr 23, 2015 at 5:37 PM, Sean Lydon lydon.s...@gmail.com wrote: Currently the clients consumer (trunk) sends offset commit requests of version 2. The 0.8.2 brokers fail to handle this particular request with a: java.lang.AssertionError: assertion failed: Version 2 is invalid for OffsetCommitRequest. Valid versions are 0 or 1. 
I was able to make this work via a forceful downgrade of this particular request, but I would like some feedback on whether an enable.commit.downgrade configuration would be a tolerable method to allow 0.8.3 consumers to interact with 0.8.2 brokers. I'm also interested in whether this is even a goal worth pursuing. Thanks, Sean -- Thanks, Neha -- Thanks, Ewen -- Thanks, Neha
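The enable.commit.downgrade idea amounts to one-sided version negotiation: the client caps the OffsetCommitRequest version at whatever the broker can parse (0.8.2 brokers accept versions 0 and 1, per the assertion error above). A minimal sketch of that selection logic, with hypothetical names and a hypothetical broker-version table — not Kafka's actual code, which did not do this at the time:

```java
import java.util.Map;

// Hypothetical sketch: pick the highest OffsetCommitRequest version both the
// client and the broker understand, rather than always sending the newest.
class VersionNegotiation {
    // Highest OffsetCommitRequest version each broker release understands
    // (assumed table for illustration; 0.8.2 rejects version 2, per the
    // AssertionError quoted above).
    static final Map<String, Integer> BROKER_MAX_VERSION =
            Map.of("0.8.1", 0, "0.8.2", 1, "0.8.3", 2);

    static int negotiate(int clientMaxVersion, String brokerRelease) {
        Integer brokerMax = BROKER_MAX_VERSION.get(brokerRelease);
        if (brokerMax == null)
            throw new IllegalArgumentException("unknown broker release: " + brokerRelease);
        // Downgrade to the broker's ceiling if the client's is higher.
        return Math.min(clientMaxVersion, brokerMax);
    }
}
```

With a table like this, a 0.8.3 client (max version 2) talking to a 0.8.2 broker would send version 1, which is exactly the "forceful downgrade" described above, just made configurable.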
[jira] [Commented] (KAFKA-2342) KafkaConsumer rebalance with in-flight fetch can cause invalid position
[ https://issues.apache.org/jira/browse/KAFKA-2342?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14637525#comment-14637525 ] ASF GitHub Bot commented on KAFKA-2342: --- Github user asfgit closed the pull request at: https://github.com/apache/kafka/pull/88 KafkaConsumer rebalance with in-flight fetch can cause invalid position --- Key: KAFKA-2342 URL: https://issues.apache.org/jira/browse/KAFKA-2342 Project: Kafka Issue Type: Sub-task Components: core Affects Versions: 0.8.3 Reporter: Jun Rao Assignee: Jason Gustafson Fix For: 0.9.0 If a rebalance occurs with an in-flight fetch, the new KafkaConsumer can end up updating the fetch position of a partition to an offset which is no longer valid. The consequence is that we may end up either returning to the user messages with an unexpected position or we may fail to give back the right offset in position(). Additionally, this bug causes transient test failures in ConsumerBounceTest.testConsumptionWithBrokerFailures with the following exception: kafka.api.ConsumerBounceTest testConsumptionWithBrokerFailures FAILED java.lang.NullPointerException at org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:949) at kafka.api.ConsumerBounceTest.consumeWithBrokerFailures(ConsumerBounceTest.scala:86) at kafka.api.ConsumerBounceTest.testConsumptionWithBrokerFailures(ConsumerBounceTest.scala:61) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Resolved] (KAFKA-2342) KafkaConsumer rebalance with in-flight fetch can cause invalid position
[ https://issues.apache.org/jira/browse/KAFKA-2342?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang resolved KAFKA-2342. -- Resolution: Fixed Fix Version/s: 0.9.0 Issue resolved by pull request 88 [https://github.com/apache/kafka/pull/88] KafkaConsumer rebalance with in-flight fetch can cause invalid position --- Key: KAFKA-2342 URL: https://issues.apache.org/jira/browse/KAFKA-2342 Project: Kafka Issue Type: Sub-task Components: core Affects Versions: 0.8.3 Reporter: Jun Rao Assignee: Jason Gustafson Fix For: 0.9.0 If a rebalance occurs with an in-flight fetch, the new KafkaConsumer can end up updating the fetch position of a partition to an offset which is no longer valid. The consequence is that we may end up either returning to the user messages with an unexpected position or we may fail to give back the right offset in position(). Additionally, this bug causes transient test failures in ConsumerBounceTest.testConsumptionWithBrokerFailures with the following exception: kafka.api.ConsumerBounceTest testConsumptionWithBrokerFailures FAILED java.lang.NullPointerException at org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:949) at kafka.api.ConsumerBounceTest.consumeWithBrokerFailures(ConsumerBounceTest.scala:86) at kafka.api.ConsumerBounceTest.testConsumptionWithBrokerFailures(ConsumerBounceTest.scala:61) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-2342) KafkaConsumer rebalance with in-flight fetch can cause invalid position
[ https://issues.apache.org/jira/browse/KAFKA-2342?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14637533#comment-14637533 ] Stevo Slavic commented on KAFKA-2342: - [~guozhang], fix version 0.9.0 or 0.8.3? KafkaConsumer rebalance with in-flight fetch can cause invalid position --- Key: KAFKA-2342 URL: https://issues.apache.org/jira/browse/KAFKA-2342 Project: Kafka Issue Type: Sub-task Components: core Affects Versions: 0.8.3 Reporter: Jun Rao Assignee: Jason Gustafson Fix For: 0.9.0 If a rebalance occurs with an in-flight fetch, the new KafkaConsumer can end up updating the fetch position of a partition to an offset which is no longer valid. The consequence is that we may end up either returning to the user messages with an unexpected position or we may fail to give back the right offset in position(). Additionally, this bug causes transient test failures in ConsumerBounceTest.testConsumptionWithBrokerFailures with the following exception: kafka.api.ConsumerBounceTest testConsumptionWithBrokerFailures FAILED java.lang.NullPointerException at org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:949) at kafka.api.ConsumerBounceTest.consumeWithBrokerFailures(ConsumerBounceTest.scala:86) at kafka.api.ConsumerBounceTest.testConsumptionWithBrokerFailures(ConsumerBounceTest.scala:61) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-2342) KafkaConsumer rebalance with in-flight fetch can cause invalid position
[ https://issues.apache.org/jira/browse/KAFKA-2342?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang updated KAFKA-2342: - Fix Version/s: (was: 0.9.0) 0.8.3 KafkaConsumer rebalance with in-flight fetch can cause invalid position --- Key: KAFKA-2342 URL: https://issues.apache.org/jira/browse/KAFKA-2342 Project: Kafka Issue Type: Sub-task Components: core Affects Versions: 0.8.3 Reporter: Jun Rao Assignee: Jason Gustafson Fix For: 0.8.3 If a rebalance occurs with an in-flight fetch, the new KafkaConsumer can end up updating the fetch position of a partition to an offset which is no longer valid. The consequence is that we may end up either returning to the user messages with an unexpected position or we may fail to give back the right offset in position(). Additionally, this bug causes transient test failures in ConsumerBounceTest.testConsumptionWithBrokerFailures with the following exception: kafka.api.ConsumerBounceTest testConsumptionWithBrokerFailures FAILED java.lang.NullPointerException at org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:949) at kafka.api.ConsumerBounceTest.consumeWithBrokerFailures(ConsumerBounceTest.scala:86) at kafka.api.ConsumerBounceTest.testConsumptionWithBrokerFailures(ConsumerBounceTest.scala:61) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: [VOTE] Switch to GitHub pull requests for new contributions
Hi, On 22 Jul 2015 19:32, Jiangjie Qin j...@linkedin.com.invalid wrote: +1 (non binding) Can we have a wiki for procedure and let people verify the steps? After that we can update the Apache project page. Yes, I linked to the page in the original message: https://cwiki.apache.org/confluence/display/KAFKA/Contributing+Code+Changes Committers and contributors have provided feedback in previous discussions in the mailing list. Further improvements are welcome, of course. Also see KAFKA-2321 and KAFKA-2349 for updates to the website and GitHub's CONTRIBUTING.md. Ismael
Re: Review Request 36664: Patch for KAFKA-2353
On July 21, 2015, 11:15 p.m., Gwen Shapira wrote: core/src/main/scala/kafka/network/SocketServer.scala, line 465 https://reviews.apache.org/r/36664/diff/1/?file=1018238#file1018238line465 Turns out that catching Throwable is a really bad idea: https://www.sumologic.com/2014/05/05/why-you-should-never-catch-throwable-in-scala/ Jiangjie Qin wrote: Ah... Didn't know that before. I explicitly listed the exceptions. Guozhang Wang wrote: Searching for `: Throwable` gives me 180+ cases in the code base :P Though many of them are from unit tests (which, arguably, may be OK), there are still a lot in the core package. I agree that we should avoid catching Throwable whenever possible, which will also help encourage developers to think about possible checked exceptions in the calling trace. Gwen Shapira wrote: I know :( I'm not sure if going over and converting everything is worth the effort. Although it could be a nice newbie jira. Maybe we can simply change that to catch Throwables except ControlThrowables? That might be a simple search and replace. On July 21, 2015, 11:15 p.m., Gwen Shapira wrote: core/src/main/scala/kafka/network/SocketServer.scala, line 400 https://reviews.apache.org/r/36664/diff/1/?file=1018238#file1018238line400 So in case of an unexpected exception, we log an error and keep running? Isn't it better to kill the processor, since we don't know what the state of the system is? If the acceptor keeps placing messages in the queue for a dead processor, isn't that a separate issue? Jiangjie Qin wrote: This part I'm not quite sure about. I am not very experienced with error handling in such cases, so please correct me if I missed something. Here is what I thought. The way it currently works is that the acceptor will 1. accept the new connection request and create a new socket channel 2. choose a processor and put the socket channel into the processor's new connection queue. The processor will just take the socket channels from the queue and register them with the selector.
If the processor gets an uncaught exception while running, there are several possibilities. Case 1: The exception came from one socket channel. Case 2: The exception was associated with a bad request. In case 1, ideally we should just disconnect that socket channel without affecting other socket channels. In case 2, I think we should log the error and skip the message - assuming the client will retry sending data if no response is received within a given period of time. I am not sure letting the processor exit is a good idea, because a badly behaving client could then take down the entire cluster - it might kill the processors one by one. Compared with that, I lean towards keeping the processor running and serving other normal TCP connections if possible, but logging the error so a monitoring system can detect it and see if human intervention is needed. Also, I don't know what to do here to prevent the thread from exiting without catching all the throwables. According to this blog http://www.tzavellas.com/techblog/2010/09/20/catching-throwable-in-scala/ I guess I can rethrow all the ControlThrowables, but intercept the rest? Guozhang Wang wrote: I would also prefer not to close the Processor thread upon exceptions, mainly to avoid one bad client killing a shared Kafka cluster. In the past we have seen issues like a DDoS of MetadataRequests killing the cluster with all other clients affected, etc., and the quota work is aimed at preventing that. Since Processor threads are shared (8 by default on a broker), a single socket / bad client request should not close one. Gwen Shapira wrote: I like your thinking around cases #1 and #2. I think this should go as a code comment somewhere, so when people improve / extend SocketServer they will keep this logic in mind. Maybe even note in specific catch clauses whether they are handling possible errors at the request level or the channel level.
My concern is with possible case #3: Each processor has an o.a.k.common.network.Selector. I'm concerned about the possibility of something going wrong in the state of the selector, which will possibly be an issue for all channels. For example failure to register could be an issue with the channel.register call, but also perhaps an issue with keys.put (just an example - I'm not sure something can actually break keys table). I'd like to be able to identify cases where the Selector state may have gone wrong and close the processor in that case. Does that make any sense? Or am I being too paranoid? Hi Gwen, I think what you said makes sense. Maybe I see this more from a failure boundary point of view. Actually we might need to do more if we let a processor exit. We need to stop the acceptor from
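The consensus in this thread - keep the processor alive on per-channel or per-request errors, but never swallow fatal throwables - can be illustrated with a toy loop. This is a sketch in Java for illustration only (the ControlThrowable/NonFatal distinction being discussed is Scala-specific; the closest Java analogue is catching Exception while letting Error propagate); the names are hypothetical, not Kafka's actual SocketServer code:

```java
import java.util.ArrayList;
import java.util.List;

// Toy illustration of the "handle non-fatal, rethrow fatal" processor loop.
class ProcessorLoop {
    final List<String> log = new ArrayList<>();

    // Run one unit of work; keep the processor alive on ordinary exceptions.
    void runSafely(Runnable work) {
        try {
            work.run();
        } catch (Exception e) {
            // Case 1/2 above: a bad channel or bad request - log the error
            // and keep serving the other connections.
            log.add("error: " + e.getMessage());
        }
        // java.lang.Error (OutOfMemoryError, etc.) is deliberately NOT
        // caught: it propagates and takes the processor down, analogous to
        // rethrowing ControlThrowable in the Scala discussion.
    }
}
```

The point of the structure is that the catch clause names the recoverable category explicitly, so a reader can tell whether a handler is operating at the request level or the channel level, as suggested above.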
Zookeeper use cases with Kafka
Hello Folks, I wish to contribute to Kafka internals, and one of the things that can help me do that is understanding how Kafka uses ZooKeeper. I have a few basic questions: 1. Is ZooKeeper primarily used for locking? If yes, in what cases, and what kind of nodes does it use - sequential/ephemeral? 2. Does Kafka use ZooKeeper watches for any of its functions? 3. What kind of state is stored in ZooKeeper? (I believe it has to be the leader information per partition, but is there anything apart from that?) What is the scale of the data stored in ZooKeeper? Looking forward to your help. Thanks, prabcs
[jira] [Commented] (KAFKA-2350) Add KafkaConsumer pause capability
[ https://issues.apache.org/jira/browse/KAFKA-2350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14637187#comment-14637187 ] Jason Gustafson commented on KAFKA-2350: [~becket_qin] [~jkreps] Currently automatic assignment is inferred based on which subscribe methods are invoked (e.g. if you subscribe to a topic, then we assume you want automatic assignment). I wonder if it might help to make that instead an explicit configuration parameter? Then that might free us a little bit to use subscribe/unsubscribe in the way [~becket_qin] is proposing since we don't have to guess whether the user is intending to actually subscribe to a partition or just to pause it. Add KafkaConsumer pause capability -- Key: KAFKA-2350 URL: https://issues.apache.org/jira/browse/KAFKA-2350 Project: Kafka Issue Type: Improvement Reporter: Jason Gustafson Assignee: Jason Gustafson There are some use cases in stream processing where it is helpful to be able to pause consumption of a topic. For example, when joining two topics, you may need to delay processing of one topic while you wait for the consumer of the other topic to catch up. The new consumer currently doesn't provide a nice way to do this. If you skip poll() or if you unsubscribe, then a rebalance will be triggered and your partitions will be reassigned. One way to achieve this would be to add two new methods to KafkaConsumer: {code} void pause(String... topics); void unpause(String... topics); {code} When a topic is paused, a call to KafkaConsumer.poll will not initiate any new fetches for that topic. After it is unpaused, fetches will begin again. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
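The pause()/unpause() semantics proposed in the issue boil down to a paused-topic set that poll() consults before initiating any new fetch for a topic. A minimal sketch of that gating, with hypothetical names - an illustration of the proposal, not the eventual KafkaConsumer implementation:

```java
import java.util.HashSet;
import java.util.Set;

// Sketch of the proposed pause capability: paused topics stay subscribed
// (so no rebalance is triggered), but no new fetches are issued for them.
class PausableFetcher {
    private final Set<String> paused = new HashSet<>();

    void pause(String... topics) {
        for (String t : topics) paused.add(t);
    }

    void unpause(String... topics) {
        for (String t : topics) paused.remove(t);
    }

    // poll() would call this per subscribed topic before issuing a fetch.
    boolean shouldFetch(String topic) {
        return !paused.contains(topic);
    }
}
```

The key design point, per the issue, is that pausing does not touch the subscription set at all - which is what makes it different from unsubscribe() and why it avoids triggering a rebalance.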
Re: Review Request 36664: Patch for KAFKA-2353
On July 22, 2015, 10:40 a.m., Ismael Juma wrote: core/src/main/scala/kafka/network/SocketServer.scala, line 467 https://reviews.apache.org/r/36664/diff/2/?file=1018375#file1018375line467 As far as I can see `ClosedChannelException`, `IllegalStateException` and `IllegalArgumentException` are enough? Also, would it be better to use `IOException` instead of `ClosedChannelException`? What happens if other exceptions are thrown? Will we still have a socket leak? Yeah, perhaps in addition to listing the expected cases, we should also handle nonFatal(e)? (https://tersesystems.com/2012/12/27/error-handling-in-scala/) - Gwen --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/36664/#review92574 --- On July 22, 2015, 5:02 a.m., Jiangjie Qin wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/36664/ --- (Updated July 22, 2015, 5:02 a.m.) Review request for kafka. Bugs: KAFKA-2353 https://issues.apache.org/jira/browse/KAFKA-2353 Repository: kafka Description --- Addressed Gwen's comments Diffs - core/src/main/scala/kafka/network/SocketServer.scala 91319fa010b140cca632e5fa8050509bd2295fc9 Diff: https://reviews.apache.org/r/36664/diff/ Testing --- Thanks, Jiangjie Qin
[GitHub] kafka pull request: KAFKA-2342; KafkaConsumer rebalance with in-fl...
Github user asfgit closed the pull request at: https://github.com/apache/kafka/pull/88 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (KAFKA-2342) KafkaConsumer rebalance with in-flight fetch can cause invalid position
[ https://issues.apache.org/jira/browse/KAFKA-2342?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14637539#comment-14637539 ] Guozhang Wang commented on KAFKA-2342: -- [~sslavic] corrected, thanks. KafkaConsumer rebalance with in-flight fetch can cause invalid position --- Key: KAFKA-2342 URL: https://issues.apache.org/jira/browse/KAFKA-2342 Project: Kafka Issue Type: Sub-task Components: core Affects Versions: 0.8.3 Reporter: Jun Rao Assignee: Jason Gustafson Fix For: 0.8.3 If a rebalance occurs with an in-flight fetch, the new KafkaConsumer can end up updating the fetch position of a partition to an offset which is no longer valid. The consequence is that we may end up either returning to the user messages with an unexpected position or we may fail to give back the right offset in position(). Additionally, this bug causes transient test failures in ConsumerBounceTest.testConsumptionWithBrokerFailures with the following exception: kafka.api.ConsumerBounceTest testConsumptionWithBrokerFailures FAILED java.lang.NullPointerException at org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:949) at kafka.api.ConsumerBounceTest.consumeWithBrokerFailures(ConsumerBounceTest.scala:86) at kafka.api.ConsumerBounceTest.testConsumptionWithBrokerFailures(ConsumerBounceTest.scala:61) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: Review Request 36590: Patch for KAFKA-2275
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/36590/#review92557 --- Hey guys, I realized that most of this code has changed as the design to address this itself changed a bit. The new diff is available at https://reviews.apache.org/r/36681/. My apologies for the inconvenience, I should have vetted out design details before posting a patch. - Ashish Singh On July 20, 2015, 5:44 p.m., Ashish Singh wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/36590/ --- (Updated July 20, 2015, 5:44 p.m.) Review request for kafka. Bugs: KAFKA-2275 https://issues.apache.org/jira/browse/KAFKA-2275 Repository: kafka Description --- Add logic to get all topics when needMetadataForAllTopics is set on metadata Return metadata for all topics if empty list is passed to partitionsFor KAFKA-2275: Add a MapString, ListPartitionInfo partitionsFor(String... topics) API to the new consumer Diffs - clients/src/main/java/org/apache/kafka/clients/Metadata.java 0387f2602c93a62cd333f1b3c569ca6b66b5b779 clients/src/main/java/org/apache/kafka/clients/NetworkClient.java 48fe7961e2215372d8033ece4af739ea06c6457b clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java 252b759c0801f392e3526b0f31503b4b8fbf1c8a clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java bea3d737c51be77d5b5293cdd944d33b905422ba clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java c14eed1e95f2e682a235159a366046f00d1d90d6 clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java 9517d9d0cd480d5ba1d12f1fde7963e60528d2f8 clients/src/main/java/org/apache/kafka/common/Cluster.java 60594a7dce90130911a626ea80cf80d815aeb46e core/src/test/scala/integration/kafka/api/ConsumerTest.scala 3eb5f95731a3f06f662b334ab2b3d0ad7fa9e1ca Diff: https://reviews.apache.org/r/36590/diff/ Testing --- Thanks, Ashish Singh
Review Request 36681: Patch for KAFKA-2275
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/36681/ --- Review request for kafka. Bugs: KAFKA-2275 https://issues.apache.org/jira/browse/KAFKA-2275 Repository: kafka Description --- KAFKA-2275: Add a ListTopics() API to the new consumer Diffs - clients/src/main/java/org/apache/kafka/clients/ClientRequest.java ed4c0d98596cc294757f35df8c8cbc8e36ff42de clients/src/main/java/org/apache/kafka/clients/NetworkClient.java 48fe7961e2215372d8033ece4af739ea06c6457b clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java 252b759c0801f392e3526b0f31503b4b8fbf1c8a clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java bea3d737c51be77d5b5293cdd944d33b905422ba clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java c14eed1e95f2e682a235159a366046f00d1d90d6 clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java 9517d9d0cd480d5ba1d12f1fde7963e60528d2f8 core/src/test/scala/integration/kafka/api/ConsumerTest.scala 3eb5f95731a3f06f662b334ab2b3d0ad7fa9e1ca Diff: https://reviews.apache.org/r/36681/diff/ Testing --- Thanks, Ashish Singh
[jira] [Commented] (KAFKA-2275) Add a ListTopics() API to the new consumer
[ https://issues.apache.org/jira/browse/KAFKA-2275?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14636378#comment-14636378 ] Ashish K Singh commented on KAFKA-2275: --- Created reviewboard https://reviews.apache.org/r/36681/ against branch trunk Add a ListTopics() API to the new consumer -- Key: KAFKA-2275 URL: https://issues.apache.org/jira/browse/KAFKA-2275 Project: Kafka Issue Type: Sub-task Components: consumer Reporter: Guozhang Wang Assignee: Ashish K Singh Priority: Critical Fix For: 0.8.3 Attachments: KAFKA-2275.patch, KAFKA-2275.patch, KAFKA-2275_2015-07-17_21:39:27.patch, KAFKA-2275_2015-07-20_10:44:19.patch With regex subscription like {code} consumer.subscribe(topic*) {code} The partition assignment is automatically done at the Kafka side, while there are some use cases where consumers want regex subscriptions but not Kafka-side partition assignment, rather with their own specific partition assignment. With ListTopics() they can periodically check for topic list changes and specifically subscribe to the partitions of the new topics. For implementation, it involves sending a TopicMetadataRequest to a random broker and parse the response. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
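The client-side pattern described in the issue - periodically fetch the full topic list, filter it with the consumer's own regex, and diff against the previously seen set to find new topics to subscribe to - can be sketched with stdlib pieces. Hypothetical names; the ListTopics() call itself is stubbed out as a plain list parameter:

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.regex.Pattern;

// Sketch of periodic topic-list checking with client-side regex matching,
// as an alternative to Kafka-side partition assignment for regex subscriptions.
class TopicWatcher {
    private final Pattern pattern;
    private final Set<String> seen = new HashSet<>();

    TopicWatcher(String regex) {
        this.pattern = Pattern.compile(regex);
    }

    // allTopics stands in for the result of a ListTopics()-style call.
    // Returns matching topics not seen on earlier checks, i.e. the topics
    // whose partitions the consumer would now explicitly subscribe to.
    Set<String> newMatches(List<String> allTopics) {
        Set<String> fresh = new HashSet<>();
        for (String t : allTopics)
            if (pattern.matcher(t).matches() && seen.add(t))
                fresh.add(t);
        return fresh;
    }
}
```

Calling newMatches on each check cycle yields only the newly appeared matching topics, so the application can do its own partition assignment for them, as the issue envisions.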
[DISCUSS] Partitioning in Kafka
Hello folks, I'd like to ask the community about its opinion on the partitioning functions in Kafka. With KAFKA-2091 https://issues.apache.org/jira/browse/KAFKA-2091 integrated we are now able to have custom partitioners in the producer. The question now becomes *which* partitioners should ship with Kafka? This issue arose in the context of KAFKA-2092 https://issues.apache.org/jira/browse/KAFKA-2092, which implements a specific load-balanced partitioning. This partitioner however assumes some stages of processing on top of it to make proper use of the data, i.e., it envisions Kafka as a substrate for stream processing, and not only as the I/O component. Is this a direction that Kafka wants to go towards? Or is this a role better left to the internal communication systems of other stream processing engines (e.g., Storm)? And if the answer is the latter, how would something such as Samza (which relies mostly on Kafka as its communication substrate) be able to implement advanced partitioning schemes? Cheers, -- Gianmarco
Re: Official Kafka Gitter Room?
Hi Gwen, On Sun, Jul 19, 2015 at 2:26 AM, Gwen Shapira gshap...@cloudera.com wrote: So, as an experiment, I created: https://apachekafka.slack.com I figured we'll give it a whirl for a week or two for dev discussions, see how it goes and if we have activity we can add this to the website and announce on the lists. Are people using this? If so, please send me an invite. Ismael
[jira] [Commented] (KAFKA-2092) New partitioning for better load balancing
[ https://issues.apache.org/jira/browse/KAFKA-2092?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14636353#comment-14636353 ] Jason Gustafson commented on KAFKA-2092: [~azaroth], I'm actually not sure that Samza is the better place. I thought that it would have a better chance of integration since Samza already exposes a processing layer, which seems to be needed to use this partitioner. I think perhaps what we should do is open this discussion to the Kafka dev list. I think we're missing some guidance on what partitioners should be included in Kafka. As far as I know, Kafka has only provided primitive partitioning strategies up to now, but as Kafka starts to provide more core facilities for stream processing, perhaps it should start to include more. New partitioning for better load balancing -- Key: KAFKA-2092 URL: https://issues.apache.org/jira/browse/KAFKA-2092 Project: Kafka Issue Type: Improvement Components: producer Reporter: Gianmarco De Francisci Morales Assignee: Jun Rao Attachments: KAFKA-2092-v1.patch, KAFKA-2092-v2.patch We have recently studied the problem of load balancing in distributed stream processing systems such as Samza [1]. In particular, we focused on what happens when the key distribution of the stream is skewed when using key grouping. We developed a new stream partitioning scheme (which we call Partial Key Grouping). It achieves better load balancing than hashing while being more scalable than round robin in terms of memory. In the paper we show a number of mining algorithms that are easy to implement with partial key grouping, and whose performance can benefit from it. We think that it might also be useful for a larger class of algorithms. PKG has already been integrated in Storm [2], and I would like to be able to use it in Samza as well. As far as I understand, Kafka producers are the ones that decide how to partition the stream (or Kafka topic).
I do not have experience with Kafka, however partial key grouping is very easy to implement: it requires just a few lines of code in Java when implemented as a custom grouping in Storm [3]. I believe it should be very easy to integrate. For all these reasons, I believe it will be a nice addition to Kafka/Samza. If the community thinks it's a good idea, I will be happy to offer support in the porting. References: [1] https://melmeric.files.wordpress.com/2014/11/the-power-of-both-choices-practical-load-balancing-for-distributed-stream-processing-engines.pdf [2] https://issues.apache.org/jira/browse/STORM-632 [3] https://github.com/gdfm/partial-key-grouping -- This message was sent by Atlassian JIRA (v6.3.4#6332)
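For illustration, the core of Partial Key Grouping ("power of two choices" applied to keys) does indeed fit in a few lines: hash each key to two candidate partitions and send to whichever has received less load so far. A toy sketch under the assumption of simple local load counters - not the KAFKA-2092 patch itself, and the second hash is an arbitrary stand-in:

```java
// Toy Partial Key Grouping partitioner: each key maps to two candidate
// partitions; the record goes to the less-loaded candidate. A hot key is
// therefore split across (at most) two partitions instead of one.
class PartialKeyGrouper {
    private final int numPartitions;
    final long[] load; // records sent per partition (local counters)

    PartialKeyGrouper(int numPartitions) {
        this.numPartitions = numPartitions;
        this.load = new long[numPartitions];
    }

    int partition(String key) {
        // Two hash candidates for the key. A real implementation would use
        // two independent hash functions; salting is a stand-in here.
        int a = Math.floorMod(key.hashCode(), numPartitions);
        int b = Math.floorMod((key + "#salt").hashCode(), numPartitions);
        // Power of two choices: pick the less-loaded candidate.
        int chosen = load[a] <= load[b] ? a : b;
        load[chosen]++;
        return chosen;
    }
}
```

Note the design consequence the thread is wrestling with: because one key can land on two partitions, a downstream stage must merge partial per-key state, which is why this partitioner "assumes some stages of processing on top of it."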
Re: Official Kafka Gitter Room?
On the Apache Mahout project we're using Slack as well - for release coordination. It was found that an extra Slack channel does not really fit the Apache way - it was overused, with too many design discussions going on there that the community at large had no access to, could not be involved in, and cannot even see the history of. This is not the case with the user/dev mailing lists, which have searchable archives. Kind regards, Stevo Slavic. On Wed, Jul 22, 2015 at 11:07 AM, Ismael Juma ism...@juma.me.uk wrote: Hi Gwen, On Sun, Jul 19, 2015 at 2:26 AM, Gwen Shapira gshap...@cloudera.com wrote: So, as an experiment, I created: https://apachekafka.slack.com I figured we'll give it a whirl for a week or two for dev discussions, see how it goes and if we have activity we can add this to the website and announce on the lists. Are people using this? If so, please send me an invite. Ismael
Re: Kafka Unit Test Failures on a Mac
On Mon, Jul 20, 2015 at 10:45 PM, Grant Henke ghe...@cloudera.com wrote: In one run of the core tests I found the following: - 4584 regular files (REG) - 376 .jar files - Not much one can/should do here. Many are from gradle itself. - 2392 kafka .log files - why are these being leaked? - after a single test no file handles should remain - 1162 kafka .log.deleted files - why are these being leaked? Some of those are probably due to: https://issues.apache.org/jira/browse/KAFKA-1782 Ismael
Re: Should 0.8.3 consumers correctly function with 0.8.2 brokers?
I'm getting Unknown api code 11 even when both client and server are 0.8.3/trunk, when KafkaConsumer.subscribe(String... topics) is used. Bug? Kind regards, Stevo Slavic. On Fri, Apr 24, 2015 at 7:13 PM, Neha Narkhede n...@confluent.io wrote: Yes, I was clearly confused :-) On Fri, Apr 24, 2015 at 9:37 AM, Sean Lydon lydon.s...@gmail.com wrote: Thanks for the responses. Ewen is correct that I am referring to the *new* consumer (org.apache.kafka.clients.consumer.KafkaConsumer). I am extending the consumer to allow my applications more control over committed offsets. I really want to get away from zookeeper (so using the offset storage), and re-balancing is something I haven't really needed to tackle in an automated/seamless way. Either way, I'll hold off going further down this road until there is more interest. @Gwen I set up a single consumer without partition.assignment.strategy or rebalance.callback.class. I was unable to subscribe to just a topic (Unknown api code 11 on broker), but I could subscribe to a topicpartition. This makes sense as I would need to handle re-balance outside the consumer. Things functioned as expected (well I have an additional minor fix to code from KAFKA-2121), and the only exceptions on broker were due to closing consumers (which I have become accustomed to). My tests are specific to my extended version of the consumer, but they basically do a little writing and reading with different serde classes with application controlled commits (similar to onSuccess and onFailure after each record, but with tolerance for out of order acknowledgements). If you are interested, here is the patch of the hack against trunk. On Thu, Apr 23, 2015 at 10:27 PM, Ewen Cheslack-Postava e...@confluent.io wrote: @Neha I think you're mixing up the 0.8.1/0.8.2 updates and the 0.8.2/0.8.3 that's being discussed here? I think the original question was about using the *new* consumer (clients consumer) with 0.8.2. 
Gwen's right, it will use features not even implemented in the broker in trunk yet, let alone the 0.8.2. I don't think the enable.commit.downgrade type option, or supporting the old protocol with the new consumer at all, makes much sense. You'd end up with some weird hybrid of simple and high-level consumers -- you could use offset storage, but you'd have to manage rebalancing yourself since none of the coordinator support would be there. On Thu, Apr 23, 2015 at 9:22 PM, Neha Narkhede n...@confluent.io wrote: My understanding is that ideally the 0.8.3 consumer should work with an 0.8.2 broker if the offset commit config was set to zookeeper. The only thing that might not work is offset commit to Kafka, which makes sense since the 0.8.2 broker does not support Kafka based offset management. If we broke all kinds of offset commits, then it seems like a regression, no? On Thu, Apr 23, 2015 at 7:26 PM, Gwen Shapira gshap...@cloudera.com wrote: I didn't think 0.8.3 consumer will ever be able to talk to 0.8.2 broker... there are some essential pieces that are missing in 0.8.2 (Coordinator, Heartbeat, etc). Maybe I'm missing something. It will be nice if this will work :) Mind sharing what / how you tested? Were there no errors in broker logs after your fix? On Thu, Apr 23, 2015 at 5:37 PM, Sean Lydon lydon.s...@gmail.com wrote: Currently the clients consumer (trunk) sends offset commit requests of version 2. The 0.8.2 brokers fail to handle this particular request with a: java.lang.AssertionError: assertion failed: Version 2 is invalid for OffsetCommitRequest. Valid versions are 0 or 1. I was able to make this work via a forceful downgrade of this particular request, but I would like some feedback on whether a enable.commit.downgrade configuration would be a tolerable method to allow 0.8.3 consumers to interact with 0.8.2 brokers. I'm also interested in this even being a goal worth pursuing. Thanks, Sean -- Thanks, Neha -- Thanks, Ewen -- Thanks, Neha
Re: Review Request 36670: Patch for KAFKA-2355
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/36670/#review92571 --- core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala (line 267) https://reviews.apache.org/r/36670/#comment146770 You should either include information about the exception that was actually thrown or let the original exception propagate. Also, if you choose the former, you should do `case _: Throwable` to avoid a warning in Scala 2.11. core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala (line 269) https://reviews.apache.org/r/36670/#comment146771 Should `verifyTopicDeletion` be inside `finally`? I don't think so because you don't want to run it if `fail` is called above. You could move it to be in the `e: TopicAlreadyMarkedForDeletionException` clause maybe? - Ismael Juma On July 22, 2015, 1:46 a.m., Edward Ribeiro wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/36670/ --- (Updated July 22, 2015, 1:46 a.m.) Review request for kafka. Bugs: KAFKA-2355 https://issues.apache.org/jira/browse/KAFKA-2355 Repository: kafka Description --- KAFKA-2355 Add an unit test to validate the deletion of a partition marked as deleted Diffs - core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala fa8ce259a2832ab86f9dda8c1d409b2c42d43ae9 Diff: https://reviews.apache.org/r/36670/diff/ Testing --- Thanks, Edward Ribeiro
Re: Review Request 36664: Patch for KAFKA-2353
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/36664/#review92574 --- core/src/main/scala/kafka/network/SocketServer.scala (line 401) https://reviews.apache.org/r/36664/#comment146775 Is it intentional to ignore `java.lang.Error` too? core/src/main/scala/kafka/network/SocketServer.scala (line 463) https://reviews.apache.org/r/36664/#comment146773 As far as I can see `ClosedChannelException`, `IllegalStateException` and `IllegalArgumentException` are enough? Also, would it be better to use `IOException` instead of `ClosedChannelException`? What happens if other exceptions are thrown? Will we still have a socket leak? - Ismael Juma On July 22, 2015, 5:02 a.m., Jiangjie Qin wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/36664/ --- (Updated July 22, 2015, 5:02 a.m.) Review request for kafka. Bugs: KAFKA-2353 https://issues.apache.org/jira/browse/KAFKA-2353 Repository: kafka Description --- Addressed Gwen's comments Diffs - core/src/main/scala/kafka/network/SocketServer.scala 91319fa010b140cca632e5fa8050509bd2295fc9 Diff: https://reviews.apache.org/r/36664/diff/ Testing --- Thanks, Jiangjie Qin
Re: Review Request 36670: Patch for KAFKA-2355
On July 22, 2015, 10:29 a.m., Ismael Juma wrote: core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala, line 269 https://reviews.apache.org/r/36670/diff/1/?file=1018343#file1018343line269 Should `verifyTopicDeletion` be inside `finally`? I don't think so because you don't want to run it if `fail` is called above. You could move it to be in the `e: TopicAlreadyMarkedForDeletionException` clause maybe? Just removing the finally block and having it after the catch should work fine too. - Grant --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/36670/#review92571 --- On July 22, 2015, 1:46 a.m., Edward Ribeiro wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/36670/ --- (Updated July 22, 2015, 1:46 a.m.) Review request for kafka. Bugs: KAFKA-2355 https://issues.apache.org/jira/browse/KAFKA-2355 Repository: kafka Description --- KAFKA-2355 Add an unit test to validate the deletion of a partition marked as deleted Diffs - core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala fa8ce259a2832ab86f9dda8c1d409b2c42d43ae9 Diff: https://reviews.apache.org/r/36670/diff/ Testing --- Thanks, Edward Ribeiro
[jira] [Created] (KAFKA-2356) Support retrieving partitions of ConsumerRecords
Stevo Slavic created KAFKA-2356: --- Summary: Support retrieving partitions of ConsumerRecords Key: KAFKA-2356 URL: https://issues.apache.org/jira/browse/KAFKA-2356 Project: Kafka Issue Type: Improvement Affects Versions: 0.8.3 Reporter: Stevo Slavic Priority: Trivial In the new consumer on trunk, ConsumerRecords has a method to retrieve records for a given TopicPartition, but there is no method to retrieve the TopicPartitions included/available in ConsumerRecords. Please have it supported. The method could be something like: {noformat} /** * Get partitions of records returned by a {@link Consumer#poll(long)} operation */ public Set<TopicPartition> partitions() { return Collections.unmodifiableSet(this.records.keySet()); } {noformat} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
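The proposed accessor is essentially a read-only view over the internal per-partition map. A toy stand-in (plain Strings in place of Kafka's TopicPartition and ConsumerRecord types, so this is not the real ConsumerRecords class) shows the intended behavior, including that callers cannot mutate the returned set:

```java
import java.util.*;

// Toy stand-in for ConsumerRecords: partitions are modelled as plain
// Strings like "topic-0" instead of Kafka's TopicPartition type.
public class RecordsByPartition {
    private final Map<String, List<String>> records;

    public RecordsByPartition(Map<String, List<String>> records) {
        this.records = records;
    }

    // The proposed accessor: a read-only view of the partitions present
    // in this batch, mirroring the snippet from the ticket.
    public Set<String> partitions() {
        return Collections.unmodifiableSet(records.keySet());
    }
}
```

Wrapping the key set with Collections.unmodifiableSet means the caller sees exactly the partitions in the batch without being able to corrupt the consumer's internal map.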
[GitHub] kafka pull request: ConsumerRecords are organized per topic partit...
GitHub user sslavic opened a pull request: https://github.com/apache/kafka/pull/92 ConsumerRecords are organized per topic partition ConsumerRecords has records organized per topic partition, not per topic as ConsumerRecords javadoc suggested. You can merge this pull request into a Git repository by running: $ git pull https://github.com/sslavic/kafka patch-6 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/kafka/pull/92.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #92 commit 8dacfda7c71b7acaf49a19db0563975b8a96f7a5 Author: Stevo Slavić ssla...@gmail.com Date: 2015-07-22T13:41:00Z ConsumerRecords are organized per topic partition ConsumerRecords has records organized per topic partition, not per topic as ConsumerRecords javadoc suggested. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
Jenkins build is back to normal : KafkaPreCommit #160
See https://builds.apache.org/job/KafkaPreCommit/160/changes
[jira] [Commented] (KAFKA-2350) Add KafkaConsumer pause capability
[ https://issues.apache.org/jira/browse/KAFKA-2350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14637753#comment-14637753 ] Jason Gustafson commented on KAFKA-2350: [~jkreps] This is a little unrelated to this jira, but one nice consequence of moving group management into configuration is that it opens the door to using subscribe(topic) as a shortcut to subscribing to all of a topic's partitions for users who do not want to use group management (which does have a little overhead). Currently the user would have to get all the partitions for that topic and then subscribe to them individually. Not that bad, but a tad annoying, especially if you have to poll for changes in the number of partitions. Anyway, I tend to agree with [~yasuhiro.matsuda] and [~gwenshap] that trying to mix partition and topic subscriptions in order to do pausing seems problematic. The explicit pause/unpause methods might not be as elegant, but I think they are easier for users to understand and are much easier for us to implement. Add KafkaConsumer pause capability -- Key: KAFKA-2350 URL: https://issues.apache.org/jira/browse/KAFKA-2350 Project: Kafka Issue Type: Improvement Reporter: Jason Gustafson Assignee: Jason Gustafson There are some use cases in stream processing where it is helpful to be able to pause consumption of a topic. For example, when joining two topics, you may need to delay processing of one topic while you wait for the consumer of the other topic to catch up. The new consumer currently doesn't provide a nice way to do this. If you skip poll() or if you unsubscribe, then a rebalance will be triggered and your partitions will be reassigned. One way to achieve this would be to add two new methods to KafkaConsumer: {code} void pause(String... topics); void unpause(String... topics); {code} When a topic is paused, a call to KafkaConsumer.poll will not initiate any new fetches for that topic. After it is unpaused, fetches will begin again. 
-- This message was sent by Atlassian JIRA (v6.3.4#6332)
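A toy model of the proposed pause()/unpause() semantics (plain strings for topics and records, not the real KafkaConsumer API): poll() simply skips fetching from paused topics, and no rebalance is involved because the subscription itself never changes.

```java
import java.util.*;

// Toy model of the proposed pause()/unpause() semantics: poll() skips
// fetching from paused topics; records for a paused topic stay buffered
// upstream and become available again after unpause().
public class PausableFetcher {
    private final Map<String, Queue<String>> upstream = new HashMap<>();
    private final Set<String> paused = new HashSet<>();

    public void pause(String... topics) { paused.addAll(Arrays.asList(topics)); }
    public void unpause(String... topics) { paused.removeAll(Arrays.asList(topics)); }

    // Stand-in for a broker append on the given topic.
    public void produce(String topic, String record) {
        upstream.computeIfAbsent(topic, t -> new ArrayDeque<>()).add(record);
    }

    // Drain everything available on non-paused topics.
    public List<String> poll() {
        List<String> out = new ArrayList<>();
        for (Map.Entry<String, Queue<String>> e : upstream.entrySet()) {
            if (paused.contains(e.getKey()))
                continue; // no new fetches for a paused topic
            while (!e.getValue().isEmpty())
                out.add(e.getValue().poll());
        }
        return out;
    }
}
```

In the join use case from the ticket, the consumer would pause the faster topic, keep calling poll() to stay in the group, and unpause once the slower topic catches up.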
[jira] [Commented] (KAFKA-2350) Add KafkaConsumer pause capability
[ https://issues.apache.org/jira/browse/KAFKA-2350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14637792#comment-14637792 ] Jiangjie Qin commented on KAFKA-2350: - [~guozhang] In the last case you mentioned, is topic2-partition1 one of the partitions assigned by consumer coordinator? If it is not, the subscription will fail because the partition is not in the assignedTopicpartitions set. Add KafkaConsumer pause capability -- Key: KAFKA-2350 URL: https://issues.apache.org/jira/browse/KAFKA-2350 Project: Kafka Issue Type: Improvement Reporter: Jason Gustafson Assignee: Jason Gustafson There are some use cases in stream processing where it is helpful to be able to pause consumption of a topic. For example, when joining two topics, you may need to delay processing of one topic while you wait for the consumer of the other topic to catch up. The new consumer currently doesn't provide a nice way to do this. If you skip poll() or if you unsubscribe, then a rebalance will be triggered and your partitions will be reassigned. One way to achieve this would be to add two new methods to KafkaConsumer: {code} void pause(String... topics); void unpause(String... topics); {code} When a topic is paused, a call to KafkaConsumer.poll will not initiate any new fetches for that topic. After it is unpaused, fetches will begin again. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-2004) Write Kafka messages directly to HDFS
[ https://issues.apache.org/jira/browse/KAFKA-2004?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14637419#comment-14637419 ] sutanu das commented on KAFKA-2004: --- We just want to stream each logfile as an event from the kafka queue via Storm to HDFS. Is there basic spout or bolt code available in python to do this? Write Kafka messages directly to HDFS - Key: KAFKA-2004 URL: https://issues.apache.org/jira/browse/KAFKA-2004 Project: Kafka Issue Type: Bug Components: consumer, core, producer Affects Versions: 0.8.1.1 Reporter: sutanu das Assignee: Neha Narkhede Priority: Critical 1. Is there a way to write Kafka messages directly to HDFS without writing any consumer code? 2. Is there any way to integrate Kafka with Storm or Spark so messages go directly from Kafka consumers to an HDFS sink? -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1893) Allow regex subscriptions in the new consumer
[ https://issues.apache.org/jira/browse/KAFKA-1893?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14638028#comment-14638028 ] Jason Gustafson commented on KAFKA-1893: [~singhashish] Sure, no problem. Allow regex subscriptions in the new consumer - Key: KAFKA-1893 URL: https://issues.apache.org/jira/browse/KAFKA-1893 Project: Kafka Issue Type: Sub-task Components: consumer Reporter: Jay Kreps Assignee: Jason Gustafson Priority: Critical Fix For: 0.8.3 The consumer needs to handle subscribing to regular expressions. Presumably this would be done as a new api, {code} void subscribe(java.util.regex.Pattern pattern); {code} Some questions/thoughts to work out: - It should not be possible to mix pattern subscription with partition subscription. - Is it allowable to mix this with normal topic subscriptions? Logically this is okay but a bit complex to implement. - We need to ensure we regularly update the metadata and recheck our regexes against the metadata to update subscriptions for new topics that are created or old topics that are deleted. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
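The metadata-recheck step the ticket calls out can be reduced to a pure function: on each metadata update, recompute which cluster topics the subscribed pattern matches and diff that set against the current subscription. A sketch (topic names as plain strings; class and method names here are illustrative, not part of the consumer API):

```java
import java.util.*;
import java.util.regex.Pattern;

// Sketch of the metadata-recheck step for pattern subscriptions:
// recompute the matched topic set from fresh metadata and diff it
// against the current subscription to find additions and removals.
public class PatternSubscription {

    // Topics in the cluster that the subscribed pattern currently matches.
    public static Set<String> matchTopics(Pattern pattern, Collection<String> clusterTopics) {
        Set<String> matched = new TreeSet<>();
        for (String topic : clusterTopics)
            if (pattern.matcher(topic).matches())
                matched.add(topic);
        return matched;
    }

    // Newly created topics the consumer should now subscribe to.
    public static Set<String> added(Set<String> current, Set<String> matched) {
        Set<String> added = new TreeSet<>(matched);
        added.removeAll(current);
        return added;
    }

    // Deleted (or no longer matching) topics the consumer should drop.
    public static Set<String> removed(Set<String> current, Set<String> matched) {
        Set<String> removed = new TreeSet<>(current);
        removed.removeAll(matched);
        return removed;
    }
}
```

Running this diff on every metadata refresh is what lets the consumer pick up topics created after subscribe(pattern) was called, per the last bullet in the ticket.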
Re: Review Request 36670: Patch for KAFKA-2355
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/36670/ --- (Updated July 23, 2015, 1:23 a.m.) Review request for kafka. Bugs: KAFKA-2355 https://issues.apache.org/jira/browse/KAFKA-2355 Repository: kafka Description --- KAFKA-2355 Add an unit test to validate the deletion of a partition marked as deleted Diffs (updated) - core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala fa8ce259a2832ab86f9dda8c1d409b2c42d43ae9 Diff: https://reviews.apache.org/r/36670/diff/ Testing --- Thanks, Edward Ribeiro
[jira] [Commented] (KAFKA-2355) Add an unit test to validate the deletion of a partition marked as deleted
[ https://issues.apache.org/jira/browse/KAFKA-2355?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14638036#comment-14638036 ] Edward Ribeiro commented on KAFKA-2355: --- Updated reviewboard https://reviews.apache.org/r/36670/diff/ against branch origin/trunk Add an unit test to validate the deletion of a partition marked as deleted -- Key: KAFKA-2355 URL: https://issues.apache.org/jira/browse/KAFKA-2355 Project: Kafka Issue Type: Test Affects Versions: 0.8.2.1 Reporter: Edward Ribeiro Priority: Minor Fix For: 0.8.3 Attachments: KAFKA-2355.patch, KAFKA-2355_2015-07-22_21:37:51.patch, KAFKA-2355_2015-07-22_22:23:13.patch Trying to delete a partition marked as deleted throws {{TopicAlreadyMarkedForDeletionException}}, so this ticket adds a unit test to validate this behaviour. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-2355) Add an unit test to validate the deletion of a partition marked as deleted
[ https://issues.apache.org/jira/browse/KAFKA-2355?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Edward Ribeiro updated KAFKA-2355: -- Attachment: KAFKA-2355_2015-07-22_22:23:13.patch Add an unit test to validate the deletion of a partition marked as deleted -- Key: KAFKA-2355 URL: https://issues.apache.org/jira/browse/KAFKA-2355 Project: Kafka Issue Type: Test Affects Versions: 0.8.2.1 Reporter: Edward Ribeiro Priority: Minor Fix For: 0.8.3 Attachments: KAFKA-2355.patch, KAFKA-2355_2015-07-22_21:37:51.patch, KAFKA-2355_2015-07-22_22:23:13.patch Trying to delete a partition marked as deleted throws {{TopicAlreadyMarkedForDeletionException}}, so this ticket adds a unit test to validate this behaviour. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: Review Request 36681: Patch for KAFKA-2275
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/36681/#review92700 --- Ship it! Just one question on the unit test. Otherwise, LGTM. clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java (line 173) https://reviews.apache.org/r/36681/#comment146940 Why do we need to do this in a separate thread? - Jason Gustafson On July 22, 2015, 11:09 p.m., Ashish Singh wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/36681/ --- (Updated July 22, 2015, 11:09 p.m.) Review request for kafka. Bugs: KAFKA-2275 https://issues.apache.org/jira/browse/KAFKA-2275 Repository: kafka Description --- Move getAllTopics from ConsumerNetworkClient to Fetcher. Add a unit test for Fetcher.getAllTopics. Do not ignore __consumer_offsets topic while getting all topics. Remove braces for single line if KAFKA-2275: Add a ListTopics() API to the new consumer Diffs - clients/src/main/java/org/apache/kafka/clients/ClientRequest.java ed4c0d98596cc294757f35df8c8cbc8e36ff42de clients/src/main/java/org/apache/kafka/clients/NetworkClient.java 48fe7961e2215372d8033ece4af739ea06c6457b clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java 252b759c0801f392e3526b0f31503b4b8fbf1c8a clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java bea3d737c51be77d5b5293cdd944d33b905422ba clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java c14eed1e95f2e682a235159a366046f00d1d90d6 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java d595c1cb07909e21946532501f648be3033603d9 clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java 7a4e586c1a0c52931a8ec55b6e1d5b67c67c28ea core/src/test/scala/integration/kafka/api/ConsumerTest.scala 3eb5f95731a3f06f662b334ab2b3d0ad7fa9e1ca Diff: https://reviews.apache.org/r/36681/diff/ Testing --- Thanks, Ashish Singh
[jira] [Commented] (KAFKA-2350) Add KafkaConsumer pause capability
[ https://issues.apache.org/jira/browse/KAFKA-2350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14638063#comment-14638063 ] Jiangjie Qin commented on KAFKA-2350: - Yes, that is what I meant. Isn't it the case that subscribe(partition) always checks whether subscribe(topic) has been called or not? If a user wants to subscribe/unsubscribe to an illegal partition, I kind of think it is easy to understand that we just throw an exception saying the consumer is using Kafka based offset assignment and topic partition tp is not an assigned partition. Add KafkaConsumer pause capability -- Key: KAFKA-2350 URL: https://issues.apache.org/jira/browse/KAFKA-2350 Project: Kafka Issue Type: Improvement Reporter: Jason Gustafson Assignee: Jason Gustafson There are some use cases in stream processing where it is helpful to be able to pause consumption of a topic. For example, when joining two topics, you may need to delay processing of one topic while you wait for the consumer of the other topic to catch up. The new consumer currently doesn't provide a nice way to do this. If you skip poll() or if you unsubscribe, then a rebalance will be triggered and your partitions will be reassigned. One way to achieve this would be to add two new methods to KafkaConsumer: {code} void pause(String... topics); void unpause(String... topics); {code} When a topic is paused, a call to KafkaConsumer.poll will not initiate any new fetches for that topic. After it is unpaused, fetches will begin again. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-2350) Add KafkaConsumer pause capability
[ https://issues.apache.org/jira/browse/KAFKA-2350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14638070#comment-14638070 ] Jiangjie Qin commented on KAFKA-2350: - I think an explicit configuration makes sense. Different subscribe/unsubscribe methods confuse people from time to time. So if enable.consumer.coordinator==false, users are on their own, and all the subscribe/unsubscribe methods will work. But what is not clear to me is: if enable.consumer.coordinator==true, will users be able to call subscribe/unsubscribe(partitions)? Add KafkaConsumer pause capability -- Key: KAFKA-2350 URL: https://issues.apache.org/jira/browse/KAFKA-2350 Project: Kafka Issue Type: Improvement Reporter: Jason Gustafson Assignee: Jason Gustafson There are some use cases in stream processing where it is helpful to be able to pause consumption of a topic. For example, when joining two topics, you may need to delay processing of one topic while you wait for the consumer of the other topic to catch up. The new consumer currently doesn't provide a nice way to do this. If you skip poll() or if you unsubscribe, then a rebalance will be triggered and your partitions will be reassigned. One way to achieve this would be to add two new methods to KafkaConsumer: {code} void pause(String... topics); void unpause(String... topics); {code} When a topic is paused, a call to KafkaConsumer.poll will not initiate any new fetches for that topic. After it is unpaused, fetches will begin again. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: Round Robin Among Consumers
Ordering guarantee should be optional. If someone needs ordering, they can always fall back on an exclusive-consumer strategy. On Wednesday, July 22, 2015, Ashish Singh asi...@cloudera.com wrote: Hey, don't you think that would be against the basic ordering guarantees Kafka provides? On Wed, Jul 22, 2015 at 2:14 PM, J A mbatth...@gmail.com wrote: Hi, This is in reference to the stackoverflow question http://stackoverflow.com/questions/31547216/kafka-log-deletion-and-load-balancing-across-consumers Since Kafka 0.8 already maintains a client offset, I would like to request a feature where a single partition's consumption can be round robin across a set of consumers. The message delivery strategy should be an option chosen by the consumer. -- Regards, Ashish
Re: Review Request 36681: Patch for KAFKA-2275
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/36681/#review92696 --- clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java (line 167) https://reviews.apache.org/r/36681/#comment146934 The code looks pretty good. Congrats! clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java (line 201) https://reviews.apache.org/r/36681/#comment146932 Just wondering to myself. If ``retryBackoffMs`` is >= ``timeout``, then the caller will have only one shot where it can try to retrieve the topics. Not that this is a problem, as ``retryBackoffMs`` is a small number of milliseconds, as far as I saw, but nevertheless something to be aware of. core/src/test/scala/integration/kafka/api/ConsumerTest.scala (line 198) https://reviews.apache.org/r/36681/#comment146930 nit: really *really* really cosmetic stuff, but ``topicPartsMap`` makes more sense, no? :) - Edward Ribeiro On July 22, 2015, 11:09 p.m., Ashish Singh wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/36681/ --- (Updated July 22, 2015, 11:09 p.m.) Review request for kafka. Bugs: KAFKA-2275 https://issues.apache.org/jira/browse/KAFKA-2275 Repository: kafka Description --- Move getAllTopics from ConsumerNetworkClient to Fetcher. Add a unit test for Fetcher.getAllTopics. Do not ignore __consumer_offsets topic while getting all topics. 
Remove braces for single line if KAFKA-2275: Add a ListTopics() API to the new consumer Diffs - clients/src/main/java/org/apache/kafka/clients/ClientRequest.java ed4c0d98596cc294757f35df8c8cbc8e36ff42de clients/src/main/java/org/apache/kafka/clients/NetworkClient.java 48fe7961e2215372d8033ece4af739ea06c6457b clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java 252b759c0801f392e3526b0f31503b4b8fbf1c8a clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java bea3d737c51be77d5b5293cdd944d33b905422ba clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java c14eed1e95f2e682a235159a366046f00d1d90d6 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java d595c1cb07909e21946532501f648be3033603d9 clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java 7a4e586c1a0c52931a8ec55b6e1d5b67c67c28ea core/src/test/scala/integration/kafka/api/ConsumerTest.scala 3eb5f95731a3f06f662b334ab2b3d0ad7fa9e1ca Diff: https://reviews.apache.org/r/36681/diff/ Testing --- Thanks, Ashish Singh
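Edward's observation about ``retryBackoffMs`` vs ``timeout`` is easy to check with a virtual clock: a timeout-bounded retry loop that backs off after each failed attempt gets exactly one attempt whenever the backoff is at least the timeout. (A simplified model of that loop shape, not the actual Fetcher code.)

```java
// Virtual-clock model of a timeout-bounded retry loop (times in ms): each
// failed attempt is followed by a backoff sleep, so the attempt budget
// works out to ceil(timeout / backoff). Simplified model, not Fetcher itself.
public class RetryBudget {
    public static int attempts(long timeoutMs, long retryBackoffMs) {
        long now = 0;
        int attempts = 0;
        while (now < timeoutMs) {      // while the deadline has not passed
            attempts++;                // one (failed) attempt
            now += retryBackoffMs;     // back off before retrying
        }
        return attempts;
    }
}
```

So with a backoff of 100 ms and a timeout of 100 ms or less, the caller indeed gets a single shot at retrieving the topics.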
Re: Review Request 36664: Patch for KAFKA-2353
On July 22, 2015, 10:40 a.m., Ismael Juma wrote: core/src/main/scala/kafka/network/SocketServer.scala, line 401 https://reviews.apache.org/r/36664/diff/2/?file=1018375#file1018375line401 Is it intentional to ignore `java.lang.Error` too? I think java.lang.Error is a subclass of Throwable. - Jiangjie --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/36664/#review92574 --- On July 23, 2015, 12:51 a.m., Jiangjie Qin wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/36664/ --- (Updated July 23, 2015, 12:51 a.m.) Review request for kafka. Bugs: KAFKA-2353 https://issues.apache.org/jira/browse/KAFKA-2353 Repository: kafka Description --- Addressed Gwen's comments Addressed Gwen's comments. Diffs - core/src/main/scala/kafka/network/SocketServer.scala 91319fa010b140cca632e5fa8050509bd2295fc9 Diff: https://reviews.apache.org/r/36664/diff/ Testing --- Thanks, Jiangjie Qin
[jira] [Commented] (KAFKA-2351) Brokers are having a problem shutting down correctly
[ https://issues.apache.org/jira/browse/KAFKA-2351?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14638076#comment-14638076 ] Mayuresh Gharat commented on KAFKA-2351: [~junrao], [~guozhang] can you take a look at this? Brokers are having a problem shutting down correctly Key: KAFKA-2351 URL: https://issues.apache.org/jira/browse/KAFKA-2351 Project: Kafka Issue Type: Bug Reporter: Mayuresh Gharat Assignee: Mayuresh Gharat Attachments: KAFKA-2351.patch, KAFKA-2351_2015-07-21_14:58:13.patch The run() in Acceptor during shutdown might throw an exception that is not caught and it never reaches shutdownComplete due to which the latch is not counted down and the broker will not be able to shutdown. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: Round Robin Among Consumers
Why have partitions at all, if I don't need to scale the topic? Coupling topic scalability with consumer scalability just goes against the messaging-system core principle of decoupling consumers and producers. On Wednesday, July 22, 2015, Aditya Auradkar aaurad...@linkedin.com.invalid wrote: Hi, Why not simply have as many partitions as the set of consumers you want to round robin across? Aditya On Wed, Jul 22, 2015 at 2:37 PM, Ashish Singh asi...@cloudera.com wrote: Hey, don't you think that would be against the basic ordering guarantees Kafka provides? On Wed, Jul 22, 2015 at 2:14 PM, J A mbatth...@gmail.com wrote: Hi, This is in reference to the stackoverflow question http://stackoverflow.com/questions/31547216/kafka-log-deletion-and-load-balancing-across-consumers Since Kafka 0.8 already maintains a client offset, I would like to request a feature where a single partition's consumption can be round robin across a set of consumers. The message delivery strategy should be an option chosen by the consumer. -- Regards, Ashish
Re: Review Request 36664: Patch for KAFKA-2353
On July 22, 2015, 5:13 p.m., Guozhang Wang wrote: LGTM overall. Could you address Ismael's comments as well before check-in? Thanks, Guozhang. I updated patch to address Ismael's comments. - Jiangjie --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/36664/#review92607 --- On July 23, 2015, 12:51 a.m., Jiangjie Qin wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/36664/ --- (Updated July 23, 2015, 12:51 a.m.) Review request for kafka. Bugs: KAFKA-2353 https://issues.apache.org/jira/browse/KAFKA-2353 Repository: kafka Description --- Addressed Gwen's comments Addressed Gwen's comments. Diffs - core/src/main/scala/kafka/network/SocketServer.scala 91319fa010b140cca632e5fa8050509bd2295fc9 Diff: https://reviews.apache.org/r/36664/diff/ Testing --- Thanks, Jiangjie Qin
[jira] [Commented] (KAFKA-2350) Add KafkaConsumer pause capability
[ https://issues.apache.org/jira/browse/KAFKA-2350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14638139#comment-14638139 ] Jay Kreps commented on KAFKA-2350: -- [~hachikuji] yeah i vote for pause/resume (or unpause). I think the challenge with making group management explicit is that then it will conflict with the api usage. I.e. the user will say client.subscribe(mytopic) and something terrible and bad will happen and they won't know why and we will say uh uh uh you forgot to set the magic enable.consumer.coordinator=true flag. This was kind of what I liked about the implicit approach--the natural usage of just subscribing to a partition or topic does what you would expect. I would anticipate making this a flag turning into a source of confusion because in the early stages of usage most people won't know what a consumer coordinator is. Add KafkaConsumer pause capability -- Key: KAFKA-2350 URL: https://issues.apache.org/jira/browse/KAFKA-2350 Project: Kafka Issue Type: Improvement Reporter: Jason Gustafson Assignee: Jason Gustafson There are some use cases in stream processing where it is helpful to be able to pause consumption of a topic. For example, when joining two topics, you may need to delay processing of one topic while you wait for the consumer of the other topic to catch up. The new consumer currently doesn't provide a nice way to do this. If you skip poll() or if you unsubscribe, then a rebalance will be triggered and your partitions will be reassigned. One way to achieve this would be to add two new methods to KafkaConsumer: {code} void pause(String... topics); void unpause(String... topics); {code} When a topic is paused, a call to KafkaConsumer.poll will not initiate any new fetches for that topic. After it is unpaused, fetches will begin again. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: Review Request 33620: Patch for KAFKA-1690
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/33620/#review92711 --- clients/src/main/java/org/apache/kafka/common/network/ByteBufferSend.java (line 47) https://reviews.apache.org/r/33620/#comment146946 It seems that we will need to add similar logic in other Sends: MultiSend, FetchResponseSend, TopicDataSend and PartitionDataSend. - Jun Rao On July 20, 2015, 7 p.m., Sriharsha Chintalapani wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/33620/ --- (Updated July 20, 2015, 7 p.m.) Review request for kafka. Bugs: KAFKA-1690 https://issues.apache.org/jira/browse/KAFKA-1690 Repository: kafka Description --- KAFKA-1690. new java producer needs ssl support as a client. KAFKA-1690. new java producer needs ssl support as a client. KAFKA-1690. new java producer needs ssl support as a client. KAFKA-1690. new java producer needs ssl support as a client. SSLFactory tests. KAFKA-1690. new java producer needs ssl support as a client. Added PrincipalBuilder. KAFKA-1690. new java producer needs ssl support as a client. Addressing reviews. KAFKA-1690. new java producer needs ssl support as a client. Addressing reviews. KAFKA-1690. new java producer needs ssl support as a client. Addressing reviews. KAFKA-1690. new java producer needs ssl support as a client. Fixed minor issues with the patch. KAFKA-1690. new java producer needs ssl support as a client. Fixed minor issues with the patch. KAFKA-1690. new java producer needs ssl support as a client. KAFKA-1690. new java producer needs ssl support as a client. Merge remote-tracking branch 'refs/remotes/origin/trunk' into KAFKA-1690-V1 KAFKA-1690. Broker side ssl changes. KAFKA-1684. SSL for socketServer. KAFKA-1690. Added SSLProducerSendTest and fixes to get right port for SSL. Merge branch 'trunk' into KAFKA-1690-V1 KAFKA-1690. Post merge fixes. KAFKA-1690. Added SSLProducerSendTest. KAFKA-1690. Minor fixes based on patch review comments. 
Merge commit KAFKA-1690. Added SSL Consumer Test. KAFKA-1690. SSL Support. KAFKA-1690. Addressing reviews. Merge branch 'trunk' into KAFKA-1690-V1 Merge branch 'trunk' into KAFKA-1690-V1 KAFKA-1690. Addressing reviews. Removed interestOps from SSLTransportLayer. KAFKA-1690. Addressing reviews. Diffs - build.gradle fb9084307ae41bbb62e32720ccd7b94f26e910c8 checkstyle/import-control.xml 19e0659ef9385433d9f94dee43cd70a52b18c9e5 clients/src/main/java/org/apache/kafka/clients/ClientUtils.java 0d68bf1e1e90fe9d5d4397ddf817b9a9af8d9f7a clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java 2c421f42ed3fc5d61cf9c87a7eaa7bb23e26f63b clients/src/main/java/org/apache/kafka/clients/NetworkClient.java 48fe7961e2215372d8033ece4af739ea06c6457b clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java 70377ae2fa46deb381139d28590ce6d4115e1adc clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java bea3d737c51be77d5b5293cdd944d33b905422ba clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 03b8dd23df63a8d8a117f02eabcce4a2d48c44f7 clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java aa264202f2724907924985a5ecbe74afc4c6c04b clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java bae528d31516679bed88ee61b408f209f185a8cc clients/src/main/java/org/apache/kafka/common/config/SSLConfigs.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/network/Authenticator.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/network/ByteBufferSend.java df0e6d5105ca97b7e1cb4d334ffb7b443506bd0b clients/src/main/java/org/apache/kafka/common/network/ChannelBuilder.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/network/DefaultAuthenticator.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java 3ca0098b8ec8cfdf81158465b2d40afc47eb6f80 
clients/src/main/java/org/apache/kafka/common/network/PlainTextChannelBuilder.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/network/PlainTextTransportLayer.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/network/SSLChannelBuilder.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/network/SSLFactory.java PRE-CREATION
[jira] [Commented] (KAFKA-2350) Add KafkaConsumer pause capability
[ https://issues.apache.org/jira/browse/KAFKA-2350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14638182#comment-14638182 ] Jiangjie Qin commented on KAFKA-2350: - [~yasuhiro.matsuda] [~jkreps] [~hachikuji] [~gwenshap] If we are using pause/unpause, does that mean when users are not using the consumer coordinator, these calls are equivalent to subscribe(partitions)/unsubscribe(partitions)? I still don't understand why this is an overloading of the current subscribe/unsubscribe API. The way I see how the KafkaConsumer API works is that we have different methods to set different fields of a fetch request (offsets, topic, partitions), then we do a poll() using those settings. To me the definition of all the subscribe/unsubscribe methods stays unchanged: * Subscribe/Unsubscribe to a TOPIC means ** Involve the consumer coordinator to do partition assignment ** Consumer rebalance will be triggered * Subscribe/Unsubscribe to a PARTITION means ** Do not involve the consumer coordinator ** Consumer rebalance will not be triggered. The only change is that instead of naively rejecting a PARTITION sub/unsub when the consumer coordinator is involved, we allow users to decide whether they want to change the settings for their next poll() to exclude some topic partitions that have been assigned to this consumer. Therefore I don't see why using subscribe(partitions)/unsubscribe(partitions) for pause and unpause consuming is a behavior re-definition. It looks to me that pause/unpause does the exact same thing as partition-level subscribe/unsubscribe, but we are adding them simply because we think users are using them for a different use case. If so, does it mean we need to add yet another pair of interfaces if people subscribe/unsubscribe partitions for some other use case? Then we are going to end up with a bunch of interfaces doing very similar or even the exact same thing but with different names based on the use case.
If the reason we don't like to use sub/unsub is that their names sound purpose-oriented and indicate a particular use case, we can change the names to something like addTopicPartition()/addTopic() (I know I am a terrible name picker, but hopefully you get what I wanted to say).
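Jiangjie's equivalence claim above, that partition-level unsubscribe already behaves like a pause, can be sketched as follows. All names are hypothetical and the rebalance counter is purely illustrative, not real KafkaConsumer behavior; the sketch only captures the distinction he draws between topic-level operations (coordinator involved, rebalance triggered) and partition-level operations (neither):

```java
import java.util.HashSet;
import java.util.Set;

// Illustrative model of the two subscription levels discussed above:
// topic-level unsubscribe involves the coordinator and triggers a rebalance,
// while partition-level unsubscribe merely excludes the partition from the
// next poll() -- which is exactly the proposed "pause" behavior.
class FetchSettings {
    private final Set<String> assigned = new HashSet<>();  // partitions assigned by the coordinator
    private final Set<String> excluded = new HashSet<>();  // partitions excluded from the next poll()
    int rebalances = 0;

    void onAssignment(Set<String> partitions) {
        assigned.clear();
        assigned.addAll(partitions);
    }

    void unsubscribeTopic(String topic) {
        rebalances++;                                      // coordinator involved: rebalance triggered
        assigned.removeIf(p -> p.startsWith(topic + "-"));
    }

    void unsubscribePartition(String partition) {
        excluded.add(partition);                           // no coordinator, no rebalance
    }

    // What the next poll() would fetch: assigned minus excluded.
    Set<String> nextPollTargets() {
        Set<String> targets = new HashSet<>(assigned);
        targets.removeAll(excluded);
        return targets;
    }
}
```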
Round Robin Among Consumers
Hi, This is in reference to the Stack Overflow question http://stackoverflow.com/questions/31547216/kafka-log-deletion-and-load-balancing-across-consumers Since Kafka 0.8 already maintains a client offset, I would like to request a feature where consumption of a single partition can be round-robined across a set of consumers. The message delivery strategy should be an option chosen by the consumer.
Re: Review Request 36681: Patch for KAFKA-2275
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/36681/#review92644 --- Ship it! LGTM. I think we can check in after Jason's comments get addressed. - Guozhang Wang On July 22, 2015, 6:32 a.m., Ashish Singh wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/36681/ --- (Updated July 22, 2015, 6:32 a.m.) Review request for kafka. Bugs: KAFKA-2275 https://issues.apache.org/jira/browse/KAFKA-2275 Repository: kafka Description --- KAFKA-2275: Add a ListTopics() API to the new consumer Diffs - clients/src/main/java/org/apache/kafka/clients/ClientRequest.java ed4c0d98596cc294757f35df8c8cbc8e36ff42de clients/src/main/java/org/apache/kafka/clients/NetworkClient.java 48fe7961e2215372d8033ece4af739ea06c6457b clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java 252b759c0801f392e3526b0f31503b4b8fbf1c8a clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java bea3d737c51be77d5b5293cdd944d33b905422ba clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java c14eed1e95f2e682a235159a366046f00d1d90d6 clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java 9517d9d0cd480d5ba1d12f1fde7963e60528d2f8 core/src/test/scala/integration/kafka/api/ConsumerTest.scala 3eb5f95731a3f06f662b334ab2b3d0ad7fa9e1ca Diff: https://reviews.apache.org/r/36681/diff/ Testing --- Thanks, Ashish Singh
Re: Review Request 36664: Patch for KAFKA-2353
On July 21, 2015, 11:15 p.m., Gwen Shapira wrote: core/src/main/scala/kafka/network/SocketServer.scala, line 465 https://reviews.apache.org/r/36664/diff/1/?file=1018238#file1018238line465 Turns out that catching Throwable is a really bad idea: https://www.sumologic.com/2014/05/05/why-you-should-never-catch-throwable-in-scala/ Jiangjie Qin wrote: Ah... Didn't know that before. I explicitly listed the exceptions. Guozhang Wang wrote: Searching for ": Throwable" gives me 180+ cases in the code base :P Though many of them are from unit tests (which is arguably OK), there are still a lot in the core package. I agree that we should avoid catching Throwable whenever possible, which will also encourage developers to think about possible checked exceptions in the calling trace. Gwen Shapira wrote: I know :( I'm not sure if going over and converting everything is worth the effort. Although it can be a nice newbie jira. Jiangjie Qin wrote: Maybe we can simply change that to catch Throwables except ControlThrowables? That might be a simple search and replace. Possible. Definitely not in this JIRA though :) On July 21, 2015, 11:15 p.m., Gwen Shapira wrote: core/src/main/scala/kafka/network/SocketServer.scala, line 400 https://reviews.apache.org/r/36664/diff/1/?file=1018238#file1018238line400 So in case of an unexpected exception, we log an error and keep running? Isn't it better to kill the processor, since we don't know what state the system is in? If the acceptor keeps placing messages in the queue for a dead processor, isn't it a separate issue? Jiangjie Qin wrote: This part I'm not quite sure about. I am not very experienced with error handling in such cases, so please correct me if I missed something. Here is what I thought. The way it currently works is that the acceptor will 1. accept a new connection request and create a new socket channel, 2.
choose a processor and put the socket channel into the processor's new connection queue. The processor will just take the socket channels from the queue and register them with the selector. If the processor hits an uncaught exception, there are several possibilities. Case 1: The exception was from one socket channel. Case 2: The exception was associated with a bad request. In case 1, ideally we should just disconnect that socket channel without affecting other socket channels. In case 2, I think we should log the error and skip the message - assuming the client will retry sending data if no response is received for a given period of time. I am not sure if letting the processor exit is a good idea, because it would let a badly behaving client screw the entire cluster - it might screw the processors one by one. Compared with that, I am leaning towards keeping the processor running and serving other normal TCP connections if possible, but logging the error so a monitoring system can detect it and see if human intervention is needed. Also, I don't know what to do here to prevent the thread from exiting without catching all the throwables. According to this blog http://www.tzavellas.com/techblog/2010/09/20/catching-throwable-in-scala/ I guess I can rethrow all the ControlThrowables, but intercept the rest? Guozhang Wang wrote: I would also prefer not to close the Processor thread upon exceptions, mainly to avoid one bad client killing a shared Kafka cluster. In the past we have seen issues like a DDoS of MetadataRequests killing the cluster with all other clients affected, etc., and the quota work is aimed at preventing that. Since Processor threads are shared (8 by default on a broker), they should not be closed by a single socket / bad client request. Gwen Shapira wrote: I like your thinking around cases #1 and #2. I think this should go as a code comment somewhere, so when people improve / extend SocketServer they will keep this logic in mind.
Maybe even specify in the individual catch clauses whether they are handling possible errors at the request level or the channel level. My concern is with possible case #3: Each processor has an o.a.k.common.network.Selector. I'm concerned about the possibility of something going wrong in the state of the selector, which will possibly be an issue for all channels. For example, failure to register could be an issue with the channel.register call, but also perhaps an issue with keys.put (just an example - I'm not sure something can actually break the keys table). I'd like to be able to identify cases where the Selector state may have gone wrong and close the processor in that case. Does that make any sense? Or am I being too paranoid? Jiangjie Qin wrote: Hi Gwen, I think what you said makes sense. Maybe I see this more from a failure boundary point of
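The policy the reviewers converge on, catch and log per-connection failures so one bad client cannot kill a shared processor, while still letting truly fatal errors propagate, can be sketched as follows. This is illustrative Java (the real SocketServer is Scala, where ControlThrowable/NonFatal handles the fatal/non-fatal split); the loop, task, and log names are all hypothetical:

```java
import java.util.List;

// Sketch of the discussed error-handling policy: per-task (per-connection)
// exceptions are logged and skipped so the remaining work still completes,
// while JVM Errors are deliberately NOT caught and would kill the thread.
class ResilientLoop {
    interface Task {
        void run() throws Exception;
    }

    // Runs each task; a failing task is recorded in errorLog and skipped.
    // Returns the number of tasks that completed successfully.
    static int drain(List<Task> tasks, List<String> errorLog) {
        int completed = 0;
        for (Task t : tasks) {
            try {
                t.run();
                completed++;
            } catch (Exception e) {      // one bad task must not stop the loop
                errorLog.add(e.getMessage());
            }                            // Errors (OutOfMemoryError, etc.) propagate
        }
        return completed;
    }
}
```

Catching `Exception` rather than `Throwable` is the Java analogue of the Scala advice linked above: control-flow and fatal throwables keep their normal semantics while ordinary request/connection failures are contained.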
Re: Round Robin Among Consumers
Hi, Why not simply have as many partitions as the set of consumers you want to round robin across? Aditya On Wed, Jul 22, 2015 at 2:37 PM, Ashish Singh asi...@cloudera.com wrote: Hey, don't you think that would be against the basic ordering guarantees Kafka provides? On Wed, Jul 22, 2015 at 2:14 PM, J A mbatth...@gmail.com wrote: Hi, This is reference to stackoverflow question http://stackoverflow.com/questions/31547216/kafka-log-deletion-and-load-balancing-across-consumers Since Kafka 0.8 already maintains a client offset, i would like to request a feature, where a single partition consumption can be round robin across a set of consumers. The message delivery strategy should be an option chosen by the consumer. -- Regards, Ashish
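Aditya's suggestion works because the partition is Kafka's unit of parallelism: with as many partitions as consumers, distributing partitions round-robin spreads load while preserving per-partition ordering. A minimal sketch of such an assignment (a simple modulo scheme for illustration, not Kafka's actual assignor):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative round-robin assignment of partitions to consumers.
// With numPartitions == consumers.size(), each consumer gets exactly one
// partition, so consumption is spread without breaking ordering guarantees.
class RoundRobinAssign {
    static Map<String, List<Integer>> assign(List<String> consumers, int numPartitions) {
        Map<String, List<Integer>> assignment = new HashMap<>();
        for (String c : consumers) assignment.put(c, new ArrayList<>());
        for (int p = 0; p < numPartitions; p++) {
            // Partition p goes to consumer p mod N.
            String owner = consumers.get(p % consumers.size());
            assignment.get(owner).add(p);
        }
        return assignment;
    }
}
```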
Re: Round Robin Among Consumers
Hey, don't you think that would be against the basic ordering guarantees Kafka provides? On Wed, Jul 22, 2015 at 2:14 PM, J A mbatth...@gmail.com wrote: Hi, This is reference to stackoverflow question http://stackoverflow.com/questions/31547216/kafka-log-deletion-and-load-balancing-across-consumers Since Kafka 0.8 already maintains a client offset, i would like to request a feature, where a single partition consumption can be round robin across a set of consumers. The message delivery strategy should be an option chosen by the consumer. -- Regards, Ashish
[jira] [Commented] (KAFKA-2004) Write Kafka messages directly to HDFS
[ https://issues.apache.org/jira/browse/KAFKA-2004?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14637745#comment-14637745 ] Ashish K Singh commented on KAFKA-2004: --- Spark streaming supports reading from Kafka. Once you have read from Kafka, it is up to you what you want to do with it. Here is an example, https://github.com/SinghAsDev/pankh. Write Kafka messages directly to HDFS - Key: KAFKA-2004 URL: https://issues.apache.org/jira/browse/KAFKA-2004 Project: Kafka Issue Type: Bug Components: consumer, core, producer Affects Versions: 0.8.1.1 Reporter: sutanu das Assignee: Neha Narkhede Priority: Critical 1. Is there a way to write Kafka messages directly to HDFS without writing any consumer code? 2. Is there any way to integrate Kafka with Storm or Spark so messages go directly from Kafka consumers to an HDFS sink? -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-2275) Add a ListTopics() API to the new consumer
[ https://issues.apache.org/jira/browse/KAFKA-2275?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ashish K Singh updated KAFKA-2275: -- Attachment: KAFKA-2275_2015-07-22_16:09:34.patch Add a ListTopics() API to the new consumer -- Key: KAFKA-2275 URL: https://issues.apache.org/jira/browse/KAFKA-2275 Project: Kafka Issue Type: Sub-task Components: consumer Reporter: Guozhang Wang Assignee: Ashish K Singh Priority: Critical Fix For: 0.8.3 Attachments: KAFKA-2275.patch, KAFKA-2275.patch, KAFKA-2275_2015-07-17_21:39:27.patch, KAFKA-2275_2015-07-20_10:44:19.patch, KAFKA-2275_2015-07-22_16:09:34.patch With regex subscription like {code} consumer.subscribe(topic*) {code} the partition assignment is automatically done on the Kafka side, while there are some use cases where consumers want regex subscriptions but with their own specific partition assignment rather than Kafka-side assignment. With ListTopics() they can periodically check for topic list changes and specifically subscribe to the partitions of the new topics. For implementation, it involves sending a TopicMetadataRequest to a random broker and parsing the response. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
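The polling pattern the ticket describes, periodically list topics, regex-match the result, and subscribe to the partitions of new matches yourself, might look like the following sketch. The helper class and method names are hypothetical; only the ListTopics()-then-match idea comes from the JIRA:

```java
import java.util.HashSet;
import java.util.Set;
import java.util.regex.Pattern;

// Hypothetical helper for the use case above: feed it each fresh topic
// listing (e.g. from a periodic ListTopics() call) and it reports topics
// that newly match the pattern, so the caller can assign their partitions
// itself instead of relying on Kafka-side assignment.
class TopicWatcher {
    private final Pattern pattern;
    private final Set<String> known = new HashSet<>();

    TopicWatcher(String regex) {
        this.pattern = Pattern.compile(regex);
    }

    // Returns matching topics not seen in any previous listing.
    Set<String> newlyMatching(Set<String> allTopics) {
        Set<String> fresh = new HashSet<>();
        for (String t : allTopics) {
            // known.add(t) is true only the first time we see t.
            if (pattern.matcher(t).matches() && known.add(t)) fresh.add(t);
        }
        return fresh;
    }
}
```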
[jira] [Commented] (KAFKA-2350) Add KafkaConsumer pause capability
[ https://issues.apache.org/jira/browse/KAFKA-2350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14637890#comment-14637890 ] Guozhang Wang commented on KAFKA-2350: -- [~becket_qin] OK, I understand now. subscribe(partition) will check whether subscribe(topic) has been called before, hence assignment() is not null. I think this behavior is a bit confusing since it depends on the call history of subscribe().
[jira] [Commented] (KAFKA-2275) Add a ListTopics() API to the new consumer
[ https://issues.apache.org/jira/browse/KAFKA-2275?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14637889#comment-14637889 ] Ashish K Singh commented on KAFKA-2275: --- Updated reviewboard https://reviews.apache.org/r/36681/ against branch trunk
Re: Review Request 36681: Patch for KAFKA-2275
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/36681/ --- (Updated July 22, 2015, 11:09 p.m.) Review request for kafka. Bugs: KAFKA-2275 https://issues.apache.org/jira/browse/KAFKA-2275 Repository: kafka Description (updated) --- Move getAllTopics from ConsumerNetworkClient to Fetcher. Add a unit test for Fetcher.getAllTopics. Do not ignore __consumer_offsets topic while getting all topics. Remove braces for single-line if KAFKA-2275: Add a ListTopics() API to the new consumer Diffs (updated) - clients/src/main/java/org/apache/kafka/clients/ClientRequest.java ed4c0d98596cc294757f35df8c8cbc8e36ff42de clients/src/main/java/org/apache/kafka/clients/NetworkClient.java 48fe7961e2215372d8033ece4af739ea06c6457b clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java 252b759c0801f392e3526b0f31503b4b8fbf1c8a clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java bea3d737c51be77d5b5293cdd944d33b905422ba clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java c14eed1e95f2e682a235159a366046f00d1d90d6 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java d595c1cb07909e21946532501f648be3033603d9 clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java 7a4e586c1a0c52931a8ec55b6e1d5b67c67c28ea core/src/test/scala/integration/kafka/api/ConsumerTest.scala 3eb5f95731a3f06f662b334ab2b3d0ad7fa9e1ca Diff: https://reviews.apache.org/r/36681/diff/ Testing --- Thanks, Ashish Singh
[jira] [Updated] (KAFKA-2353) SocketServer.Processor should catch exception and close the socket properly in configureNewConnections.
[ https://issues.apache.org/jira/browse/KAFKA-2353?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jiangjie Qin updated KAFKA-2353: Attachment: KAFKA-2353_2015-07-22_17:51:42.patch SocketServer.Processor should catch exception and close the socket properly in configureNewConnections. --- Key: KAFKA-2353 URL: https://issues.apache.org/jira/browse/KAFKA-2353 Project: Kafka Issue Type: Bug Reporter: Jiangjie Qin Assignee: Jiangjie Qin Attachments: KAFKA-2353.patch, KAFKA-2353_2015-07-21_22:02:24.patch, KAFKA-2353_2015-07-22_17:51:42.patch We have seen an increasing number of sockets in CLOSE_WAIT status in our production environment in the recent couple of days. From the thread dump it seems one of the Processor threads had died but the acceptor was still putting many new connections into its new connection queue. The Processor thread died because we are not catching all the exceptions in the Processor thread. For example, in our case it seems to have been an exception thrown in configureNewConnections(). -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: Review Request 36681: Patch for KAFKA-2275
On July 22, 2015, 4:43 p.m., Jason Gustafson wrote: Hey Ashish, this looks pretty good to me. Just some minor comments. Thanks for the review! Addressed your concerns. On July 22, 2015, 4:43 p.m., Jason Gustafson wrote: clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java, line 298 https://reviews.apache.org/r/36681/diff/1/?file=1018418#file1018418line298 Any reason not to put this method in Fetcher instead of here? I don't have a strong feeling, but it was kind of nice keeping ConsumerNetworkClient largely free of application logic. Also, it might be nice to have a unit test. Moved and added a unit test for Fetcher.getAllTopics. On July 22, 2015, 4:43 p.m., Jason Gustafson wrote: clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java, lines 314-315 https://reviews.apache.org/r/36681/diff/1/?file=1018418#file1018418line314 I think I asked this before, but is there any harm in returning this topic to the user? I ask because we don't actually prevent them from calling partitionsFor with the same topic. Removed the exclusion. - Ashish --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/36681/#review92604 ---
Re: Review Request 36670: Patch for KAFKA-2355
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/36670/ --- (Updated July 23, 2015, 12:38 a.m.) Review request for kafka. Bugs: KAFKA-2355 https://issues.apache.org/jira/browse/KAFKA-2355 Repository: kafka Description --- KAFKA-2355 Add an unit test to validate the deletion of a partition marked as deleted Diffs (updated) - core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala fa8ce259a2832ab86f9dda8c1d409b2c42d43ae9 Diff: https://reviews.apache.org/r/36670/diff/ Testing --- Thanks, Edward Ribeiro
[jira] [Updated] (KAFKA-2355) Add an unit test to validate the deletion of a partition marked as deleted
[ https://issues.apache.org/jira/browse/KAFKA-2355?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Edward Ribeiro updated KAFKA-2355: -- Attachment: KAFKA-2355_2015-07-22_21:37:51.patch Add an unit test to validate the deletion of a partition marked as deleted -- Key: KAFKA-2355 URL: https://issues.apache.org/jira/browse/KAFKA-2355 Project: Kafka Issue Type: Test Affects Versions: 0.8.2.1 Reporter: Edward Ribeiro Priority: Minor Fix For: 0.8.3 Attachments: KAFKA-2355.patch, KAFKA-2355_2015-07-22_21:37:51.patch Trying to delete a partition marked as deleted throws {{TopicAlreadyMarkedForDeletionException}}, so this ticket adds a unit test to validate this behaviour. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-2355) Add an unit test to validate the deletion of a partition marked as deleted
[ https://issues.apache.org/jira/browse/KAFKA-2355?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14637984#comment-14637984 ] Edward Ribeiro commented on KAFKA-2355: --- Updated reviewboard https://reviews.apache.org/r/36670/diff/ against branch origin/trunk
[jira] [Commented] (KAFKA-1893) Allow regex subscriptions in the new consumer
[ https://issues.apache.org/jira/browse/KAFKA-1893?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14637894#comment-14637894 ] Ashish K Singh commented on KAFKA-1893: --- [~hachikuji] I think after working on KAFKA-2275, I got even more interested in working on this. In case you are not working on this yet and you think it is OK for me to take a crack at this, I would like to work on this. Allow regex subscriptions in the new consumer - Key: KAFKA-1893 URL: https://issues.apache.org/jira/browse/KAFKA-1893 Project: Kafka Issue Type: Sub-task Components: consumer Reporter: Jay Kreps Assignee: Jason Gustafson Priority: Critical Fix For: 0.8.3 The consumer needs to handle subscribing to regular expressions. Presumably this would be done as a new API, {code} void subscribe(java.util.regex.Pattern pattern); {code} Some questions/thoughts to work out: - It should not be possible to mix pattern subscription with partition subscription. - Is it allowable to mix this with normal topic subscriptions? Logically this is okay but a bit complex to implement. - We need to ensure we regularly update the metadata and recheck our regexes against the metadata to update subscriptions for new topics that are created or old topics that are deleted. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: Should 0.8.3 consumers correctly function with 0.8.2 brokers?
Jiangjie, Seems I was misunderstood; KafkaConsumer.poll after subscribing to a topic via KafkaConsumer.subscribe(String... topics) does not work, Unknown api code 11 error, even with both client and broker being latest 0.8.3/trunk. Will try to create a failing test and open a bug report. Kind regards, Stevo Slavic. On Wed, Jul 22, 2015 at 8:36 PM, Jiangjie Qin j...@linkedin.com.invalid wrote: I don't think we have consumer coordinator in 0.8.2 brokers. So KafkaConsumer in 0.8.3 will only be able to subscribe to partitions explicitly. Subscribing to a topic won't work with 0.8.2 brokers. Jiangjie (Becket) Qin On Wed, Jul 22, 2015 at 4:26 AM, Stevo Slavić ssla...@gmail.com wrote: I'm getting Unknown api code 11 even when both client and server are 0.8.3/trunk, when KafkaConsumer.subscribe(String... topics) is used. Bug? Kind regards, Stevo Slavic. On Fri, Apr 24, 2015 at 7:13 PM, Neha Narkhede n...@confluent.io wrote: Yes, I was clearly confused :-) On Fri, Apr 24, 2015 at 9:37 AM, Sean Lydon lydon.s...@gmail.com wrote: Thanks for the responses. Ewen is correct that I am referring to the *new* consumer (org.apache.kafka.clients.consumer.KafkaConsumer). I am extending the consumer to allow my applications more control over committed offsets. I really want to get away from zookeeper (so using the offset storage), and re-balancing is something I haven't really needed to tackle in an automated/seamless way. Either way, I'll hold off going further down this road until there is more interest. @Gwen I set up a single consumer without partition.assignment.strategy or rebalance.callback.class. I was unable to subscribe to just a topic (Unknown api code 11 on broker), but I could subscribe to a TopicPartition. This makes sense as I would need to handle re-balance outside the consumer. Things functioned as expected (well I have an additional minor fix to code from KAFKA-2121), and the only exceptions on broker were due to closing consumers (which I have become accustomed to).
My tests are specific to my extended version of the consumer, but they basically do a little writing and reading with different serde classes with application controlled commits (similar to onSuccess and onFailure after each record, but with tolerance for out of order acknowledgements). If you are interested, here is the patch of the hack against trunk. On Thu, Apr 23, 2015 at 10:27 PM, Ewen Cheslack-Postava e...@confluent.io wrote: @Neha I think you're mixing up the 0.8.1/0.8.2 updates and the 0.8.2/0.8.3 that's being discussed here? I think the original question was about using the *new* consumer (clients consumer) with 0.8.2. Gwen's right, it will use features not even implemented in the broker in trunk yet, let alone the 0.8.2. I don't think the enable.commit.downgrade type option, or supporting the old protocol with the new consumer at all, makes much sense. You'd end up with some weird hybrid of simple and high-level consumers -- you could use offset storage, but you'd have to manage rebalancing yourself since none of the coordinator support would be there. On Thu, Apr 23, 2015 at 9:22 PM, Neha Narkhede n...@confluent.io wrote: My understanding is that ideally the 0.8.3 consumer should work with an 0.8.2 broker if the offset commit config was set to zookeeper. The only thing that might not work is offset commit to Kafka, which makes sense since the 0.8.2 broker does not support Kafka based offset management. If we broke all kinds of offset commits, then it seems like a regression, no? On Thu, Apr 23, 2015 at 7:26 PM, Gwen Shapira gshap...@cloudera.com wrote: I didn't think 0.8.3 consumer will ever be able to talk to 0.8.2 broker... there are some essential pieces that are missing in 0.8.2 (Coordinator, Heartbeat, etc). Maybe I'm missing something. It will be nice if this will work :) Mind sharing what / how you tested? Were there no errors in broker logs after your fix? 
On Thu, Apr 23, 2015 at 5:37 PM, Sean Lydon lydon.s...@gmail.com wrote: Currently the clients consumer (trunk) sends offset commit requests of version 2. The 0.8.2 brokers fail to handle this particular request with a: java.lang.AssertionError: assertion failed: Version 2 is invalid for OffsetCommitRequest. Valid versions are 0 or 1. I was able to make this work via a forceful downgrade of this particular request, but I would like some feedback on whether an enable.commit.downgrade configuration would be a tolerable method to allow 0.8.3 consumers to interact
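The downgrade idea discussed in this thread could be sketched as follows. This is a hypothetical stand-in, not actual Kafka client code: the method name maxOffsetCommitVersion, the enable.commit.downgrade flag handling, and the broker-version strings are all assumptions for illustration; only the wire facts (0.8.2 brokers accept OffsetCommitRequest versions 0 and 1, trunk sends version 2) come from the thread.

```java
// Hypothetical sketch of the enable.commit.downgrade idea: choose the highest
// OffsetCommitRequest version the target broker can handle. Not Kafka API.
public class OffsetCommitVersionChooser {

    // 0.8.2 brokers only accept OffsetCommitRequest versions 0 and 1;
    // version 2 (Kafka-based offset storage with the coordinator) is newer.
    static short maxOffsetCommitVersion(String brokerVersion, boolean downgradeEnabled) {
        if (brokerVersion.startsWith("0.8.2") && downgradeEnabled)
            return 1; // forcefully downgrade, as described in the thread
        return 2;     // default: send the current request version
    }

    public static void main(String[] args) {
        System.out.println(maxOffsetCommitVersion("0.8.2.1", true));  // downgraded
        System.out.println(maxOffsetCommitVersion("0.8.3", true));    // current
    }
}
```

A real implementation would also have to account for the missing coordinator pieces (rebalancing, heartbeats) that Ewen points out, which is why a config flag alone was considered insufficient.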
Re: Review Request 36670: Patch for KAFKA-2355
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/36670/#review92688 --- core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala (line 267) https://reviews.apache.org/r/36670/#comment146915 I am ambivalent about catching '_', so I decided to let the error propagate. core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala (line 269) https://reviews.apache.org/r/36670/#comment146916 I disagree because we are issuing two `deleteTopic` operations: the first one should succeed and the second one should fail with a specific exception, but in spite of this the topic should have been deleted by the first call, even if fail() is called. But I am okay with removing the finally block and having its contents after the catch. - Edward Ribeiro On July 22, 2015, 1:46 a.m., Edward Ribeiro wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/36670/ --- (Updated July 22, 2015, 1:46 a.m.) Review request for kafka. Bugs: KAFKA-2355 https://issues.apache.org/jira/browse/KAFKA-2355 Repository: kafka Description --- KAFKA-2355 Add an unit test to validate the deletion of a partition marked as deleted Diffs - core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala fa8ce259a2832ab86f9dda8c1d409b2c42d43ae9 Diff: https://reviews.apache.org/r/36670/diff/ Testing --- Thanks, Edward Ribeiro
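The test-structure point being debated above — expect the second delete to fail, while still verifying the first delete's side effect — can be sketched with a toy stand-in. The class below is illustrative only (it is not Kafka's AdminUtils or the actual DeleteTopicTest); only the behavior under test, a second delete raising TopicAlreadyMarkedForDeletionException, comes from the review.

```java
import java.util.HashSet;
import java.util.Set;

// Toy stand-in for the delete-topic flow under review: the second delete of
// the same topic must fail, but the first delete's effect (topic marked as
// deleted) must still hold and be verifiable after the expected failure.
public class DeleteTopicSketch {
    static class TopicAlreadyMarkedForDeletionException extends RuntimeException {
        TopicAlreadyMarkedForDeletionException(String msg) { super(msg); }
    }

    private final Set<String> markedForDeletion = new HashSet<>();

    void deleteTopic(String topic) {
        // add() returns false if the topic is already marked for deletion
        if (!markedForDeletion.add(topic))
            throw new TopicAlreadyMarkedForDeletionException(topic + " already marked for deletion");
    }

    boolean isMarkedForDeletion(String topic) {
        return markedForDeletion.contains(topic);
    }
}
```

The pattern the reviewer describes is then: call deleteTopic once (succeeds), call it again inside a try/catch expecting the specific exception, and assert the topic is still marked as deleted afterwards — which works whether that final assertion lives in a finally block or after the catch.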
[jira] [Commented] (KAFKA-2355) Add an unit test to validate the deletion of a partition marked as deleted
[ https://issues.apache.org/jira/browse/KAFKA-2355?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14637996#comment-14637996 ] Edward Ribeiro commented on KAFKA-2355: --- Hi [~ijuma] and [~granthenke], I have updated the patch as requested in the latest review. Please let me know if something is missing or if I misinterpreted anything. :) ps: could one of you assign this little ticket to me? I asked for inclusion in the collaborators' list yesterday, I guess, so that I could assign issues to myself, but it was not granted yet. :-P Cheers! Add an unit test to validate the deletion of a partition marked as deleted -- Key: KAFKA-2355 URL: https://issues.apache.org/jira/browse/KAFKA-2355 Project: Kafka Issue Type: Test Affects Versions: 0.8.2.1 Reporter: Edward Ribeiro Priority: Minor Fix For: 0.8.3 Attachments: KAFKA-2355.patch, KAFKA-2355_2015-07-22_21:37:51.patch Trying to delete a partition marked as deleted throws {{TopicAlreadyMarkedForDeletionException}}, so this ticket adds a unit test to validate this behaviour. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: Review Request 36664: Patch for KAFKA-2353
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/36664/ --- (Updated July 23, 2015, 12:51 a.m.) Review request for kafka. Bugs: KAFKA-2353 https://issues.apache.org/jira/browse/KAFKA-2353 Repository: kafka Description (updated) --- Addressed Gwen's comments Addressed Gwen's comments. Diffs (updated) - core/src/main/scala/kafka/network/SocketServer.scala 91319fa010b140cca632e5fa8050509bd2295fc9 Diff: https://reviews.apache.org/r/36664/diff/ Testing --- Thanks, Jiangjie Qin
[jira] [Commented] (KAFKA-2353) SocketServer.Processor should catch exception and close the socket properly in configureNewConnections.
[ https://issues.apache.org/jira/browse/KAFKA-2353?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14638008#comment-14638008 ] Jiangjie Qin commented on KAFKA-2353: - Updated reviewboard https://reviews.apache.org/r/36664/diff/ against branch origin/trunk SocketServer.Processor should catch exception and close the socket properly in configureNewConnections. --- Key: KAFKA-2353 URL: https://issues.apache.org/jira/browse/KAFKA-2353 Project: Kafka Issue Type: Bug Reporter: Jiangjie Qin Assignee: Jiangjie Qin Attachments: KAFKA-2353.patch, KAFKA-2353_2015-07-21_22:02:24.patch, KAFKA-2353_2015-07-22_17:51:42.patch We have seen an increasing number of sockets in CLOSE_WAIT status in our production environment over the past couple of days. From the thread dump it seems one of the Processor threads had died, but the acceptor was still putting many new connections into its new connection queue. The Processor thread died because we were not catching all the exceptions in it. For example, in our case it seems to have been an exception thrown in configureNewConnections(). -- This message was sent by Atlassian JIRA (v6.3.4#6332)
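The fix described in the ticket — catch failures per connection so one bad socket cannot kill the Processor thread, and close that socket instead of leaking it into CLOSE_WAIT — could look roughly like the sketch below. This is an illustrative stand-in in plain Java (Kafka's SocketServer is Scala); the Connection interface and return value are assumptions, not the patch's actual code.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.Queue;

// Sketch of per-connection exception handling in the processor loop: a
// failure while configuring one new connection closes that connection and
// lets the loop continue, instead of propagating and killing the thread.
public class ProcessorSketch {
    interface Connection extends Closeable {
        void configure() throws IOException;
    }

    // Drains the new-connection queue; returns how many connections failed.
    static int configureNewConnections(Queue<Connection> newConnections) {
        int failures = 0;
        while (!newConnections.isEmpty()) {
            Connection conn = newConnections.poll();
            try {
                conn.configure();
            } catch (Exception e) {
                failures++; // in real code: log the error with the connection id
                try {
                    conn.close(); // close the socket so it doesn't sit in CLOSE_WAIT
                } catch (IOException ignored) {
                }
            }
        }
        return failures;
    }
}
```

The key property is that the catch is inside the loop: every queued connection is either configured or explicitly closed, and the thread itself survives.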
Re: Review Request 33620: Patch for KAFKA-1690
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/33620/#review92314 --- Thanks for the patch. Looks good overall. A few comments below. There are a few places where we still wrap single-line statements with {}. build.gradle (lines 247 - 255) https://reviews.apache.org/r/33620/#comment146726 It seems that you reverted the changes in kafka-2323. checkstyle/import-control.xml (line 56) https://reviews.apache.org/r/33620/#comment146733 Is this needed? It doesn't seem that network needs to use any protocol. clients/src/main/java/org/apache/kafka/common/network/PlainTextTransportLayer.java (line 47) https://reviews.apache.org/r/33620/#comment146402 Could we add the @Override annotation? clients/src/main/java/org/apache/kafka/common/network/PlainTextTransportLayer.java (line 96) https://reviews.apache.org/r/33620/#comment146404 Could we add the @Override annotation? A few other methods in this class are like that. clients/src/main/java/org/apache/kafka/common/network/PlainTextTransportLayer.java (line 150) https://reviews.apache.org/r/33620/#comment146408 src = srcs clients/src/main/java/org/apache/kafka/common/network/PlainTextTransportLayer.java (line 183) https://reviews.apache.org/r/33620/#comment146409 Typo: Rerturns clients/src/main/java/org/apache/kafka/common/network/PlainTextTransportLayer.java (line 191) https://reviews.apache.org/r/33620/#comment146410 param is not SelectionKey. clients/src/main/java/org/apache/kafka/common/network/PlainTextTransportLayer.java (line 200) https://reviews.apache.org/r/33620/#comment146411 param is not SelectionKey. clients/src/main/java/org/apache/kafka/common/network/SSLTransportLayer.java (line 87) https://reviews.apache.org/r/33620/#comment146414 Could we add the @Override annotation to this and a few other methods in this class? 
clients/src/main/java/org/apache/kafka/common/network/SSLTransportLayer.java (lines 231 - 234) https://reviews.apache.org/r/33620/#comment146446 Still some questions on this. 1. When handshakeStatus is not NEED_UNWRAP (e.g. FINISHED), we could have flushed all the bytes. In this case, we should turn off the write interestOps in the socket key, right? 2. When handshakeStatus is NEED_UNWRAP and write is true, we will move on to the NEED_UNWRAP case. However, in this case, there may still be unflushed bytes in netWriteBuffer. 3. When handshakeStatus transitions to FINISHED, we return to the callers. Doesn't that delay the completion of the handshake since this key may no longer be selected? clients/src/main/java/org/apache/kafka/common/network/SSLTransportLayer.java (lines 245 - 246) https://reviews.apache.org/r/33620/#comment146594 Since we are not making use of appReadBuffer during handshake, could OVERFLOW ever happen? If not, perhaps we can add a comment. clients/src/main/java/org/apache/kafka/common/network/SSLTransportLayer.java (lines 361 - 374) https://reviews.apache.org/r/33620/#comment146593 Hmm, if status is ok and handshakeStatus is NEED_UNWRAP, could we get into an infinite loop here? clients/src/main/java/org/apache/kafka/common/network/SSLTransportLayer.java (lines 394 - 397) https://reviews.apache.org/r/33620/#comment146694 We talked about that before. It's possible that we read all bytes for multiple receives into the appReadBuffer. Selector only reads off one receive at a time. If there are no more bytes from the incoming network, this key may not be selected to read off the next receive in the appReadBuffer. How do we resolve that issue? clients/src/main/java/org/apache/kafka/common/network/SSLTransportLayer.java (line 411) https://reviews.apache.org/r/33620/#comment146696 Is renegotiation actually supported? It seems that renegotiation can happen in the middle of a regular send/receive. 
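The appReadBuffer concern in comment146694 is that SSLEngine.unwrap can decrypt bytes belonging to several application receives in one go, and once the network goes quiet the selection key never fires again, so the extra receives sit in the buffer unread. A common way to resolve this (an assumption here, not necessarily what the patch does) is to serve reads from buffered decrypted data first and expose a check the selector can use to re-poll such channels. A minimal self-contained sketch with plain ByteBuffers:

```java
import java.nio.ByteBuffer;

// Sketch of draining leftover decrypted data: read() consumes appReadBuffer
// before touching the network, and hasBytesBuffered() tells the selector that
// this channel needs another read pass even if the socket is not selectable.
public class BufferedReadSketch {
    private final ByteBuffer appReadBuffer = ByteBuffer.allocate(1024);

    // Stands in for SSLEngine.unwrap depositing decrypted bytes.
    void stageDecrypted(byte[] data) {
        appReadBuffer.put(data);
    }

    // Drain previously decrypted bytes into dst; returns bytes copied.
    int read(ByteBuffer dst) {
        appReadBuffer.flip();
        int n = Math.min(appReadBuffer.remaining(), dst.remaining());
        for (int i = 0; i < n; i++)
            dst.put(appReadBuffer.get());
        appReadBuffer.compact(); // keep any unconsumed decrypted bytes
        return n;
        // a real transport layer would fall through to the socket here
    }

    // Selector should re-check channels that still hold decrypted data.
    boolean hasBytesBuffered() {
        return appReadBuffer.position() > 0;
    }
}
```

With this shape, the poll loop treats hasBytesBuffered() channels as readable regardless of what the underlying selector reports, which answers "this key may not be selected" without relying on further network traffic.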
clients/src/main/java/org/apache/kafka/common/network/SSLTransportLayer.java (lines 601 - 606) https://reviews.apache.org/r/33620/#comment146697 It seems that in general, we expect addInterestOps to be called only if handshake has completed. Perhaps we can just throw an IllegalStateException if handshake hasn't completed? Ditto to removeInterestOps. clients/src/main/java/org/apache/kafka/common/network/Selector.java (lines 218 - 220) https://reviews.apache.org/r/33620/#comment146390 Coding convention: no need to wrap single line statement with {}. clients/src/main/java/org/apache/kafka/common/network/Selector.java (lines 274 - 276) https://reviews.apache.org/r/33620/#comment146391 Coding convention: no need to wrap single line statement with {}. clients/src/main/java/org/apache/kafka/common/network/Selector.java (lines 434 - 435) https://reviews.apache.org/r/33620/#comment146388 We will probably need to
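The addInterestOps/removeInterestOps suggestion in comment146697 — throw IllegalStateException rather than accept interest-op changes before the handshake completes — can be sketched as below. The class and field names are illustrative, not the patch's code; only the guard semantics come from the review.

```java
// Sketch of the suggested guard: interest-op changes are only legal once the
// SSL handshake has completed; earlier calls fail fast with an exception.
public class InterestOpsGuard {
    private boolean handshakeComplete = false;
    private int interestOps = 0;

    void finishHandshake() {
        handshakeComplete = true;
    }

    void addInterestOps(int ops) {
        if (!handshakeComplete)
            throw new IllegalStateException("handshake has not completed");
        interestOps |= ops;
    }

    void removeInterestOps(int ops) {
        if (!handshakeComplete)
            throw new IllegalStateException("handshake has not completed");
        interestOps &= ~ops;
    }

    int interestOps() {
        return interestOps;
    }
}
```

Failing fast here turns a silent ordering bug (a caller toggling read/write interest while the transport layer still owns the key for handshaking) into an immediately visible programming error.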
[jira] [Updated] (KAFKA-2345) Attempt to delete a topic already marked for deletion throws ZkNodeExistsException
[ https://issues.apache.org/jira/browse/KAFKA-2345?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stevo Slavic updated KAFKA-2345: Affects Version/s: 0.8.2.0 Attempt to delete a topic already marked for deletion throws ZkNodeExistsException -- Key: KAFKA-2345 URL: https://issues.apache.org/jira/browse/KAFKA-2345 Project: Kafka Issue Type: Bug Affects Versions: 0.8.2.0 Reporter: Ashish K Singh Assignee: Ashish K Singh Fix For: 0.8.3 Attachments: KAFKA-2345.patch, KAFKA-2345_2015-07-17_10:20:55.patch Throwing a TopicAlreadyMarkedForDeletionException will make much more sense. A user does not necessarily have to know about involvement of zk in the process. -- This message was sent by Atlassian JIRA (v6.3.4#6332)