Re: [DISCUSS] KIP-28 - Add a transform client for data processing
+1 on comparison with existing solutions. At a high level, it seems nice to have a transform library inside Kafka; a lot of the building blocks for a stream processing framework are already there. However, the details are tricky to get right, and I think this discussion will get a lot more interesting when we have something concrete to look at. I'm +1 for the general idea. How far away are we from having a prototype patch to play with? A couple of observations: - Since the input source for each processor is always Kafka, you get basic client-side partition management out of the box if it uses the high-level consumer. - The KIP states that command-line tools will be provided to deploy this as a separate service. Is the proposed scope limited to providing a library that makes it possible to build stream-processing-as-a-service, or to providing such a service within Kafka itself? Aditya On Mon, Jul 27, 2015 at 8:20 PM, Gwen Shapira gshap...@cloudera.com wrote: Hi, Since we will be discussing KIP-28 in the call tomorrow, can you update the KIP with the feature comparison with existing solutions? I admit that I do not see a need for a single-event producer-consumer pair (AKA Flume Interceptor). I've seen tons of people implement such apps in the last year, and it seemed easy. Now, perhaps we were doing it all wrong... but I'd like to know how :) If we are talking about a bigger story (i.e. DSL, real stream processing, etc.), that's a different discussion. I've seen a bunch of misconceptions about Spark Streaming in this discussion, and I have some thoughts in that regard, but I'd rather not go into that if that's outside the scope of this KIP. Gwen On Fri, Jul 24, 2015 at 9:48 AM, Guozhang Wang wangg...@gmail.com wrote: Hi Ewen, Replies inlined. On Thu, Jul 23, 2015 at 10:25 PM, Ewen Cheslack-Postava e...@confluent.io wrote: Just some notes on the KIP doc itself: * It'd be useful to clarify at what point the plain consumer + custom code + producer breaks down. I think trivial filtering and aggregation on a single stream usually work fine with this model. Anything where you need more complex joins, windowing, etc. is where it breaks down. I think most interesting applications require that functionality, but it's helpful to make this really clear in the motivation -- right now, Kafka only provides the lowest-level plumbing for stream processing applications, so most interesting apps require very heavyweight frameworks. I think for users to efficiently express complex logic like joins, windowing, etc., a higher-level programming interface beyond the process() interface would definitely be better, but that does not necessarily require a heavyweight framework, which usually includes more than just the high-level functional programming model. I would argue that an alternative should be provided for users who want a high-level programming interface but not a heavyweight stream processing framework: the processor library plus another DSL layer on top of it. * I think the feature comparison of plain producer/consumer, stream processing frameworks, and this new library is a good start, but we might want something more thorough and structured, like a feature matrix. Right now it's hard to figure out exactly how they relate to each other. Cool, I can do that. * I'd personally push the library vs.
framework story very strongly -- the total buy-in and weak integration story of stream processing frameworks is a big downside and makes a library a really compelling (and currently unavailable, as far as I am aware) alternative. Are you suggesting there is still some content missing about the motivation for adding the proposed library in the wiki page? * The comment about in-memory storage in other frameworks is interesting -- it is specific to the framework, but is supposed to also give performance benefits. The high-level functional processing interface would allow for combining multiple operations when there's no shuffle, but when there is a shuffle, we'll always be writing to Kafka, right? Spark (and presumably Spark Streaming) is supposed to get a big win by handling shuffles such that the data just stays in cache and never actually hits disk, or at least hits disk only in the background. Will we take a hit because we always write to Kafka? I agree with Neha's comments here. One more point I want to make is that materializing to Kafka is not necessarily much worse than keeping data in memory if the downstream consumption is caught up, such that most of the reads will be hitting the file cache. I remember Samza illustrated that under such scenarios its throughput is actually quite comparable to Spark Streaming / Storm. * I really struggled with the structure of the KIP template with Copycat because the flow doesn't work well for proposals like
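To make the discussion above concrete, here is a minimal sketch of what a process()-style callback interface could look like. Every name below (Processor, ProcessorContext, punctuate) is an illustrative assumption, not the actual KIP-28 API, which was still under design at this point in the thread.

{code}
// Hypothetical sketch of a low-level processor interface, not the final API.
interface ProcessorContext {
    // Forward a transformed record to downstream processors / output topics.
    void forward(Object key, Object value);
}

public interface Processor<K, V> {
    // Called once before any records are delivered, e.g. to open state stores.
    void init(ProcessorContext context);

    // Called for each consumed record; filtering, mapping, and joining
    // against local state would all happen here.
    void process(K key, V value);

    // Called periodically, e.g. to emit windowed aggregates.
    void punctuate(long timestamp);

    // Called on shutdown to release resources.
    void close();
}
{code}

A DSL layer of the kind discussed in this thread would then just be a thin builder that composes such processors into a topology.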
Number of Kafka topics/partitions supported per cluster of n nodes
Hi, I'm looking for a benchmark that explains how many topics and partitions in total can be created in a cluster of n nodes, given that the message size varies between x and y bytes, and how this varies with heap size and affects system performance. E.g., the result should look like: t topics with p partitions each can be supported in a cluster of n nodes with a heap size of h MB, before the cluster sees things like JVM crashes, high memory usage, or system slowdown. I think such benchmarks must exist so that we can make better decisions on the ops side. If these details don't exist, I'll run this test myself, varying the values of the parameters described above, and I would be happy to share the numbers with the community. Thanks, prabcs
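For anyone attempting this test, a rough starting point might look like the following. This is a sketch against the 0.8.x-era AdminUtils API, written from memory, so verify it against your Kafka version; the ZooKeeper address, replication factor, and topic naming are all assumptions.

{code}
import java.util.Properties;
import kafka.admin.AdminUtils;
import kafka.utils.ZKStringSerializer$;
import org.I0Itec.zkclient.ZkClient;

public class TopicScaleProbe {
    public static void main(String[] args) {
        int topics = Integer.parseInt(args[0]);      // t
        int partitions = Integer.parseInt(args[1]);  // p
        ZkClient zk = new ZkClient("localhost:2181", 30000, 30000,
                ZKStringSerializer$.MODULE$);
        for (int i = 0; i < topics; i++) {
            // Replication factor 1; a real test would also vary this.
            AdminUtils.createTopic(zk, "probe-" + i, partitions, 1, new Properties());
            if (i % 100 == 0)
                System.out.println("created " + i + " topics");
        }
        zk.close();
    }
}
{code}

Broker heap usage and GC behavior can then be watched with jstat or JMX while the count grows.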
Re: [DISCUSS] KIP-28 - Add a transform client for data processing
Gwen, We have a compilation of notes from a comparison with other systems. They might be missing details that folks who worked on those systems might be able to point out. We can share that and discuss further on the KIP call. We do hope to include a DSL, since that is the most natural way of expressing stream processing operations on top of the processor client. The DSL layer should be equivalent to that provided by Spark Streaming or Flink in terms of expressiveness, though there will be differences in implementation. Our client is intended to be simpler, with minimal external dependencies, since it integrates closely with Kafka. This is really what most application developers are hoping to get - a lightweight library on top of Kafka that allows them to process streams of data. Thanks Neha On Mon, Jul 27, 2015 at 8:20 PM, Gwen Shapira gshap...@cloudera.com wrote: Hi, Since we will be discussing KIP-28 in the call tomorrow, can you update the KIP with the feature comparison with existing solutions? I admit that I do not see a need for a single-event producer-consumer pair (AKA Flume Interceptor). I've seen tons of people implement such apps in the last year, and it seemed easy. Now, perhaps we were doing it all wrong... but I'd like to know how :) If we are talking about a bigger story (i.e. DSL, real stream processing, etc.), that's a different discussion. I've seen a bunch of misconceptions about Spark Streaming in this discussion, and I have some thoughts in that regard, but I'd rather not go into that if that's outside the scope of this KIP. Gwen On Fri, Jul 24, 2015 at 9:48 AM, Guozhang Wang wangg...@gmail.com wrote: Hi Ewen, Replies inlined. On Thu, Jul 23, 2015 at 10:25 PM, Ewen Cheslack-Postava e...@confluent.io wrote: Just some notes on the KIP doc itself: * It'd be useful to clarify at what point the plain consumer + custom code + producer breaks down. I think trivial filtering and aggregation on a single stream usually work fine with this model. Anything where you need more complex joins, windowing, etc. is where it breaks down. I think most interesting applications require that functionality, but it's helpful to make this really clear in the motivation -- right now, Kafka only provides the lowest-level plumbing for stream processing applications, so most interesting apps require very heavyweight frameworks. I think for users to efficiently express complex logic like joins, windowing, etc., a higher-level programming interface beyond the process() interface would definitely be better, but that does not necessarily require a heavyweight framework, which usually includes more than just the high-level functional programming model. I would argue that an alternative should be provided for users who want a high-level programming interface but not a heavyweight stream processing framework: the processor library plus another DSL layer on top of it. * I think the feature comparison of plain producer/consumer, stream processing frameworks, and this new library is a good start, but we might want something more thorough and structured, like a feature matrix. Right now it's hard to figure out exactly how they relate to each other. Cool, I can do that. * I'd personally push the library vs. framework story very strongly -- the total buy-in and weak integration story of stream processing frameworks is a big downside and makes a library a really compelling (and currently unavailable, as far as I am aware) alternative.
Are you suggesting there is still some content missing about the motivation for adding the proposed library in the wiki page? * The comment about in-memory storage in other frameworks is interesting -- it is specific to the framework, but is supposed to also give performance benefits. The high-level functional processing interface would allow for combining multiple operations when there's no shuffle, but when there is a shuffle, we'll always be writing to Kafka, right? Spark (and presumably Spark Streaming) is supposed to get a big win by handling shuffles such that the data just stays in cache and never actually hits disk, or at least hits disk only in the background. Will we take a hit because we always write to Kafka? I agree with Neha's comments here. One more point I want to make is that materializing to Kafka is not necessarily much worse than keeping data in memory if the downstream consumption is caught up, such that most of the reads will be hitting the file cache. I remember Samza illustrated that under such scenarios its throughput is actually quite comparable to Spark Streaming / Storm. * I really struggled with the structure of the KIP template with Copycat because the flow doesn't work well for proposals like this. They aren't the kind of concrete changes the KIP template was designed for. I'd completely
Re: Review Request 33620: Patch for KAFKA-1690
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/33620/#review93110 --- I did an initial pass over the code (excluding tests) and left some comments. Mostly trivial fixes, I hope. build.gradle (lines 247 - 249) https://reviews.apache.org/r/33620/#comment147361 KAFKA-2348 was merged recently and the if/else for 2.9 support was removed there. It should not be restored here. clients/src/main/java/org/apache/kafka/clients/ClientUtils.java (lines 80 - 86) https://reviews.apache.org/r/33620/#comment147362 Code convention: no braces for a single statement. clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java (line 106) https://reviews.apache.org/r/33620/#comment147363 Why not pass `values` to the copy constructor of `HashMap`? Then the implementation is simply `return new HashMap<String, Object>(values);`. clients/src/main/java/org/apache/kafka/common/config/SSLConfigs.java (line 29) https://reviews.apache.org/r/33620/#comment147364 SSL is deprecated (https://ma.ttias.be/rfc-7568-ssl-3-0-is-now-officially-deprecated/), should we be supporting it at all? If we do support it, then we should at least include a warning. clients/src/main/java/org/apache/kafka/common/config/SSLConfigs.java (line 37) https://reviews.apache.org/r/33620/#comment147365 Should we support all ciphers by default, or should we be excluding ciphers that are known to be broken? clients/src/main/java/org/apache/kafka/common/network/ByteBufferSend.java (lines 64 - 66) https://reviews.apache.org/r/33620/#comment147366 Code convention: no braces used for a single statement (as can be seen in the `if` a few lines above). clients/src/main/java/org/apache/kafka/common/network/DefaultAuthenticator.java (line 34) https://reviews.apache.org/r/33620/#comment147368 Why not create the principal at this point instead of in the `principal()` method? Is it a costly operation? clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java (lines 35 - 39) https://reviews.apache.org/r/33620/#comment147370 As far as I can see, these fields can be final. Also, is it intentional for `transportLayer` to be `public`? clients/src/main/java/org/apache/kafka/common/network/PlainTextChannelBuilder.java (line 27) https://reviews.apache.org/r/33620/#comment147371 Isn't `plaintext` a single word in a security context (i.e. https://en.wikipedia.org/wiki/Plaintext)? This also affects other classes like `PlainTextTransportLayer`, etc. clients/src/main/java/org/apache/kafka/common/network/SSLFactory.java (line 49) https://reviews.apache.org/r/33620/#comment147382 Can be final? clients/src/main/java/org/apache/kafka/common/network/SSLTransportLayer.java (lines 46 - 49) https://reviews.apache.org/r/33620/#comment147383 Can some of these be final? clients/src/main/java/org/apache/kafka/common/network/Selector.java (line 461) https://reviews.apache.org/r/33620/#comment147384 Should this be using `channelForId`? core/src/main/scala/kafka/api/FetchResponse.scala (line 82) https://reviews.apache.org/r/33620/#comment147373 Casts are to be avoided in Scala; pattern matching is a better way to do this: `channel match { case tl: TransportLayer => pending = tl.hasPendingWrites case _ => }` However, I see that this pattern is repeated in many classes, which is not good.
Assuming that we can't change `Send.writeTo` to take a `TransportLayer` (either for compatibility or because there are implementations that don't use a `TransportLayer`), we should consider introducing a utility method `hasPendingWrites(channel)` that calls `hasPendingWrites` or returns false. What do you think? core/src/main/scala/kafka/network/SocketServer.scala (lines 52 - 63) https://reviews.apache.org/r/33620/#comment147375 Are all these vals needed at class scope? core/src/main/scala/kafka/network/SocketServer.scala (line 116) https://reviews.apache.org/r/33620/#comment147374 One typically does not specify the type of the parameter for a simple function, i.e.: `requestChannel.addResponseListener(id => processors(id).wakeup())` It's also possible to use underscore syntax to avoid declaring `id`, but the name could aid readability in some cases. core/src/main/scala/kafka/network/SocketServer.scala (line 232) https://reviews.apache.org/r/33620/#comment147377 Do we really have to populate slices of the `processors` array in each `Acceptor`? Would it not be better to create the full array and then pass it to `Acceptor`? Or even perhaps pass the relevant slice. core/src/main/scala/kafka/network/SocketServer.scala (line 501) https://reviews.apache.org/r/33620/#comment147379 Code convention: space after `:`
Best practices - Using Kafka (with an HTTP server) as source-of-truth
Hi folks, I would like to understand the best practices for using Kafka as the source of truth, given that I want to pump data into Kafka using HTTP methods. What are the current production configurations for such a use case: 1. Kafka-http-client - is it scalable the way Nginx is? 2. Using Kafka and Nginx together - if anybody has used this, please explain. 3. Any other scalable method? Regards, prabcs
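One common shape for this is a thin HTTP front end that writes straight into Kafka with the new Java producer, with Nginx load-balancing across many such instances. Below is a minimal sketch under stated assumptions: the topic name "events", the port, and the endpoint path are placeholders, and it uses the JDK's built-in HttpServer rather than a production web server.

{code}
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.InetSocketAddress;
import java.util.Properties;
import com.sun.net.httpserver.HttpServer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class HttpIngest {
    public static void main(String[] args) throws IOException {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        final KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props);

        HttpServer server = HttpServer.create(new InetSocketAddress(8080), 0);
        server.createContext("/ingest", exchange -> {
            // Read the POST body and hand it to the (async, batching) producer.
            ByteArrayOutputStream buf = new ByteArrayOutputStream();
            InputStream in = exchange.getRequestBody();
            byte[] chunk = new byte[4096];
            int n;
            while ((n = in.read(chunk)) > 0)
                buf.write(chunk, 0, n);
            producer.send(new ProducerRecord<byte[], byte[]>("events", buf.toByteArray()));
            exchange.sendResponseHeaders(204, -1);
            exchange.close();
        });
        server.start();
    }
}
{code}

The producer batches internally, so the HTTP layer mostly scales with connection handling, which is exactly what Nginx in front would help with.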
Re: Review Request 33620: Patch for KAFKA-1690
On July 27, 2015, 1:32 p.m., Ismael Juma wrote: core/src/main/scala/kafka/api/FetchResponse.scala, line 82 https://reviews.apache.org/r/33620/diff/13/?file=1021998#file1021998line82 Casts are to be avoided in Scala; pattern matching is a better way to do this: `channel match { case tl: TransportLayer => pending = tl.hasPendingWrites case _ => }` However, I see that this pattern is repeated in many classes, which is not good. Assuming that we can't change `Send.writeTo` to take a `TransportLayer` (either for compatibility or because there are implementations that don't use a `TransportLayer`), we should consider introducing a utility method `hasPendingWrites(channel)` that calls `hasPendingWrites` or returns false. What do you think? Sriharsha Chintalapani wrote: The reason for doing this: here the channel can be a GatheringByteChannel or some other socket channel. As I mentioned in one of the comments, once KafkaChannel is used across the system, especially when we deprecate/remove BlockingChannel, we can go ahead and call channel.hasPendingWrites. That makes sense. Still, having a utility method would make it easier to make that change when that time comes, right? - Ismael --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/33620/#review93110 --- On July 25, 2015, 7:11 p.m., Sriharsha Chintalapani wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/33620/ --- (Updated July 25, 2015, 7:11 p.m.) Review request for kafka. Bugs: KAFKA-1690 https://issues.apache.org/jira/browse/KAFKA-1690 Repository: kafka Description --- KAFKA-1690. new java producer needs ssl support as a client. KAFKA-1690. new java producer needs ssl support as a client. KAFKA-1690. new java producer needs ssl support as a client. KAFKA-1690. new java producer needs ssl support as a client. SSLFactory tests. KAFKA-1690. new java producer needs ssl support as a client. Added PrincipalBuilder. KAFKA-1690. new java producer needs ssl support as a client. Addressing reviews. KAFKA-1690. new java producer needs ssl support as a client. Addressing reviews. KAFKA-1690. new java producer needs ssl support as a client. Addressing reviews. KAFKA-1690. new java producer needs ssl support as a client. Fixed minor issues with the patch. KAFKA-1690. new java producer needs ssl support as a client. Fixed minor issues with the patch. KAFKA-1690. new java producer needs ssl support as a client. KAFKA-1690. new java producer needs ssl support as a client. Merge remote-tracking branch 'refs/remotes/origin/trunk' into KAFKA-1690-V1 KAFKA-1690. Broker side ssl changes. KAFKA-1684. SSL for socketServer. KAFKA-1690. Added SSLProducerSendTest and fixes to get right port for SSL. Merge branch 'trunk' into KAFKA-1690-V1 KAFKA-1690. Post merge fixes. KAFKA-1690. Added SSLProducerSendTest. KAFKA-1690. Minor fixes based on patch review comments. Merge commit KAFKA-1690. Added SSL Consumer Test. KAFKA-1690. SSL Support. KAFKA-1690. Addressing reviews. Merge branch 'trunk' into KAFKA-1690-V1 Merge branch 'trunk' into KAFKA-1690-V1 KAFKA-1690. Addressing reviews. Removed interestOps from SSLTransportLayer. KAFKA-1690. Addressing reviews. KAFKA-1690. added staged receives to selector. KAFKA-1690. Addressing reviews.
Merge branch 'trunk' into KAFKA-1690-V1 Diffs - build.gradle 0abec26fb2d7be62c8a673f9ec838e926e64b2d1 checkstyle/import-control.xml 19e0659ef9385433d9f94dee43cd70a52b18c9e5 clients/src/main/java/org/apache/kafka/clients/ClientUtils.java 0d68bf1e1e90fe9d5d4397ddf817b9a9af8d9f7a clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java 2c421f42ed3fc5d61cf9c87a7eaa7bb23e26f63b clients/src/main/java/org/apache/kafka/clients/NetworkClient.java 48fe7961e2215372d8033ece4af739ea06c6457b clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java 70377ae2fa46deb381139d28590ce6d4115e1adc clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java bea3d737c51be77d5b5293cdd944d33b905422ba clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 03b8dd23df63a8d8a117f02eabcce4a2d48c44f7 clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java aa264202f2724907924985a5ecbe74afc4c6c04b clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java bae528d31516679bed88ee61b408f209f185a8cc
[jira] [Commented] (KAFKA-1682) Security for Kafka
[ https://issues.apache.org/jira/browse/KAFKA-1682?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14642865#comment-14642865 ] Sriharsha Chintalapani commented on KAFKA-1682: --- [~ijuma] Here it is https://issues.apache.org/jira/browse/KAFKA-2162 Security for Kafka -- Key: KAFKA-1682 URL: https://issues.apache.org/jira/browse/KAFKA-1682 Project: Kafka Issue Type: New Feature Components: security Affects Versions: 0.9.0 Reporter: Jay Kreps Parent ticket for security. Wiki and discussion is here: https://cwiki.apache.org/confluence/display/KAFKA/Security -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-2162) Kafka Auditing functionality
[ https://issues.apache.org/jira/browse/KAFKA-2162?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sriharsha Chintalapani updated KAFKA-2162: -- Issue Type: Sub-task (was: Bug) Parent: KAFKA-1682 Kafka Auditing functionality Key: KAFKA-2162 URL: https://issues.apache.org/jira/browse/KAFKA-2162 Project: Kafka Issue Type: Sub-task Reporter: Sriharsha Chintalapani Assignee: Parth Brahmbhatt During the Kafka authorization discussion thread, there were concerns raised about not having auditing. Auditing is important functionality, but it's not part of the authorizer. This JIRA will track adding audit functionality to Kafka. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1682) Security for Kafka
[ https://issues.apache.org/jira/browse/KAFKA-1682?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14642850#comment-14642850 ] Ismael Juma commented on KAFKA-1682: One of the in-scope items in the wiki page is Auditing. Is that information up to date, and if so, is there a ticket for it? All the other items seem to be covered by the subtasks associated with this ticket. Security for Kafka -- Key: KAFKA-1682 URL: https://issues.apache.org/jira/browse/KAFKA-1682 Project: Kafka Issue Type: New Feature Components: security Affects Versions: 0.9.0 Reporter: Jay Kreps Parent ticket for security. Wiki and discussion is here: https://cwiki.apache.org/confluence/display/KAFKA/Security -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Work started] (KAFKA-1686) Implement SASL/Kerberos
[ https://issues.apache.org/jira/browse/KAFKA-1686?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Work on KAFKA-1686 started by Sriharsha Chintalapani. - Implement SASL/Kerberos --- Key: KAFKA-1686 URL: https://issues.apache.org/jira/browse/KAFKA-1686 Project: Kafka Issue Type: Sub-task Components: security Affects Versions: 0.9.0 Reporter: Jay Kreps Assignee: Sriharsha Chintalapani Fix For: 0.9.0 Implement SASL/Kerberos authentication. To do this we will need to introduce a new SASLRequest and SASLResponse pair to the client protocol. This request and response will each have only a single byte[] field and will be used to handle the SASL challenge/response cycle. Doing this will initialize the SaslServer instance and associate it with the session in a manner similar to KAFKA-1684. When using integrity or encryption mechanisms with SASL we will need to wrap and unwrap bytes as in KAFKA-1684 so the same interface that covers the SSLEngine will need to also cover the SaslServer instance. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
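For context on the challenge/response cycle the ticket describes, the standard JDK SASL API already models it. A broker-side sketch follows; the mechanism, protocol, and host names are illustrative, the null CallbackHandler is a placeholder (a real implementation would wire in a JAAS-backed handler), and the SASLRequest/SASLResponse wire messages from the ticket are represented simply as byte arrays.

{code}
import java.util.HashMap;
import java.util.Map;
import javax.security.sasl.Sasl;
import javax.security.sasl.SaslException;
import javax.security.sasl.SaslServer;

public class SaslHandshakeSketch {
    // One round of the cycle: the byte[] carried by the proposed SASLRequest
    // goes in, and the byte[] for the SASLResponse comes out.
    static byte[] handleSaslRequest(SaslServer server, byte[] clientToken)
            throws SaslException {
        return server.evaluateResponse(clientToken);
    }

    public static void main(String[] args) throws SaslException {
        Map<String, Object> props = new HashMap<>();
        // "GSSAPI" is the Kerberos mechanism; passing null for the
        // CallbackHandler only works for mechanisms that need none.
        SaslServer server = Sasl.createSaslServer(
                "GSSAPI", "kafka", "broker1.example.com", props, null);
        // A broker would loop, calling handleSaslRequest(...) for each
        // incoming token until server.isComplete() returns true, then
        // associate the SaslServer with the session (and use wrap/unwrap
        // when integrity or confidentiality is negotiated).
    }
}
{code}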
Re: Review Request 33620: Patch for KAFKA-1690
On July 27, 2015, 1:32 p.m., Ismael Juma wrote: core/src/main/scala/kafka/api/FetchResponse.scala, line 82 https://reviews.apache.org/r/33620/diff/13/?file=1021998#file1021998line82 Casts are to be avoided in Scala; pattern matching is a better way to do this: `channel match { case tl: TransportLayer => pending = tl.hasPendingWrites case _ => }` However, I see that this pattern is repeated in many classes, which is not good. Assuming that we can't change `Send.writeTo` to take a `TransportLayer` (either for compatibility or because there are implementations that don't use a `TransportLayer`), we should consider introducing a utility method `hasPendingWrites(channel)` that calls `hasPendingWrites` or returns false. What do you think? The reason for doing this: here the channel can be a GatheringByteChannel or some other socket channel. As I mentioned in one of the comments, once KafkaChannel is used across the system, especially when we deprecate/remove BlockingChannel, we can go ahead and call channel.hasPendingWrites. - Sriharsha --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/33620/#review93110 --- On July 25, 2015, 7:11 p.m., Sriharsha Chintalapani wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/33620/ --- (Updated July 25, 2015, 7:11 p.m.) Review request for kafka. Bugs: KAFKA-1690 https://issues.apache.org/jira/browse/KAFKA-1690 Repository: kafka Description --- KAFKA-1690. new java producer needs ssl support as a client. KAFKA-1690. new java producer needs ssl support as a client. KAFKA-1690. new java producer needs ssl support as a client. KAFKA-1690. new java producer needs ssl support as a client. SSLFactory tests. KAFKA-1690. new java producer needs ssl support as a client. Added PrincipalBuilder. KAFKA-1690. new java producer needs ssl support as a client. Addressing reviews. KAFKA-1690. new java producer needs ssl support as a client. Addressing reviews. KAFKA-1690. new java producer needs ssl support as a client. Addressing reviews. KAFKA-1690. new java producer needs ssl support as a client. Fixed minor issues with the patch. KAFKA-1690. new java producer needs ssl support as a client. Fixed minor issues with the patch. KAFKA-1690. new java producer needs ssl support as a client. KAFKA-1690. new java producer needs ssl support as a client. Merge remote-tracking branch 'refs/remotes/origin/trunk' into KAFKA-1690-V1 KAFKA-1690. Broker side ssl changes. KAFKA-1684. SSL for socketServer. KAFKA-1690. Added SSLProducerSendTest and fixes to get right port for SSL. Merge branch 'trunk' into KAFKA-1690-V1 KAFKA-1690. Post merge fixes. KAFKA-1690. Added SSLProducerSendTest. KAFKA-1690. Minor fixes based on patch review comments. Merge commit KAFKA-1690. Added SSL Consumer Test. KAFKA-1690. SSL Support. KAFKA-1690. Addressing reviews. Merge branch 'trunk' into KAFKA-1690-V1 Merge branch 'trunk' into KAFKA-1690-V1 KAFKA-1690. Addressing reviews. Removed interestOps from SSLTransportLayer. KAFKA-1690. Addressing reviews. KAFKA-1690. added staged receives to selector. KAFKA-1690. Addressing reviews.
Merge branch 'trunk' into KAFKA-1690-V1 Diffs - build.gradle 0abec26fb2d7be62c8a673f9ec838e926e64b2d1 checkstyle/import-control.xml 19e0659ef9385433d9f94dee43cd70a52b18c9e5 clients/src/main/java/org/apache/kafka/clients/ClientUtils.java 0d68bf1e1e90fe9d5d4397ddf817b9a9af8d9f7a clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java 2c421f42ed3fc5d61cf9c87a7eaa7bb23e26f63b clients/src/main/java/org/apache/kafka/clients/NetworkClient.java 48fe7961e2215372d8033ece4af739ea06c6457b clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java 70377ae2fa46deb381139d28590ce6d4115e1adc clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java bea3d737c51be77d5b5293cdd944d33b905422ba clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 03b8dd23df63a8d8a117f02eabcce4a2d48c44f7 clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java aa264202f2724907924985a5ecbe74afc4c6c04b clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java bae528d31516679bed88ee61b408f209f185a8cc clients/src/main/java/org/apache/kafka/common/config/SSLConfigs.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/network/Authenticator.java PRE-CREATION
[jira] [Commented] (KAFKA-1690) new java producer needs ssl support as a client
[ https://issues.apache.org/jira/browse/KAFKA-1690?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14642821#comment-14642821 ] Rajasekar Elango commented on KAFKA-1690: - [~harsha_ch] Is there any documentation on how to use the SSL feature? E.g., how to enable SSL on the broker? How to make a producer/consumer talk to the broker via SSL? new java producer needs ssl support as a client --- Key: KAFKA-1690 URL: https://issues.apache.org/jira/browse/KAFKA-1690 Project: Kafka Issue Type: Sub-task Reporter: Joe Stein Assignee: Sriharsha Chintalapani Fix For: 0.8.3 Attachments: KAFKA-1690.patch, KAFKA-1690.patch, KAFKA-1690_2015-05-10_23:20:30.patch, KAFKA-1690_2015-05-10_23:31:42.patch, KAFKA-1690_2015-05-11_16:09:36.patch, KAFKA-1690_2015-05-12_16:20:08.patch, KAFKA-1690_2015-05-15_07:18:21.patch, KAFKA-1690_2015-05-20_14:54:35.patch, KAFKA-1690_2015-05-21_10:37:08.patch, KAFKA-1690_2015-06-03_18:52:29.patch, KAFKA-1690_2015-06-23_13:18:20.patch, KAFKA-1690_2015-07-20_06:10:42.patch, KAFKA-1690_2015-07-20_11:59:57.patch, KAFKA-1690_2015-07-25_12:10:55.patch -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1690) new java producer needs ssl support as a client
[ https://issues.apache.org/jira/browse/KAFKA-1690?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14642829#comment-14642829 ] Sriharsha Chintalapani commented on KAFKA-1690: --- [~erajasekar] I am putting together instructions on how to enable SSL. Will post the details on the wiki in a day or two. new java producer needs ssl support as a client --- Key: KAFKA-1690 URL: https://issues.apache.org/jira/browse/KAFKA-1690 Project: Kafka Issue Type: Sub-task Reporter: Joe Stein Assignee: Sriharsha Chintalapani Fix For: 0.8.3 Attachments: KAFKA-1690.patch, KAFKA-1690.patch, KAFKA-1690_2015-05-10_23:20:30.patch, KAFKA-1690_2015-05-10_23:31:42.patch, KAFKA-1690_2015-05-11_16:09:36.patch, KAFKA-1690_2015-05-12_16:20:08.patch, KAFKA-1690_2015-05-15_07:18:21.patch, KAFKA-1690_2015-05-20_14:54:35.patch, KAFKA-1690_2015-05-21_10:37:08.patch, KAFKA-1690_2015-06-03_18:52:29.patch, KAFKA-1690_2015-06-23_13:18:20.patch, KAFKA-1690_2015-07-20_06:10:42.patch, KAFKA-1690_2015-07-20_11:59:57.patch, KAFKA-1690_2015-07-25_12:10:55.patch -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1682) Security for Kafka
[ https://issues.apache.org/jira/browse/KAFKA-1682?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14642868#comment-14642868 ] Ismael Juma commented on KAFKA-1682: [~harsha_ch], thanks for the link and for making KAFKA-2162 a sub-task of this ticket. Security for Kafka -- Key: KAFKA-1682 URL: https://issues.apache.org/jira/browse/KAFKA-1682 Project: Kafka Issue Type: New Feature Components: security Affects Versions: 0.9.0 Reporter: Jay Kreps Parent ticket for security. Wiki and discussion is here: https://cwiki.apache.org/confluence/display/KAFKA/Security -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: [DISCUSS] KIP-28 - Add a transform client for data processing
Hey Yi, Great points. I think for some of this, the most useful thing would be to get a WIP prototype out that we could discuss concretely. I think Yasuhiro and Guozhang took the prototype I had done and made some improvements. Give us a bit to get that into understandable shape so we can discuss. To address a few of your other points: 1. Yeah, we are going to try to generalize the partition management stuff. We'll get a wiki/JIRA up for that. I think that gives what you want in terms of moving partitioning to the client side. 2. I think consuming from a cluster different from the one you produce to will be easy. More than that is more complex, though I agree the pluggable partitioning makes it theoretically possible. Let's try to get something that works for the first case; it sounds like that solves the use case you describe of wanting to transform directly from a given cluster but produce back to a different cluster. I think the key observation is that the whole reason LinkedIn split data over clusters to begin with was the lack of quotas, which are in any case getting implemented. -Jay On Sun, Jul 26, 2015 at 11:31 PM, Yi Pan nickpa...@gmail.com wrote: Hi, Jay and all, Thanks for all your quick responses. I tried to summarize my thoughts here: - ConsumerRecord as stream processor API: * This KafkaProcessor API is targeted at receiving messages from Kafka. So, to Yasuhiro's join/transformation example, any join/transformation results that are materialized in Kafka should have ConsumerRecord format (i.e. w/ topic and offsets). Any non-materialized join/transformation results should not be processed by this KafkaProcessor API. One example is the in-memory operators API in Samza, which is designed to handle non-materialized join/transformation results. And yes, in this case, a more abstract data model is needed. * Just to support Jay's point of a general ConsumerRecord/ProducerRecord, general stream processing on more than one data source would need at least the following info: data source description (i.e. which topic/table) and actual data (i.e. key-value pairs). It would make sense to have the data source name as part of the general metadata in stream processing (think of it as the table name for records in standard SQL). - SQL/DSL * I think that this topic itself is worthy of another KIP discussion. I would prefer to leave it out of scope in KIP-28. - Client-side pluggable partition manager * Given the use cases we have seen with large-scale deployments of Samza/Kafka in LinkedIn, I would argue that we should make it a first-class citizen in this KIP. The use cases include: * multi-cluster Kafka * host affinity (i.e. local state associated w/ certain partitions on the client) - Multi-cluster scenario * Although I originally just brought it up as a use case that requires a client-side partition manager, reading Jay’s comments, I realized that I have one fundamental issue w/ the current copycat + transformation model. If I interpret Jay’s comment correctly, the proposed copycat+transformation plays out in the following way: i) copycat takes all data from sources (no matter whether they are Kafka or non-Kafka) into *one single Kafka cluster*; ii) transformation is restricted to taking data sources in *this single Kafka cluster* to perform aggregates/joins etc. This is different from my original understanding of the copycat. The main issue I have with this model is the huge data copy between Kafka clusters.
At LinkedIn, we used to follow this model, using MirrorMaker to map topics from the tracking clusters to a Samza-specific Kafka cluster and only doing stream processing in the Samza-specific Kafka cluster. We moved away from this model and started allowing users to consume directly from the tracking Kafka clusters because of the overhead of copying huge amounts of traffic between Kafka clusters. I agree that the initial design of KIP-28 probably needs a smaller scope of problem to solve, hence limiting it to partition management in a single cluster. However, I would really hope the design won’t prevent the use case of processing data directly from multiple clusters. In my opinion, making the partition manager client-side pluggable logic would allow us to achieve these goals. Thanks a lot in advance! -Yi On Fri, Jul 24, 2015 at 11:13 AM, Jay Kreps j...@confluent.io wrote: Hey Yi, For your other two points: - This definitely doesn't cover any kind of SQL or anything like that. - The prototype we started with just had process() as a method, but Yasuhiro had some ideas for adding additional filter/aggregate convenience methods. We should discuss how this would fit with the operator work you were doing in Samza. Probably the best way is to just get the code out there in its current state and start talking about it? - Your point about multiple clusters. We actually have a proposed extension
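The consume-from-one-cluster, produce-to-another case discussed above needs no framework support with the clients as they stand. A sketch follows, using the new consumer API as it eventually shipped; the cluster addresses, group id, and topic names are all placeholder assumptions, and the transform is deliberately trivial.

{code}
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class CrossClusterTransform {
    public static void main(String[] args) {
        Properties c = new Properties();
        c.put("bootstrap.servers", "tracking-cluster:9092");   // source cluster
        c.put("group.id", "transformer");
        c.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        c.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        Properties p = new Properties();
        p.put("bootstrap.servers", "processing-cluster:9092"); // destination cluster
        p.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        p.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(c);
             KafkaProducer<String, String> producer = new KafkaProducer<>(p)) {
            consumer.subscribe(Collections.singletonList("page-views"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(1000);
                for (ConsumerRecord<String, String> r : records)
                    // Trivial transform; joins/windows would keep local state here.
                    producer.send(new ProducerRecord<>("page-views-transformed",
                            r.key(), r.value().toUpperCase()));
            }
        }
    }
}
{code}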
[jira] [Commented] (KAFKA-2365) Copycat checklist
[ https://issues.apache.org/jira/browse/KAFKA-2365?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14643016#comment-14643016 ] Gwen Shapira commented on KAFKA-2365: - BTW. Two connectors that appeared in the KIP discussion but are not in the JIRA are JDBC and HDFS. Is the idea to leave them out for now? Copycat checklist - Key: KAFKA-2365 URL: https://issues.apache.org/jira/browse/KAFKA-2365 Project: Kafka Issue Type: New Feature Reporter: Ewen Cheslack-Postava Assignee: Ewen Cheslack-Postava Labels: feature Fix For: 0.8.3 This covers the development plan for [KIP-26|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=58851767]. There are a number of features that can be developed in sequence to make incremental progress, and often in parallel: * Initial patch - connector API and core implementation * Runtime data API * Standalone CLI * REST API * Distributed copycat - CLI * Distributed copycat - coordinator * Distributed copycat - config storage * Distributed copycat - offset storage * Log/file connector (sample source/sink connector) * Elasticsearch sink connector (sample sink connector for a full log -> Kafka -> Elasticsearch sample pipeline) * Copycat metrics * System tests (including connector tests) * Mirrormaker connector * Copycat documentation This is an initial list, but it might need refinement to allow for more incremental progress and may be missing features we find we want before the initial release. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-2321) Introduce CONTRIBUTING.md
[ https://issues.apache.org/jira/browse/KAFKA-2321?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang updated KAFKA-2321: - Resolution: Fixed Fix Version/s: 0.8.3 Status: Resolved (was: Patch Available) Committed to trunk Introduce CONTRIBUTING.md - Key: KAFKA-2321 URL: https://issues.apache.org/jira/browse/KAFKA-2321 Project: Kafka Issue Type: Improvement Reporter: Ismael Juma Assignee: Ismael Juma Fix For: 0.8.3 This file is displayed when people create a pull request in GitHub. It should link to the relevant pages in the wiki and website. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-2321) Introduce CONTRIBUTING.md
[ https://issues.apache.org/jira/browse/KAFKA-2321?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14643107#comment-14643107 ] ASF GitHub Bot commented on KAFKA-2321: --- Github user asfgit closed the pull request at: https://github.com/apache/kafka/pull/97 Introduce CONTRIBUTING.md - Key: KAFKA-2321 URL: https://issues.apache.org/jira/browse/KAFKA-2321 Project: Kafka Issue Type: Improvement Reporter: Ismael Juma Assignee: Ismael Juma Fix For: 0.8.3 This file is displayed when people create a pull request in GitHub. It should link to the relevant pages in the wiki and website. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-2349) `contributing` website page should link to Contributing Code Changes wiki page
[ https://issues.apache.org/jira/browse/KAFKA-2349?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14643129#comment-14643129 ] Guozhang Wang commented on KAFKA-2349: -- Committed to the repo, thanks! `contributing` website page should link to Contributing Code Changes wiki page Key: KAFKA-2349 URL: https://issues.apache.org/jira/browse/KAFKA-2349 Project: Kafka Issue Type: Task Reporter: Ismael Juma Assignee: Ismael Juma Attachments: KAFKA-2349.patch This should be merged at the same time as https://issues.apache.org/jira/browse/KAFKA-2321 and only after a vote takes place in the mailing list. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-2367) Add Copycat runtime data API
[ https://issues.apache.org/jira/browse/KAFKA-2367?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ewen Cheslack-Postava updated KAFKA-2367: - Summary: Add Copycat runtime data API (was: Add Copycat runtime API) Add Copycat runtime data API Key: KAFKA-2367 URL: https://issues.apache.org/jira/browse/KAFKA-2367 Project: Kafka Issue Type: Sub-task Reporter: Ewen Cheslack-Postava Assignee: Ewen Cheslack-Postava Fix For: 0.8.3 Design the API used for runtime data in Copycat. This API is used to construct schemas and records that Copycat processes. This needs to be a fairly general data model (think Avro, JSON, Protobufs, Thrift) in order to support complex, varied data types that may be input from/output to many data systems. This issue should also address the serialization interfaces used within Copycat, which translate the runtime data into serialized byte[] form. It is important that these be considered together because the data format can be used in multiple ways (records, partition IDs, partition offsets), so it and the corresponding serializers must be sufficient for all these use cases. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
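Purely as an illustration of what "runtime data API" means here, a toy schema-plus-record pair is sketched below. These are not the proposed Copycat classes (the design was still open when this ticket was filed); they only show the shape the ticket is asking for.

{code}
import java.util.LinkedHashMap;
import java.util.Map;

public class RuntimeDataSketch {
    enum Type { STRING, INT64, BYTES, STRUCT }

    // A schema describes the shape of a value, recursively for structs.
    static final class Schema {
        final Type type;
        final Map<String, Schema> fields = new LinkedHashMap<>(); // STRUCT only
        Schema(Type type) { this.type = type; }
    }

    // A record pairs a value with its schema so that any serializer
    // (Avro-like, JSON-like, ...) can translate it to byte[] later.
    static final class Record {
        final Schema schema;
        final Object value; // e.g. Map<String, Object> when schema.type == STRUCT
        Record(Schema schema, Object value) { this.schema = schema; this.value = value; }
    }
}
{code}

The same pair would have to serve for keys, values, partition IDs, and offsets, which is why the ticket treats the data model and the serializers together.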
Re: New Producer and acks configuration
I think there is still a subtle difference between async with acks = 0 and async with a callback: once the max in-flight requests limit has been reached, subsequent requests cannot be sent until previous responses are returned (which could happen, for example, when the broker is slow or a network issue occurs) in the second case but not in the first. Given this difference, I feel there are still scenarios, though probably rare, where users would like to use acks = 0 even with the new producer's callbacks. Guozhang On Mon, Jul 27, 2015 at 9:25 AM, Mayuresh Gharat gharatmayures...@gmail.com wrote: So basically this means that with acks = 0, there is no guarantee that the message has been received by the Kafka broker. I am just wondering why anyone would be using acks = 0, since anyone using Kafka and doing producer.send() would want their message to get to the Kafka brokers. Also, as Jay said, with the new producer in async mode, clients will not have to wait for the response since it will be handled in callbacks. So the use of acks = 0 sounds very rare to me and I am not able to think of a use case for it. Thanks, Mayuresh On Sun, Jul 26, 2015 at 2:40 PM, Gwen Shapira gshap...@cloudera.com wrote: Aha! Yes, I was missing the part with the dummy response. Thank you! Gwen On Sun, Jul 26, 2015 at 2:17 PM, Ewen Cheslack-Postava e...@confluent.io wrote: It's different because it changes whether the client waits for the response from the broker at all. Take a look at NetworkClient.handleCompletedSends, which fills in dummy responses when a response is not expected (and that flag gets set via Sender.produceRequest using acks != 0 as a flag to ClientRequest). This means that the producer will invoke the callback / resolve the future as soon as the request hits the TCP buffer on the client. At that point, the behavior of the broker wrt generating a response doesn't matter -- the client isn't waiting on that response anyway. This definitely is faster since you aren't waiting for the round trip, but it seems like it is of questionable value with the new producer, as Jay explained. It is slightly better than just assuming records have been sent as soon as you call Producer.send(), in that this shouldn't trigger a callback until the records have made it through the internal KafkaProducer buffering. But since it still has to make it through the TCP buffers, it doesn't really guarantee anything that useful. -Ewen On Sun, Jul 26, 2015 at 1:40 PM, Gwen Shapira gshap...@cloudera.com wrote: What bugs me is that even with acks = 0, the broker will append to the local log before responding (unless I'm misreading the code), so I don't see why a client with acks = 0 will be any faster. Unless the client chooses to not wait for the response, which is orthogonal to the acks parameter. On Mon, Jul 20, 2015 at 8:52 AM, Jay Kreps j...@confluent.io wrote: acks=0 is a one-way send; the client doesn't need to wait on the response. Whether this is useful sort of depends on the client implementation. The new Java producer does all sends async, so waiting on a response isn't really a thing. For a client that lacks this, though, as some of them do, acks=0 will be a lot faster.
It also makes some sense in terms of what has completed when the request is considered satisfied: acks = 0 - message is written to the network (buffer); acks = 1 - message is written to the leader log; acks = -1 - message is committed. -Jay On Sat, Jul 18, 2015 at 10:50 PM, Gwen Shapira gshap...@cloudera.com wrote: Hi, I was looking into the difference between acks = 0 and acks = 1 in the new producer, and was a bit surprised at what I found. Basically, if I understand correctly, the only difference is that with acks = 0, if the leader fails to append locally, it closes the network connection silently, and with acks = 1, it sends an actual error message. Which seems to mean that with acks = 0, any failed produce will lead to a metadata refresh and a retry (because of the network error), while acks = 1 will lead to either retries or abort, depending on the error. Not only does this not match the documentation, it doesn't even make much sense... acks = 0 was supposed to somehow make things less safe but faster, and it doesn't seem to be doing that any more. I'm not even sure there's any case where the acks = 0 behavior is desirable. Is it my misunderstanding, or did we somehow screw up the logic here? Gwen -- Thanks, Ewen -- -Regards, Mayuresh R. Gharat (862) 250-7125 -- -- Guozhang
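For readers following this thread, the three settings map directly onto producer config. A minimal sketch follows; the broker address and topic name are placeholders.

{code}
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class AcksDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("acks", "1"); // "0" = network buffer, "1" = leader log, "-1"/"all" = committed

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        producer.send(new ProducerRecord<>("test", "key", "value"),
                (metadata, exception) -> {
                    // With acks=0 this fires as soon as the request hits the
                    // client's TCP buffer (a dummy response is filled in);
                    // with acks>=1 it fires on the broker's real response.
                    if (exception != null)
                        exception.printStackTrace();
                    else
                        System.out.println("acked at offset " + metadata.offset());
                });
        producer.close();
    }
}
{code}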
[jira] [Updated] (KAFKA-2368) Add Copycat standalone CLI
[ https://issues.apache.org/jira/browse/KAFKA-2368?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ewen Cheslack-Postava updated KAFKA-2368: - Component/s: copycat Add Copycat standalone CLI -- Key: KAFKA-2368 URL: https://issues.apache.org/jira/browse/KAFKA-2368 Project: Kafka Issue Type: Sub-task Components: copycat Reporter: Ewen Cheslack-Postava Assignee: Ewen Cheslack-Postava Fix For: 0.8.3 Define and add the CLI for running Copycat in standalone mode. KAFKA-2366 may add a simple version, but this JIRA should address agreeing on the final interface for the functionality defined in KIP-26, which proposed a basic API but may need further extension. Since the standalone mode is not meant to be persistent, this may end up being a pretty straightforward API. However, it should cleanly handle both worker-level configs and multiple connector configs. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-2366) Initial patch for Copycat
[ https://issues.apache.org/jira/browse/KAFKA-2366?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ewen Cheslack-Postava updated KAFKA-2366: - Component/s: copycat Initial patch for Copycat - Key: KAFKA-2366 URL: https://issues.apache.org/jira/browse/KAFKA-2366 Project: Kafka Issue Type: Sub-task Components: copycat Reporter: Ewen Cheslack-Postava Assignee: Ewen Cheslack-Postava Fix For: 0.8.3 This covers the initial patch for Copycat. The goal here is to get some baseline code in place, not necessarily the finalized implementation. The key thing we'll want here is the connector/task API, which defines how third parties write connectors. Beyond that the goal is to have a basically functional standalone Copycat implementation -- enough that we can run and test any connector code with reasonable coverage of functionality; specifically, it's important that core concepts like offset commit and resuming connector tasks function properly. These two things obviously interact, so development of the standalone worker may affect the design of connector APIs. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-2376) Add Copycat metrics
[ https://issues.apache.org/jira/browse/KAFKA-2376?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ewen Cheslack-Postava updated KAFKA-2376: - Component/s: copycat Add Copycat metrics --- Key: KAFKA-2376 URL: https://issues.apache.org/jira/browse/KAFKA-2376 Project: Kafka Issue Type: Sub-task Components: copycat Reporter: Ewen Cheslack-Postava Fix For: 0.8.3 Copycat needs good metrics for monitoring since that will be the primary insight into the health of connectors as they copy data. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-2373) Copycat distributed offset storage
[ https://issues.apache.org/jira/browse/KAFKA-2373?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ewen Cheslack-Postava updated KAFKA-2373: - Component/s: copycat Copycat distributed offset storage -- Key: KAFKA-2373 URL: https://issues.apache.org/jira/browse/KAFKA-2373 Project: Kafka Issue Type: Sub-task Components: copycat Reporter: Ewen Cheslack-Postava Assignee: Ewen Cheslack-Postava Fix For: 0.8.3 Add offset storage for Copycat that works in distributed mode, which likely means storing the data in a Kafka topic. Copycat workers will use this by default. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: New Producer and acks configuration
So basically this means that with acks = 0, there is no guarantee that the message has been received by the Kafka broker. I am just wondering why anyone would be using acks = 0, since anyone using Kafka and doing producer.send() would want their message to get to the Kafka brokers. Also, as Jay said, with the new producer in async mode, clients will not have to wait for the response since it will be handled in callbacks. So the use of acks = 0 sounds very rare to me and I am not able to think of a use case for it. Thanks, Mayuresh On Sun, Jul 26, 2015 at 2:40 PM, Gwen Shapira gshap...@cloudera.com wrote: Aha! Yes, I was missing the part with the dummy response. Thank you! Gwen On Sun, Jul 26, 2015 at 2:17 PM, Ewen Cheslack-Postava e...@confluent.io wrote: It's different because it changes whether the client waits for the response from the broker at all. Take a look at NetworkClient.handleCompletedSends, which fills in dummy responses when a response is not expected (and that flag gets set via Sender.produceRequest using acks != 0 as a flag to ClientRequest). This means that the producer will invoke the callback / resolve the future as soon as the request hits the TCP buffer on the client. At that point, the behavior of the broker wrt generating a response doesn't matter -- the client isn't waiting on that response anyway. This definitely is faster since you aren't waiting for the round trip, but it seems like it is of questionable value with the new producer, as Jay explained. It is slightly better than just assuming records have been sent as soon as you call Producer.send(), in that this shouldn't trigger a callback until the records have made it through the internal KafkaProducer buffering. But since it still has to make it through the TCP buffers, it doesn't really guarantee anything that useful. -Ewen On Sun, Jul 26, 2015 at 1:40 PM, Gwen Shapira gshap...@cloudera.com wrote: What bugs me is that even with acks = 0, the broker will append to the local log before responding (unless I'm misreading the code), so I don't see why a client with acks = 0 will be any faster. Unless the client chooses to not wait for the response, which is orthogonal to the acks parameter. On Mon, Jul 20, 2015 at 8:52 AM, Jay Kreps j...@confluent.io wrote: acks=0 is a one-way send; the client doesn't need to wait on the response. Whether this is useful sort of depends on the client implementation. The new Java producer does all sends async, so waiting on a response isn't really a thing. For a client that lacks this, though, as some of them do, acks=0 will be a lot faster. It also makes some sense in terms of what has completed when the request is considered satisfied: acks = 0 - message is written to the network (buffer); acks = 1 - message is written to the leader log; acks = -1 - message is committed. -Jay On Sat, Jul 18, 2015 at 10:50 PM, Gwen Shapira gshap...@cloudera.com wrote: Hi, I was looking into the difference between acks = 0 and acks = 1 in the new producer, and was a bit surprised at what I found. Basically, if I understand correctly, the only difference is that with acks = 0, if the leader fails to append locally, it closes the network connection silently, and with acks = 1, it sends an actual error message. Which seems to mean that with acks = 0, any failed produce will lead to a metadata refresh and a retry (because of the network error), while acks = 1 will lead to either retries or abort, depending on the error. Not only does this not match the documentation, it doesn't even make much sense...
acks = 0 was supposed to somehow make things less safe but faster, and it doesn't seem to be doing that any more. I'm not even sure there's any case where the acks = 0 behavior is desirable. Is it my misunderstanding, or did we somehow screw up the logic here? Gwen -- Thanks, Ewen -- -Regards, Mayuresh R. Gharat (862) 250-7125
Re: New Producer and acks configuration
Yeah, using acks=0 should result in higher throughput since we are not limited by the round-trip time to the broker. Btw, regarding in-flight requests: with acks = 1 (or -1), can we send a message batch to a partition before the broker has acked a previous request? Doesn't that risk getting messages out of order? On Mon, Jul 27, 2015 at 9:41 AM, Guozhang Wang wangg...@gmail.com wrote: I think there is still a subtle difference between async with acks = 0 and async with a callback: once the max in-flight requests limit has been reached, subsequent requests cannot be sent until previous responses are returned (which could happen, for example, when the broker is slow or a network issue occurs) in the second case but not in the first. Given this difference, I feel there are still scenarios, though probably rare, where users would like to use acks = 0 even with the new producer's callbacks. Guozhang On Mon, Jul 27, 2015 at 9:25 AM, Mayuresh Gharat gharatmayures...@gmail.com wrote: So basically this means that with acks = 0, there is no guarantee that the message has been received by the Kafka broker. I am just wondering why anyone would be using acks = 0, since anyone using Kafka and doing producer.send() would want their message to get to the Kafka brokers. Also, as Jay said, with the new producer in async mode, clients will not have to wait for the response since it will be handled in callbacks. So the use of acks = 0 sounds very rare to me and I am not able to think of a use case for it. Thanks, Mayuresh On Sun, Jul 26, 2015 at 2:40 PM, Gwen Shapira gshap...@cloudera.com wrote: Aha! Yes, I was missing the part with the dummy response. Thank you! Gwen On Sun, Jul 26, 2015 at 2:17 PM, Ewen Cheslack-Postava e...@confluent.io wrote: It's different because it changes whether the client waits for the response from the broker at all. Take a look at NetworkClient.handleCompletedSends, which fills in dummy responses when a response is not expected (and that flag gets set via Sender.produceRequest using acks != 0 as a flag to ClientRequest). This means that the producer will invoke the callback / resolve the future as soon as the request hits the TCP buffer on the client. At that point, the behavior of the broker wrt generating a response doesn't matter -- the client isn't waiting on that response anyway. This definitely is faster since you aren't waiting for the round trip, but it seems like it is of questionable value with the new producer, as Jay explained. It is slightly better than just assuming records have been sent as soon as you call Producer.send(), in that this shouldn't trigger a callback until the records have made it through the internal KafkaProducer buffering. But since it still has to make it through the TCP buffers, it doesn't really guarantee anything that useful. -Ewen On Sun, Jul 26, 2015 at 1:40 PM, Gwen Shapira gshap...@cloudera.com wrote: What bugs me is that even with acks = 0, the broker will append to the local log before responding (unless I'm misreading the code), so I don't see why a client with acks = 0 will be any faster. Unless the client chooses to not wait for the response, which is orthogonal to the acks parameter. On Mon, Jul 20, 2015 at 8:52 AM, Jay Kreps j...@confluent.io wrote: acks=0 is a one-way send; the client doesn't need to wait on the response. Whether this is useful sort of depends on the client implementation. The new Java producer does all sends async, so waiting on a response isn't really a thing. For a client that lacks this, though, as some of them do, acks=0 will be a lot faster.
It also makes some sense in terms of what has completed when the request is considered satisfied: acks = 0 - message is written to the network (buffer); acks = 1 - message is written to the leader log; acks = -1 - message is committed. -Jay On Sat, Jul 18, 2015 at 10:50 PM, Gwen Shapira gshap...@cloudera.com wrote: Hi, I was looking into the difference between acks = 0 and acks = 1 in the new producer, and was a bit surprised at what I found. Basically, if I understand correctly, the only difference is that with acks = 0, if the leader fails to append locally, it closes the network connection silently, and with acks = 1, it sends an actual error message. Which seems to mean that with acks = 0, any failed produce will lead to a metadata refresh and a retry (because of the network error), while acks = 1 will lead to either retries or abort, depending on the error. Not only does this not match the documentation, it doesn't even make much sense... acks = 0 was supposed to somehow make things less safe but faster, and it doesn't seem to be doing that any more. I'm not even sure there's any case where the acks = 0 behavior is
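On the out-of-order question raised at the top of this thread: yes, with retries enabled and more than one in-flight request per connection, a failed batch can be retried behind a newer batch and land out of order. The standard mitigation is pinning the in-flight limit to 1; a minimal config sketch (broker address is a placeholder):

{code}
import java.util.Properties;

public class OrderingSafeProducerConfig {
    public static Properties build() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("acks", "1");
        props.put("retries", "3");
        // With more than one unacknowledged request per connection, a retried
        // batch can overtake a newer one; limiting to 1 trades pipelining
        // throughput for strict per-partition ordering.
        props.put("max.in.flight.requests.per.connection", "1");
        return props;
    }
}
{code}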
[jira] [Updated] (KAFKA-2350) Add KafkaConsumer pause capability
[ https://issues.apache.org/jira/browse/KAFKA-2350?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson updated KAFKA-2350: --- Description: There are some use cases in stream processing where it is helpful to be able to pause consumption of a topic. For example, when joining two topics, you may need to delay processing of one topic while you wait for the consumer of the other topic to catch up. The new consumer currently doesn't provide a nice way to do this. If you skip calls to poll() or if you unsubscribe, then a rebalance will be triggered and your partitions will be reassigned to another consumer. The desired behavior is instead that you keep the partition assigned and simply stop fetching data from it. One way to achieve this would be to add two new methods to KafkaConsumer:
{code}
void pause(TopicPartition... partitions);
void resume(TopicPartition... partitions);
{code}
Here is the expected behavior of pause/resume:
* When a partition is paused, calls to KafkaConsumer.poll will not initiate any new fetches for that partition.
* After the partition is resumed, fetches will begin again.
* While a partition is paused, seek() and position() can still be used to advance or query the current position.
* Rebalance does not preserve pause/resume state.
was: There are some use cases in stream processing where it is helpful to be able to pause consumption of a topic. For example, when joining two topics, you may need to delay processing of one topic while you wait for the consumer of the other topic to catch up. The new consumer currently doesn't provide a nice way to do this. If you skip poll() or if you unsubscribe, then a rebalance will be triggered and your partitions will be reassigned. One way to achieve this would be to add two new methods to KafkaConsumer:
{code}
void pause(String... topics);
void unpause(String... topics);
{code}
When a topic is paused, a call to KafkaConsumer.poll will not initiate any new fetches for that topic. After it is unpaused, fetches will begin again.
Add KafkaConsumer pause capability -- Key: KAFKA-2350 URL: https://issues.apache.org/jira/browse/KAFKA-2350 Project: Kafka Issue Type: Improvement Reporter: Jason Gustafson Assignee: Jason Gustafson There are some use cases in stream processing where it is helpful to be able to pause consumption of a topic. For example, when joining two topics, you may need to delay processing of one topic while you wait for the consumer of the other topic to catch up. The new consumer currently doesn't provide a nice way to do this. If you skip calls to poll() or if you unsubscribe, then a rebalance will be triggered and your partitions will be reassigned to another consumer. The desired behavior is instead that you keep the partition assigned and simply stop fetching data from it. One way to achieve this would be to add two new methods to KafkaConsumer:
{code}
void pause(TopicPartition... partitions);
void resume(TopicPartition... partitions);
{code}
Here is the expected behavior of pause/resume:
* When a partition is paused, calls to KafkaConsumer.poll will not initiate any new fetches for that partition.
* After the partition is resumed, fetches will begin again.
* While a partition is paused, seek() and position() can still be used to advance or query the current position.
* Rebalance does not preserve pause/resume state.
-- This message was sent by Atlassian JIRA (v6.3.4#6332)
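A hypothetical sketch of the proposed API in the join scenario the issue describes; pause/resume did not exist at the time, so names and semantics here follow the proposal rather than any released client, and the helper methods are application-defined:
{code}
// Hypothetical: assumes the proposed pause/resume methods land as described.
TopicPartition fast = new TopicPartition("clicks", 0);
while (running) {                         // application-defined flag
    ConsumerRecords<String, String> records = consumer.poll(100);
    process(records);                     // application-defined
    if (fasterStreamIsAhead())            // application-defined lag check
        consumer.pause(fast);             // poll() keeps group membership alive
                                          // but initiates no new fetches for 'fast'
    else
        consumer.resume(fast);            // fetches begin again
}
// Note: per the proposal, a rebalance would reset this pause state.
{code}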
[jira] [Commented] (KAFKA-2365) Copycat checklist
[ https://issues.apache.org/jira/browse/KAFKA-2365?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14643087#comment-14643087 ] Neha Narkhede commented on KAFKA-2365: -- Worth discussing a process for including a connector in Kafka core, but I think we went through this in the KIP discussion and here is what I think. To keep packaging, review and code management easier, it is better to just include a couple of lightweight connectors, enough to show the usage of the copycat APIs (file in/file out). Any connector that requires depending on an external system (HDFS, Elasticsearch) should really live elsewhere. We should also delete the ones under contrib; they never ended up getting supported by the community. Since there will always be connectors that need to live outside Kafka, I think we should instead focus on how to make tooling easier for discovering and using these federated connectors. Copycat checklist - Key: KAFKA-2365 URL: https://issues.apache.org/jira/browse/KAFKA-2365 Project: Kafka Issue Type: New Feature Components: copycat Reporter: Ewen Cheslack-Postava Assignee: Ewen Cheslack-Postava Labels: feature Fix For: 0.8.3 This covers the development plan for [KIP-26|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=58851767]. There are a number of features that can be developed in sequence to make incremental progress, and often in parallel: * Initial patch - connector API and core implementation * Runtime data API * Standalone CLI * REST API * Distributed copycat - CLI * Distributed copycat - coordinator * Distributed copycat - config storage * Distributed copycat - offset storage * Log/file connector (sample source/sink connector) * Elasticsearch sink connector (sample sink connector for a full log -> Kafka -> Elasticsearch sample pipeline) * Copycat metrics * System tests (including connector tests) * Mirrormaker connector * Copycat documentation This is an initial list, but it might need refinement to allow for more incremental progress and may be missing features we find we want before the initial release. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-2349) `contributing` website page should link to Contributing Code Changes wiki page
[ https://issues.apache.org/jira/browse/KAFKA-2349?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang updated KAFKA-2349: - Resolution: Fixed Fix Version/s: 0.8.3 Status: Resolved (was: Patch Available) `contributing` website page should link to Contributing Code Changes wiki page Key: KAFKA-2349 URL: https://issues.apache.org/jira/browse/KAFKA-2349 Project: Kafka Issue Type: Task Reporter: Ismael Juma Assignee: Ismael Juma Fix For: 0.8.3 Attachments: KAFKA-2349.patch This should be merged at the same time as https://issues.apache.org/jira/browse/KAFKA-2321 and only after a vote takes place in the mailing list. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-2365) Copycat checklist
[ https://issues.apache.org/jira/browse/KAFKA-2365?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14643014#comment-14643014 ] Gwen Shapira commented on KAFKA-2365: - I added a component, added you as component lead and made sure all new CopyCat jiras will automatically be assigned to you. If you object to the auto-assignment, let me know - I figured that at this stage it will save you a click of assigning everything to you... Copycat checklist - Key: KAFKA-2365 URL: https://issues.apache.org/jira/browse/KAFKA-2365 Project: Kafka Issue Type: New Feature Reporter: Ewen Cheslack-Postava Assignee: Ewen Cheslack-Postava Labels: feature Fix For: 0.8.3 This covers the development plan for [KIP-26|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=58851767]. There are a number of features that can be developed in sequence to make incremental progress, and often in parallel: * Initial patch - connector API and core implementation * Runtime data API * Standalone CLI * REST API * Distributed copycat - CLI * Distributed copycat - coordinator * Distributed copycat - config storage * Distributed copycat - offset storage * Log/file connector (sample source/sink connector) * Elasticsearch sink connector (sample sink connector for a full log -> Kafka -> Elasticsearch sample pipeline) * Copycat metrics * System tests (including connector tests) * Mirrormaker connector * Copycat documentation This is an initial list, but it might need refinement to allow for more incremental progress and may be missing features we find we want before the initial release. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-2371) Add distributed coordinator implementation for Copycat
[ https://issues.apache.org/jira/browse/KAFKA-2371?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ewen Cheslack-Postava updated KAFKA-2371: - Component/s: copycat Add distributed coordinator implementation for Copycat -- Key: KAFKA-2371 URL: https://issues.apache.org/jira/browse/KAFKA-2371 Project: Kafka Issue Type: Sub-task Components: copycat Reporter: Ewen Cheslack-Postava Assignee: Ewen Cheslack-Postava Fix For: 0.8.3 Copycat needs a Coordinator implementation that handles multiple Workers, automatically managing the distribution of connectors and tasks across them. To start, this implementation should handle any connectors that have been registered via either a CLI or REST interface for starting/stopping connectors. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-2375) Implement elasticsearch Copycat sink connector
[ https://issues.apache.org/jira/browse/KAFKA-2375?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ewen Cheslack-Postava updated KAFKA-2375: - Component/s: copycat Implement elasticsearch Copycat sink connector -- Key: KAFKA-2375 URL: https://issues.apache.org/jira/browse/KAFKA-2375 Project: Kafka Issue Type: Sub-task Components: copycat Reporter: Ewen Cheslack-Postava Fix For: 0.8.3 Implement an elasticsearch sink connector for Copycat. This should send records to elasticsearch with unique document IDs, given appropriate configs to extract IDs from input records. The motivation here is to provide a good end-to-end example with built-in connectors that require minimal dependencies. Because Elasticsearch has a very simple REST API, an elasticsearch connector shouldn't require any extra dependencies, and logs -> Elasticsearch (in combination with KAFKA-2374) provides a compelling out-of-the-box Copycat use case. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-2369) Add Copycat REST API
[ https://issues.apache.org/jira/browse/KAFKA-2369?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ewen Cheslack-Postava updated KAFKA-2369: - Component/s: copycat Add Copycat REST API Key: KAFKA-2369 URL: https://issues.apache.org/jira/browse/KAFKA-2369 Project: Kafka Issue Type: Sub-task Components: copycat Reporter: Ewen Cheslack-Postava Assignee: Ewen Cheslack-Postava Fix For: 0.8.3 Add a REST API for Copycat. At a minimum, for a single worker this should support: * add/remove connector * connector status * task status * worker status In distributed mode this should handle forwarding if necessary, but it may make sense to defer the distributed support for a later JIRA. This will require the addition of new dependencies to support implementing the REST API. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-2374) Implement Copycat log/file connector
[ https://issues.apache.org/jira/browse/KAFKA-2374?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ewen Cheslack-Postava updated KAFKA-2374: - Component/s: copycat Implement Copycat log/file connector Key: KAFKA-2374 URL: https://issues.apache.org/jira/browse/KAFKA-2374 Project: Kafka Issue Type: Sub-task Components: copycat Reporter: Ewen Cheslack-Postava Assignee: Ewen Cheslack-Postava Fix For: 0.8.3 This is a good baseline connector that has zero dependencies and works well as both a demonstration and a practical use case for standalone mode. Two key features it should ideally support: multiple files and rolling log files. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-2370) Add pause/unpause connector support
[ https://issues.apache.org/jira/browse/KAFKA-2370?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ewen Cheslack-Postava updated KAFKA-2370: - Component/s: copycat Add pause/unpause connector support --- Key: KAFKA-2370 URL: https://issues.apache.org/jira/browse/KAFKA-2370 Project: Kafka Issue Type: Sub-task Components: copycat Reporter: Ewen Cheslack-Postava Assignee: Ewen Cheslack-Postava Fix For: 0.8.3 It will sometimes be useful to pause/unpause connectors. For example, if you know planned maintenance will occur on the source/destination system, it would make sense to pause and then resume (but not delete and then restore) a connector. This likely requires support in all Coordinator implementations (standalone/distributed) to trigger the events. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-2379) Add Copycat documentation
[ https://issues.apache.org/jira/browse/KAFKA-2379?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ewen Cheslack-Postava updated KAFKA-2379: - Component/s: copycat Add Copycat documentation - Key: KAFKA-2379 URL: https://issues.apache.org/jira/browse/KAFKA-2379 Project: Kafka Issue Type: Sub-task Components: copycat Reporter: Ewen Cheslack-Postava Assignee: Ewen Cheslack-Postava Fix For: 0.8.3 Starting this out pretty broad as it can cover a lot. Some ideas: * Normal intro/readme type stuff * User guide - how to run in standalone/distributed mode. Connector/tasks concepts and what they mean in practice. Fault tolerance, offsets. REST interface, Copycat as a service, etc. * Dev guide - connectors/partitions/records/offsets/tasks. All the APIs, specific examples for implementing APIs, resuming from previous offsets, dynamic sets of partitions, how to work with the runtime data API, etc. * System design - KIP-26 + more - why we ended up on the design we did, comparisons to other systems w/ low level details, -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-2378) Add Copycat embedded API
[ https://issues.apache.org/jira/browse/KAFKA-2378?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ewen Cheslack-Postava updated KAFKA-2378: - Component/s: copycat Add Copycat embedded API Key: KAFKA-2378 URL: https://issues.apache.org/jira/browse/KAFKA-2378 Project: Kafka Issue Type: Sub-task Components: copycat Reporter: Ewen Cheslack-Postava Fix For: 0.8.3 Much of the required Copycat API will exist from previous patches since any main() method will need to do very similar operations. However, integrating with any other Java code may require additional API support. For example, integrating with a stream processing application will require knowing which topics will be written to. We will need to add APIs to expose the topics a registered connector is writing to so they can be consumed by a stream processing task. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
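Since no embedded API existed yet, the following is a purely hypothetical sketch of the kind of call described above (exposing a connector's output topics to an embedded stream processing job); every class and method name here is invented for illustration:
{code}
// Hypothetical names throughout -- nothing here is a real Copycat API.
CopycatEmbedded copycat = new CopycatEmbedded(workerConfig);
String connectorId = copycat.addConnector(connectorProps);
// Expose the topics the connector writes to, so a co-located stream
// processing task can consume them directly:
Set<String> outputTopics = copycat.outputTopics(connectorId);
consumer.subscribe(outputTopics.toArray(new String[0]));
{code}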
[jira] [Updated] (KAFKA-2377) Add copycat system tests
[ https://issues.apache.org/jira/browse/KAFKA-2377?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ewen Cheslack-Postava updated KAFKA-2377: - Component/s: copycat Add copycat system tests Key: KAFKA-2377 URL: https://issues.apache.org/jira/browse/KAFKA-2377 Project: Kafka Issue Type: Sub-task Components: copycat Reporter: Ewen Cheslack-Postava Fix For: 0.8.3 Add baseline system tests for Copycat, covering both standalone and distributed mode. This should cover basic failure modes and verify at-least-once delivery of data, both from source system -> Kafka and from Kafka -> sink system. This, of course, requires testing the core, built-in connectors provided with Copycat. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-2372) Copycat distributed config storage
[ https://issues.apache.org/jira/browse/KAFKA-2372?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ewen Cheslack-Postava updated KAFKA-2372: - Component/s: copycat Copycat distributed config storage -- Key: KAFKA-2372 URL: https://issues.apache.org/jira/browse/KAFKA-2372 Project: Kafka Issue Type: Sub-task Components: copycat Reporter: Ewen Cheslack-Postava Assignee: Ewen Cheslack-Postava Fix For: 0.8.3 Add a config storage mechanism to Copycat that works in distributed mode. Copycat workers that start in distributed mode should use this implementation by default. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] kafka pull request: KAFKA-2321; Introduce CONTRIBUTING.md
Github user asfgit closed the pull request at: https://github.com/apache/kafka/pull/97 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Updated] (KAFKA-2365) Copycat checklist
[ https://issues.apache.org/jira/browse/KAFKA-2365?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ewen Cheslack-Postava updated KAFKA-2365: - Component/s: copycat Copycat checklist - Key: KAFKA-2365 URL: https://issues.apache.org/jira/browse/KAFKA-2365 Project: Kafka Issue Type: New Feature Components: copycat Reporter: Ewen Cheslack-Postava Assignee: Ewen Cheslack-Postava Labels: feature Fix For: 0.8.3 This covers the development plan for [KIP-26|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=58851767]. There are a number of features that can be developed in sequence to make incremental progress, and often in parallel: * Initial patch - connector API and core implementation * Runtime data API * Standalone CLI * REST API * Distributed copycat - CLI * Distributed copycat - coordinator * Distributed copycat - config storage * Distributed copycat - offset storage * Log/file connector (sample source/sink connector) * Elasticsearch sink connector (sample sink connector for a full log -> Kafka -> Elasticsearch sample pipeline) * Copycat metrics * System tests (including connector tests) * Mirrormaker connector * Copycat documentation This is an initial list, but it might need refinement to allow for more incremental progress and may be missing features we find we want before the initial release. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-2367) Add Copycat runtime data API
[ https://issues.apache.org/jira/browse/KAFKA-2367?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ewen Cheslack-Postava updated KAFKA-2367: - Component/s: copycat Add Copycat runtime data API Key: KAFKA-2367 URL: https://issues.apache.org/jira/browse/KAFKA-2367 Project: Kafka Issue Type: Sub-task Components: copycat Reporter: Ewen Cheslack-Postava Assignee: Ewen Cheslack-Postava Fix For: 0.8.3 Design the API used for runtime data in Copycat. This API is used to construct schemas and records that Copycat processes. This needs to be a fairly general data model (think Avro, JSON, Protobufs, Thrift) in order to support complex, varied data types that may be input from/output to many data systems. This issue should also address the serialization interfaces used within Copycat, which translate the runtime data into serialized byte[] form. It is important that these be considered together because the data format can be used in multiple ways (records, partition IDs, partition offsets), so it and the corresponding serializers must be sufficient for all these use cases. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
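As a purely hypothetical illustration of the translation layer described above (runtime records to byte[] and back), the interface might take a shape along these lines; the name and methods are invented, since the actual runtime data API was still to be designed:
{code}
// Hypothetical shape only -- not a real Copycat interface.
public interface Converter<T> {
    /** Translate a runtime record/schema value into serialized form. */
    byte[] fromCopycatData(T value);

    /** Translate serialized bytes back into the runtime data form. */
    T toCopycatData(byte[] serialized);
}
{code}
As the issue notes, the same interface would need to serve records as well as partition IDs and offsets, which is what pushes the design toward a general data model rather than raw byte arrays.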
[jira] [Commented] (KAFKA-2260) Allow specifying expected offset on produce
[ https://issues.apache.org/jira/browse/KAFKA-2260?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14643065#comment-14643065 ] Mayuresh Gharat commented on KAFKA-2260: I think when 2 producers are trying to produce concurrently, the broker log will be appended in a specific order, which means the corresponding offsets in the respective sub-partitions will also be incremented in that order, and the client should be able to figure out from the ProduceResponse what is the next offset it needs to send the data with. This is what I understand from the above discussion. Allow specifying expected offset on produce --- Key: KAFKA-2260 URL: https://issues.apache.org/jira/browse/KAFKA-2260 Project: Kafka Issue Type: Improvement Reporter: Ben Kirwin Assignee: Ewen Cheslack-Postava Priority: Minor Attachments: expected-offsets.patch I'd like to propose a change that adds a simple CAS-like mechanism to the Kafka producer. This update has a small footprint, but enables a bunch of interesting uses in stream processing or as a commit log for process state.
h4. Proposed Change
In short:
- Allow the user to attach a specific offset to each message produced.
- The server assigns offsets to messages in the usual way. However, if the expected offset doesn't match the actual offset, the server should fail the produce request instead of completing the write.
This is a form of optimistic concurrency control, like the ubiquitous check-and-set -- but instead of checking the current value of some state, it checks the current offset of the log.
h4. Motivation
Much like check-and-set, this feature is only useful when there's very low contention. Happily, when Kafka is used as a commit log or as a stream-processing transport, it's common to have just one producer (or a small number) for a given partition -- and in many of these cases, predicting offsets turns out to be quite useful.
- We get the same benefits as the 'idempotent producer' proposal: a producer can retry a write indefinitely and be sure that at most one of those attempts will succeed; and if two producers accidentally write to the end of the partition at once, we can be certain that at least one of them will fail.
- It's possible to 'bulk load' Kafka this way -- you can write a list of n messages consecutively to a partition, even if the list is much larger than the buffer size or the producer has to be restarted.
- If a process is using Kafka as a commit log -- reading from a partition to bootstrap, then writing any updates to that same partition -- it can be sure that it's seen all of the messages in that partition at the moment it does its first (successful) write.
There's a bunch of other similar use-cases here, but they all have roughly the same flavour.
h4. Implementation
The major advantage of this proposal over other suggested transaction / idempotency mechanisms is its minimality: it gives the 'obvious' meaning to a currently-unused field, adds no new APIs, and requires very little new code or additional work from the server.
- Produced messages already carry an offset field, which is currently ignored by the server. This field could be used for the 'expected offset', with a sigil value for the current behaviour. (-1 is a natural choice, since it's already used to mean 'next available offset'.)
- We'd need a new error and error code for a 'CAS failure'.
- The server assigns offsets to produced messages in {{ByteBufferMessageSet.validateMessagesAndAssignOffsets}}.
After this change, this method would assign offsets in the same way -- but if they don't match the offset in the message, we'd return an error instead of completing the write.
- To avoid breaking existing clients, this behaviour would need to live behind some config flag. (Possibly global, but probably more useful per-topic?)
I understand all this is unsolicited and possibly strange: happy to answer questions, and if this seems interesting, I'd be glad to flesh this out into a full KIP or patch. (And apologies if this is the wrong venue for this sort of thing!) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
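To see how a client might use the check-and-set semantics, here is a hypothetical retry loop; the expected-offset parameter, the CAS failure exception, and the helper methods are all part of the proposal or invented for illustration, not any existing producer API:
{code}
// Hypothetical client-side loop for the proposed expected-offset produce.
long expected = readLogEndOffsetAfterBootstrap();  // application-defined
while (true) {
    try {
        sendWithExpectedOffset(record, expected);  // proposed, not a real API
        break;                                     // write landed exactly at 'expected'
    } catch (CasFailureException e) {              // proposed new error code
        // Another producer won the race: re-read the log tail, recompute the
        // update against the new state, and retry with the fresh offset.
        expected = refetchLogEndOffset();          // application-defined
    }
}
{code}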
Re: Best practices - Using kafka (with http server) as source-of-truth
Hi Prabhjot, Confluent has a REST proxy with docs that may give some guidance: http://docs.confluent.io/1.0/kafka-rest/docs/intro.html The new producer that it uses is very efficient, so you should be able to get pretty good throughput. You take a bit of a hit due to the overhead of sending data through a proxy, but with appropriate batching you can get about 2/3 of the performance you would get using the Java producer directly. There are also a few other proxies you can find here: https://cwiki.apache.org/confluence/display/KAFKA/Clients#Clients-HTTPREST You can also put nginx (or HAProxy, or a variety of other solutions) in front of REST proxies for load balancing, HA, SSL termination, etc. This is yet another hop, so it might affect throughput and latency. -Ewen On Mon, Jul 27, 2015 at 6:55 AM, Prabhjot Bharaj prabhbha...@gmail.com wrote: Hi Folks, I would like to understand the best practices when using kafka as the source-of-truth, given the fact that I want to pump in data to Kafka using http methods. What are the current production configurations for such a use case: 1. Kafka-http-client - is it scalable the way Nginx is? 2. Using Kafka and Nginx together - if anybody has used this, please explain 3. Any other scalable method? Regards, prabcs -- Thanks, Ewen
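For reference, a minimal sketch of producing through the REST proxy from Java; the endpoint, content type, and base64-encoded binary record format follow the v1 docs linked above, but verify them against the proxy version you actually run, and the proxy address and topic are placeholders:
{code}
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;

public class RestProxyProduce {
    public static void main(String[] args) throws Exception {
        // Assumed proxy address and topic name.
        URL url = new URL("http://localhost:8082/topics/test");
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setRequestMethod("POST");
        conn.setRequestProperty("Content-Type", "application/vnd.kafka.binary.v1+json");
        conn.setDoOutput(true);
        // Batching many records per request is what recovers most of the
        // native producer's throughput. "S2Fma2E=" is base64 for "Kafka".
        String body = "{\"records\":[{\"value\":\"S2Fma2E=\"}]}";
        OutputStream out = conn.getOutputStream();
        out.write(body.getBytes("UTF-8"));
        out.close();
        System.out.println("HTTP " + conn.getResponseCode());
    }
}
{code}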
Re: New Producer and acks configuration
If only we had some sort of system test framework with a producer performance test that we could parameterize with the different acks settings to validate these performance differences... wrt out of order: yes, with >1 in-flight requests with retries, messages can get out of order. Becket had a great presentation addressing that and a bunch of other issues with no-data-loss pipelines: http://www.slideshare.net/JiangjieQin/no-data-loss-pipeline-with-apache-kafka-49753844 Short version: as things are today, you have to *really* understand the producer settings, and some producer internals, to get the exact behavior you want.
[jira] [Commented] (KAFKA-2303) Fix for KAFKA-2235 LogCleaner offset map overflow causes another compaction failures
[ https://issues.apache.org/jira/browse/KAFKA-2303?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14643136#comment-14643136 ] Alexander Demidko commented on KAFKA-2303: -- I think in our case we had too many unique keys per partition, so making compactions happen more frequently will not ultimately solve the issue. Increasing the number of partitions should help, but it requires more careful planning around the compacted topic's overall data volumes. Fix for KAFKA-2235 LogCleaner offset map overflow causes another compaction failures Key: KAFKA-2303 URL: https://issues.apache.org/jira/browse/KAFKA-2303 Project: Kafka Issue Type: Bug Components: core, log Affects Versions: 0.8.2.1 Reporter: Alexander Demidko Assignee: Jay Kreps Fix For: 0.8.3 We have rolled out the patch for KAFKA-2235 to our kafka cluster, and recently instead of
{code}
kafka.log.LogCleaner - [kafka-log-cleaner-thread-0], Error due to java.lang.IllegalArgumentException: requirement failed: Attempt to add a new entry to a full offset map.
{code}
we started to see
{code}
kafka.log.LogCleaner - [kafka-log-cleaner-thread-0], Error due to java.lang.IllegalArgumentException: requirement failed: 131390902 messages in segment topic-name-cgstate-8/79840768.log but offset map can fit only 80530612. You can increase log.cleaner.dedupe.buffer.size or decrease log.cleaner.threads
{code}
So, we had to roll it back to avoid disk depletion, although I'm not sure if it needs to be rolled back in trunk. This patch applies more strict checks than were in place before: even if there is only one unique key for a segment, cleanup will fail if this segment is too big. Does it make sense to eliminate a limit for the offset map slots count, for example to use an offset map backed by a memory mapped file? The limit of 80530612 slots comes from memory / bytesPerEntry, where memory is Int.MaxValue (we use only one cleaner thread) and bytesPerEntry is 8 + digest hash size. Might be wrong, but it seems if the overall number of unique keys per partition is more than 80M slots in an OffsetMap, compaction will always fail and the cleaner thread will die. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
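The 80M figure quoted above follows from the arithmetic in the comment, assuming 16-byte MD5 digests and a 0.9 load factor (both are assumptions here, not stated in the thread):
{code}
// Back-of-the-envelope reconstruction of the offset map capacity.
long memory = Integer.MAX_VALUE;            // one cleaner thread gets the whole buffer
int bytesPerEntry = 8 + 16;                 // 8-byte offset + 16-byte MD5 digest (assumed)
long slots = (long) (memory / bytesPerEntry * 0.9);  // assumed 0.9 load factor
System.out.println(slots);                  // ~80.5 million unique keys per cleaning
{code}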
Re: [DISCUSS] Partitioning in Kafka
If you are used to map-reduce patterns, this sounds like a perfectly natural way to process streams of data. Call the first consumer map-combine-log, the topic shuffle-log and the second consumer reduce-log :) I like that a lot. It works well for either embarrassingly parallel cases, or "so much data that more parallelism is worth the extra overhead" cases. I personally don't care if it's in core-Kafka, KIP-28 or a github project elsewhere, but I find it useful and non-esoteric.
Jenkins build is back to normal : KafkaPreCommit #164
See https://builds.apache.org/job/KafkaPreCommit/164/changes
[jira] [Commented] (KAFKA-2350) Add KafkaConsumer pause capability
[ https://issues.apache.org/jira/browse/KAFKA-2350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14643249#comment-14643249 ] Guozhang Wang commented on KAFKA-2350: -- [~becket_qin], I was not considering the implementation from a developer point of view regarding the call trace, but rather from a user point of view. That is, with two different names it is more clear about the distinction between topic / partition subscriptions. For another example, say you write some application with a consumer client embedded, and very likely call its functions from many places in your app code / classes. When you see an exception thrown from unsubscribe(partition), you possibly need to look at other places and check whether it is case 1) you used subscribe(topic), but the partition was not assigned by Kafka, or 2) you used subscribe(partition) on some other partitions, but you did not subscribe to this one before. Similarly, if you see an exception thrown on your subscribe(partition) call, you need to check whether 1) you called poll() in between so that the partition is no longer assigned, or 2) you called subscribe(topic) before and that partition is not one of the assigned partitions. I.e., you as the developer need to check which consumer function calls the code has made before (the trace) in order to troubleshoot, and the code is not necessarily written by yourself. With pause / resume, it will be more clear that the consumer is certainly using topic subscriptions, and the partition is no longer assigned to you if those functions throw an exception because of a rebalance, etc. Add KafkaConsumer pause capability -- Key: KAFKA-2350 URL: https://issues.apache.org/jira/browse/KAFKA-2350 Project: Kafka Issue Type: Improvement Reporter: Jason Gustafson Assignee: Jason Gustafson There are some use cases in stream processing where it is helpful to be able to pause consumption of a topic. For example, when joining two topics, you may need to delay processing of one topic while you wait for the consumer of the other topic to catch up. The new consumer currently doesn't provide a nice way to do this. If you skip calls to poll() or if you unsubscribe, then a rebalance will be triggered and your partitions will be reassigned to another consumer. The desired behavior is instead that you keep the partition assigned and simply stop fetching data from it. One way to achieve this would be to add two new methods to KafkaConsumer:
{code}
void pause(TopicPartition... partitions);
void resume(TopicPartition... partitions);
{code}
Here is the expected behavior of pause/resume:
* When a partition is paused, calls to KafkaConsumer.poll will not initiate any new fetches for that partition.
* After the partition is resumed, fetches will begin again.
* While a partition is paused, seek() and position() can still be used to advance or query the current position.
* Rebalance does not preserve pause/resume state.
-- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: Review Request 33620: Patch for KAFKA-1690
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/33620/#review93177 --- clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java (line 108) https://reviews.apache.org/r/33620/#comment147469 Do you mean return copy? - Dong Lin On July 25, 2015, 7:11 p.m., Sriharsha Chintalapani wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/33620/ --- (Updated July 25, 2015, 7:11 p.m.) Review request for kafka. Bugs: KAFKA-1690 https://issues.apache.org/jira/browse/KAFKA-1690 Repository: kafka Description --- KAFKA-1690. new java producer needs ssl support as a client. KAFKA-1690. new java producer needs ssl support as a client. KAFKA-1690. new java producer needs ssl support as a client. KAFKA-1690. new java producer needs ssl support as a client. SSLFactory tests. KAFKA-1690. new java producer needs ssl support as a client. Added PrincipalBuilder. KAFKA-1690. new java producer needs ssl support as a client. Addressing reviews. KAFKA-1690. new java producer needs ssl support as a client. Addressing reviews. KAFKA-1690. new java producer needs ssl support as a client. Addressing reviews. KAFKA-1690. new java producer needs ssl support as a client. Fixed minor issues with the patch. KAFKA-1690. new java producer needs ssl support as a client. Fixed minor issues with the patch. KAFKA-1690. new java producer needs ssl support as a client. KAFKA-1690. new java producer needs ssl support as a client. Merge remote-tracking branch 'refs/remotes/origin/trunk' into KAFKA-1690-V1 KAFKA-1690. Broker side ssl changes. KAFKA-1684. SSL for socketServer. KAFKA-1690. Added SSLProducerSendTest and fixes to get right port for SSL. Merge branch 'trunk' into KAFKA-1690-V1 KAFKA-1690. Post merge fixes. KAFKA-1690. Added SSLProducerSendTest. KAFKA-1690. Minor fixes based on patch review comments. Merge commit KAFKA-1690. Added SSL Consumer Test. KAFKA-1690. SSL Support. KAFKA-1690. Addressing reviews. Merge branch 'trunk' into KAFKA-1690-V1 Merge branch 'trunk' into KAFKA-1690-V1 KAFKA-1690. Addressing reviews. Removed interestOps from SSLTransportLayer. KAFKA-1690. Addressing reviews. KAFKA-1690. added staged receives to selector. KAFKA-1690. Addressing reviews. 
Merge branch 'trunk' into KAFKA-1690-V1 Diffs -
build.gradle 0abec26fb2d7be62c8a673f9ec838e926e64b2d1
checkstyle/import-control.xml 19e0659ef9385433d9f94dee43cd70a52b18c9e5
clients/src/main/java/org/apache/kafka/clients/ClientUtils.java 0d68bf1e1e90fe9d5d4397ddf817b9a9af8d9f7a
clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java 2c421f42ed3fc5d61cf9c87a7eaa7bb23e26f63b
clients/src/main/java/org/apache/kafka/clients/NetworkClient.java 48fe7961e2215372d8033ece4af739ea06c6457b
clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java 70377ae2fa46deb381139d28590ce6d4115e1adc
clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java bea3d737c51be77d5b5293cdd944d33b905422ba
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 03b8dd23df63a8d8a117f02eabcce4a2d48c44f7
clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java aa264202f2724907924985a5ecbe74afc4c6c04b
clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java bae528d31516679bed88ee61b408f209f185a8cc
clients/src/main/java/org/apache/kafka/common/config/SSLConfigs.java PRE-CREATION
clients/src/main/java/org/apache/kafka/common/network/Authenticator.java PRE-CREATION
clients/src/main/java/org/apache/kafka/common/network/ByteBufferSend.java df0e6d5105ca97b7e1cb4d334ffb7b443506bd0b
clients/src/main/java/org/apache/kafka/common/network/ChannelBuilder.java PRE-CREATION
clients/src/main/java/org/apache/kafka/common/network/DefaultAuthenticator.java PRE-CREATION
clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java PRE-CREATION
clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java 3ca0098b8ec8cfdf81158465b2d40afc47eb6f80
clients/src/main/java/org/apache/kafka/common/network/PlainTextChannelBuilder.java PRE-CREATION
clients/src/main/java/org/apache/kafka/common/network/PlainTextTransportLayer.java PRE-CREATION
clients/src/main/java/org/apache/kafka/common/network/SSLChannelBuilder.java PRE-CREATION
clients/src/main/java/org/apache/kafka/common/network/SSLFactory.java PRE-CREATION
Re: Review Request 36652: Patch for KAFKA-2351
On July 24, 2015, 4:13 p.m., Jun Rao wrote: core/src/main/scala/kafka/network/SocketServer.scala, line 264 https://reviews.apache.org/r/36652/diff/3/?file=1020607#file1020607line264 Not sure if it's better to keep the thread alive on any throwable. For unexpected exceptions, it seems it's better to just propagate the exception, log it and then kill the thread. This is already done through Utils.newThread. If we want to clean things up (e.g. countdown) before propagating the exception, we can do that in a finally clause. Hi Jun, I think only letting the acceptor thread die might cause more issues. The broker in that case will not serve new connections. This essentially makes all the partitions on this broker unavailable. If we let the acceptor thread exit, maybe we should shut down the broker completely. - Jiangjie --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/36652/#review92920 --- On July 24, 2015, 4:36 a.m., Mayuresh Gharat wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/36652/ --- (Updated July 24, 2015, 4:36 a.m.) Review request for kafka. Bugs: KAFKA-2351 https://issues.apache.org/jira/browse/KAFKA-2351 Repository: kafka Description --- Added a try-catch to catch any exceptions thrown by the nioSelector Addressed comments on the Jira ticket Diffs - core/src/main/scala/kafka/network/SocketServer.scala 91319fa010b140cca632e5fa8050509bd2295fc9 Diff: https://reviews.apache.org/r/36652/diff/ Testing --- Thanks, Mayuresh Gharat
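To illustrate the tradeoff under discussion, a Java sketch of an accept loop; the real SocketServer is Scala and structured differently, and the helper names here are invented, so treat this as illustrative only:
{code}
// Illustrative only; the real acceptor differs.
while (running) {
    try {
        SocketChannel channel = serverChannel.accept();
        configureAndRegister(channel);   // hypothetical helper
    } catch (IOException e) {
        // Recoverable: log and keep accepting, so the broker stays reachable.
        log.warn("Error accepting connection", e);
    } catch (RuntimeException e) {
        // Unexpected: propagating kills the acceptor, after which no new
        // connections are served -- effectively making this broker's
        // partitions unavailable, hence the suggestion to shut down fully.
        log.error("Unexpected error in acceptor", e);
        throw e;
    }
}
{code}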
[jira] [Commented] (KAFKA-2364) Improve documentation for contributing to docs
[ https://issues.apache.org/jira/browse/KAFKA-2364?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14643179#comment-14643179 ] Ismael Juma commented on KAFKA-2364: Coincidentally a CONTRIBUTING.md was added today and it links to 2 pages that have more information. Can you please take a look and see if it's better now? The website is sadly still in SVN, but we would be happy to move it to Git as per https://blogs.apache.org/infra/entry/git_based_websites_available Help would be welcome as we are currently all busy with other tasks. Improve documentation for contributing to docs -- Key: KAFKA-2364 URL: https://issues.apache.org/jira/browse/KAFKA-2364 Project: Kafka Issue Type: Task Reporter: Aseem Bansal Priority: Minor Labels: doc While reading the documentation for kafka 8 I saw some improvements that can be made. But the docs for contributing are not very good at https://github.com/apache/kafka. It just gives me a URL for svn. But I am not sure what to do. Can the README.MD file be improved for contributing to docs? I have submitted patches to groovy and grails by sending PRs via github, but looking at the comments on PRs submitted to kafka it seems PRs via github are not working for kafka. It would be good to make that work also. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: [DISCUSS] Partitioning in Kafka
For a little background, the difference between this partitioner and the default one is that it breaks the deterministic mapping from key to partition. Instead, messages for a given key can end up in either of two partitions. This means that the consumer generally won't see all messages for a given key. Instead the consumer would compute an aggregate for each key on the partitions it consumes and write them to a separate topic. For example, if you are writing log messages to a logs topic with the hostname as the key, you could use this partitioning strategy to compute message counts for each host in each partition and write them to a log-counts topic. Then a consumer of the log-counts topic would compute total aggregates based on the two intermediate aggregates. The benefit is that you are generally going to get better load balancing across partitions than if you used the default partitioner. (Please correct me if my understanding is incorrect, Gianmarco) So I think the question is whether this is a useful primitive for Kafka to provide out of the box? I was a little concerned that this use case is a little esoteric for a core feature, but it may make more sense in the context of KIP-28 which would provide some higher-level processing capabilities (though it doesn't seem like the KStream abstraction would provide a direct way to leverage this partitioner without custom logic). Thanks, Jason On Wed, Jul 22, 2015 at 12:14 AM, Gianmarco De Francisci Morales g...@apache.org wrote: Hello folks, I'd like to ask the community about its opinion on the partitioning functions in Kafka. With KAFKA-2091 https://issues.apache.org/jira/browse/KAFKA-2091 integrated we are now able to have custom partitioners in the producer. The question now becomes *which* partitioners should ship with Kafka? This issue arose in the context of KAFKA-2092 https://issues.apache.org/jira/browse/KAFKA-2092, which implements a specific load-balanced partitioning. This partitioner however assumes some stages of processing on top of it to make proper use of the data, i.e., it envisions Kafka as a substrate for stream processing, and not only as the I/O component. Is this a direction that Kafka wants to go towards? Or is this a role better left to the internal communication systems of other stream processing engines (e.g., Storm)? And if the answer is the latter, how would something such as Samza (which relies mostly on Kafka as its communication substrate) be able to implement advanced partitioning schemes? Cheers, -- Gianmarco
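A sketch of the "two choices" idea being discussed, written against the pluggable Partitioner interface added by KAFKA-2091; the real KAFKA-2092 patch differs, the second hash here is a crude stand-in, and the per-partition counters are local to one producer and not thread-safe:
{code}
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.utils.Utils;

public class TwoChoicesPartitioner implements Partitioner {
    // Messages sent per partition by this producer (illustration only;
    // assumes a non-null key and single-threaded use).
    private final Map<Integer, Long> counts = new HashMap<Integer, Long>();

    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        int numPartitions = cluster.partitionsForTopic(topic).size();
        // Each key gets two candidate partitions from two hashes...
        int p1 = (Utils.murmur2(keyBytes) & 0x7fffffff) % numPartitions;
        byte[] salted = new byte[keyBytes.length + 1];   // crude second hash
        System.arraycopy(keyBytes, 0, salted, 0, keyBytes.length);
        int p2 = (Utils.murmur2(salted) & 0x7fffffff) % numPartitions;
        // ...and we pick whichever candidate has seen less traffic so far.
        int choice = count(p1) <= count(p2) ? p1 : p2;
        counts.put(choice, count(choice) + 1);
        return choice;
    }

    private long count(int partition) {
        Long c = counts.get(partition);
        return c == null ? 0L : c;
    }

    public void close() {}

    public void configure(Map<String, ?> configs) {}
}
{code}
With the default partitioner every message for a key lands on hash(key) % numPartitions; here a hot key can spill across its two candidates, which is why the consumer side must merge the two partial aggregates as Jason describes.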
Re: Kafka Consumer thoughts
adding the open source alias. This email started off as a broader discussion around the new consumer. I was zooming into only the aspect of poll() being the only mechanism for driving the heartbeats. Yes the lag is the effect of the problem (not the problem). Monitoring the lag is important as it is the primary way to tell if the application is wedged. There might be other metrics which can possibly capture the same essence. Yes the lag is at the consumer group level, but you can tell that one of the consumers is messed up if one of the partitions in the application starts generating lag and others are good, for example. Monitoring aside, I think the main point of concern is that in the old consumer most customers don't have to worry about unnecessary rebalances and most of the things that they do in their app don't have an impact on the session timeout (i.e. the only thing that causes rebalances is when the GC is out of whack). For the handful of customers who are impacted by GC related rebalances, I would imagine that all of them would really want us to make the system more resilient. I agree that the GC problem can't be solved easily in the java client, however it appears that now we would be expecting the consuming applications to be even more careful with ongoing tuning of the timeouts. At LinkedIn, we have seen that most kafka applications don't have much of a clue about configuring the timeouts and just end up calling the Kafka team when their application sees rebalances. The other side effect of poll driving the heartbeats is that we have to make sure that people don't set a poll timeout that is larger than the session timeout. If we had a notion of implicit heartbeats then we could also automatically make this work for consumers by sending heartbeats at the appropriate interval even though the customers want to do a long poll. We could surely work around this in LinkedIn if either we have the Pause() api or an explicit HeartBeat() api on the consumer. Would love to hear how other people think about this subject ? Thanks Kartik On Sat, Jul 25, 2015 at 7:41 PM, Neha Narkhede n...@confluent.io wrote: Agree with the dilemma you are pointing out, which is that there are many ways the application's message processing could fail and we wouldn't be able to model all of those in the consumer's failure detection mechanism. So we should try to model as much of it as we can so the consumer's failure detection is meaningful. Point being that the only absolute way to really detect that an app is healthy is to monitor lag. If the lag increases then for sure something is wrong. The lag is merely the effect of the problem, not the problem itself. Lag is also a consumer group level concept and the problem we have is being able to detect failures at the level of individual consumer instances. As you pointed out, a consumer that calls poll() is a stronger indicator of whether the consumer is alive or not. The dilemma then is who defines what a healthy poll() frequency is. No one else but the application owner can define what a normal processing latency is for their application. Now the question is what's the easiest way for the user to define this without having to tune and fine tune this too often. The heartbeat interval certainly does not have to be *exactly* the 99th percentile of processing latency but could be in the ballpark + an error delta. The error delta is the application owner's acceptable risk threshold during which they would be ok if the application remains part of the group despite being dead. It is ultimately a tradeoff between operational ease and more accurate failure detection. With quotas, the write latencies to kafka could range from a few milliseconds all the way to tens of seconds. This is actually no different from the GC problem. Most of the time, the normal GC falls in the few ms range, and there are many applications even at LinkedIn for which the max GC falls in the multiple seconds range. Note that it also can't be predicted, so it has to be an observed value. One way or the other, you have to observe what this acceptable max is for your application and then set the appropriate timeouts. Since this is not something that can be automated, this is a config that the application owner has to set based on the expected behavior of their application. Not wanting to do that leads to ending up with bad consumption semantics where the application process continues to be part of a group owning partitions but not consuming since it has halted due to a problem. The fact that the design requires them to express that in poll() frequency or not doesn't change the fact that the application owner has to go through the process of measuring and then defining this max. The reverse, where they don't do this and the application remains in the group despite being dead, is super confusing and frustrating too. So the due diligence up front is actually worth it.
[jira] [Commented] (KAFKA-2350) Add KafkaConsumer pause capability
[ https://issues.apache.org/jira/browse/KAFKA-2350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14643442#comment-14643442 ] Jason Gustafson commented on KAFKA-2350: [~becket_qin] I think that we're on the same page as far as supporting only automatic or manual assignment and not trying to mix them. I think my confusion is that subscribe(partition) in your proposal is used both a) to subscribe to a partition when manual assignment is used, and b) to unpause a partition when automatic assignment is used. This leads to the weird ordering problems that we have been talking about. By the way, I added the line about seek() and position() since it seems like something that intuitively should be supported by pause semantics. I think it's debatable whether it's really needed, but I think it would cause a bit of a surprise to the user if we didn't allow it. Add KafkaConsumer pause capability -- Key: KAFKA-2350 URL: https://issues.apache.org/jira/browse/KAFKA-2350 Project: Kafka Issue Type: Improvement Reporter: Jason Gustafson Assignee: Jason Gustafson There are some use cases in stream processing where it is helpful to be able to pause consumption of a topic. For example, when joining two topics, you may need to delay processing of one topic while you wait for the consumer of the other topic to catch up. The new consumer currently doesn't provide a nice way to do this. If you skip calls to poll() or if you unsubscribe, then a rebalance will be triggered and your partitions will be reassigned to another consumer. The desired behavior is instead that you keep the partition assigned and simply stop fetching data from it. One way to achieve this would be to add two new methods to KafkaConsumer:
{code}
void pause(TopicPartition... partitions);
void resume(TopicPartition... partitions);
{code}
Here is the expected behavior of pause/resume:
* When a partition is paused, calls to KafkaConsumer.poll will not initiate any new fetches for that partition.
* After the partition is resumed, fetches will begin again.
* While a partition is paused, seek() and position() can still be used to advance or query the current position.
* Rebalance does not preserve pause/resume state.
-- This message was sent by Atlassian JIRA (v6.3.4#6332)
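The seek()/position() point reduces to a small interaction like this (hypothetical, since pause()/resume() were still a proposal; seek() and position() are real new-consumer methods):
{code}
// Hypothetical: pause()/resume() are the proposed API, not a released one.
consumer.pause(tp);
long pos = consumer.position(tp);   // still queryable while paused
consumer.seek(tp, pos + 1000);      // still allowed to advance while paused
consumer.resume(tp);                // the next poll() fetches from the new position
{code}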
Re: Kafka Consumer thoughts
I think if we recommend a longer session timeout, then we should expose the heartbeat frequency in configuration since this generally controls how long normal rebalances will take. I think it's currently hard-coded to 3 heartbeats per session timeout. It could also be nice to have an explicit LeaveGroup request to implement clean shutdown of a consumer. Then the coordinator doesn't have to wait for the timeout to reassign partitions. -Jason On Mon, Jul 27, 2015 at 1:25 PM, Jay Kreps j...@confluent.io wrote: Hey Kartik, Totally agree we don't want people tuning timeouts in the common case. However there are two ways to avoid this: 1. Default the timeout high 2. Put the heartbeat in a separate thread When we were doing the consumer design we discussed this tradeoff and I think the conclusion we came to was that defaulting to a high timeout was actually better. This means it takes a little longer to detect a failure, but usually that is not a big problem and people who want faster failure detection can tune it down. This seemed better than having the failure detection not really cover the consumption and just be a background ping. The two reasons where (a) you still have the GC problem even for the background thread, (b) consumption is in some sense a better definition of an active healthy consumer and a lot of problems crop up when you have an inactive consumer with an active background thread (as today). When we had the discussion I think what we realized was that most people who were worried about the timeout where imagining a very low default (500ms) say. But in fact just setting this to 60 seconds or higher as a default would be okay, this adds to the failure detection time but only apps that care about this need to tune. This should largely eliminate false positives since after all if you disappear for 60 seconds that actually starts to be more of a true positive, even if you come back... :-) -Jay On Mon, Jul 27, 2015 at 1:05 PM, Kartik Paramasivam kparamasi...@linkedin.com wrote: adding the open source alias. This email started off as a broader discussion around the new consumer. I was zooming into only the aspect of poll() being the only mechanism for driving the heartbeats. Yes the lag is the effect of the problem (not the problem). Monitoring the lag is important as it is the primary way to tell if the application is wedged. There might be other metrics which can possibly capture the same essence. Yes the lag is at the consumer group level, but you can tell that one of the consumers is messed up if one of the partitions in the application start generating lag and others are good for e.g. Monitoring aside, I think the main point of concern is that in the old consumer most customers don't have to worry about unnecessary rebalances and most of the things that they do in their app doesn't have an impact on the session timeout.. (i.e. the only thing that causes rebalances is when the GC is out of whack).For the handful of customers who are impacted by GC related rebalances, i would imagine that all of them would really want us to make the system more resilient.I agree that the GC problem can't be solved easily in the java client, however it appears that now we would be expecting the consuming applications to be even more careful with ongoing tuning of the timeouts. At LinkedIn, we have seen that most kafka applications don't have much of a clue about configuring the timeouts and just end up calling the Kafka team when their application sees rebalances. 
The other side effect of poll driving the heartbeats is that we have to make sure that people don't set a poll timeout that is larger than the session timeout. If we had a notion of implicit heartbeats, then we could also automatically make this work for consumers by sending heartbeats at the appropriate interval even though the customers want to do a long poll. We could surely work around this in LinkedIn if either we have the Pause() API or an explicit HeartBeat() API on the consumer. Would love to hear how other people think about this subject? Thanks Kartik On Sat, Jul 25, 2015 at 7:41 PM, Neha Narkhede n...@confluent.io wrote: Agree with the dilemma you are pointing out, which is that there are many ways the application's message processing could fail and we wouldn't be able to model all of those in the consumer's failure detection mechanism. So we should try to model as much of it as we can so the consumer's failure detection is meaningful. Point being that the only absolute way to really detect that an app is healthy is to monitor lag. If the lag increases then for sure something is wrong. The lag is merely the effect of the problem, not the problem itself. Lag is also a consumer group level concept, and the problem we have is being able to detect failures at the level of individual consumer instances. As you pointed out, a consumer that
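For reference, this is roughly what the settings under discussion look like on the consumer. The session timeout property is the new consumer's; the heartbeat interval knob is the one Jason suggests exposing and is hypothetical here; all values are made up.
{code}
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "my-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
// A high session timeout (Jay's suggestion) trades slower failure detection
// for fewer spurious rebalances; apps needing fast detection tune it down.
props.put("session.timeout.ms", "60000");
// Hypothetical knob: the heartbeat frequency Jason proposes exposing, instead
// of the hard-coded 3 heartbeats per session timeout.
props.put("heartbeat.interval.ms", "20000");
KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props);
{code}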
Review Request 36858: Patch for KAFKA-2120
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/36858/ --- Review request for kafka. Bugs: KAFKA-2120 https://issues.apache.org/jira/browse/KAFKA-2120 Repository: kafka Description --- Addressed Joel's, Adi's, Dong's and Becket's comments Diffs - clients/src/main/java/org/apache/kafka/clients/ClientRequest.java ed4c0d98596cc294757f35df8c8cbc8e36ff42de clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java 2c421f42ed3fc5d61cf9c87a7eaa7bb23e26f63b clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java 15d00d4e484bb5d51a9ae6857ed6e024a2cc1820 clients/src/main/java/org/apache/kafka/clients/KafkaClient.java 7ab2503794ff3aab39df881bd9fbae6547827d3b clients/src/main/java/org/apache/kafka/clients/NetworkClient.java 48fe7961e2215372d8033ece4af739ea06c6457b clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java 70377ae2fa46deb381139d28590ce6d4115e1adc clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java bea3d737c51be77d5b5293cdd944d33b905422ba clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 03b8dd23df63a8d8a117f02eabcce4a2d48c44f7 clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java aa264202f2724907924985a5ecbe74afc4c6c04b clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java 4cb1e50d6c4ed55241aeaef1d3af09def5274103 clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java a152bd7697dca55609a9ec4cfe0a82c10595fbc3 clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java 06182db1c3a5da85648199b4c0c98b80ea7c6c0c clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 0baf16e55046a2f49f6431e01d52c323c95eddf0 clients/src/main/java/org/apache/kafka/common/network/Selector.java aaf60c98c2c0f4513a8d65ee0db67953a529d598 clients/src/test/java/org/apache/kafka/clients/MockClient.java d9c97e966c0e2fb605b67285f4275abb89f8813e clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java 43238ceaad0322e39802b615bb805b895336a009 clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java 2c693824fa53db1e38766b8c66a0ef42ef9d0f3a clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java 5b2e4ffaeab7127648db608c179703b27b577414 clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java 8b1805d3d2bcb9fe2bacb37d870c3236aa9532c4 Diff: https://reviews.apache.org/r/36858/diff/ Testing --- Thanks, Mayuresh Gharat
[jira] [Commented] (KAFKA-2120) Add a request timeout to NetworkClient
[ https://issues.apache.org/jira/browse/KAFKA-2120?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14643399#comment-14643399 ] Mayuresh Gharat commented on KAFKA-2120: Created reviewboard https://reviews.apache.org/r/36858/diff/ against branch origin/trunk Add a request timeout to NetworkClient -- Key: KAFKA-2120 URL: https://issues.apache.org/jira/browse/KAFKA-2120 Project: Kafka Issue Type: New Feature Reporter: Jiangjie Qin Assignee: Mayuresh Gharat Attachments: KAFKA-2120.patch Currently NetworkClient does not have a timeout setting for requests. So if no response is received for a request, due to reasons such as the broker being down, the request will never be completed. The request timeout will also be used as an implicit timeout for some methods such as KafkaProducer.flush() and KafkaProducer.close(). KIP-19 is created for this public interface change. https://cwiki.apache.org/confluence/display/KAFKA/KIP-19+-+Add+a+request+timeout+to+NetworkClient -- This message was sent by Atlassian JIRA (v6.3.4#6332)
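For illustration, here is roughly how the proposed setting would be used from the producer side; the property name follows the KIP-19 wording and the value is made up.
{code}
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// Proposed in KIP-19 (name and value illustrative until the KIP is final):
// fail any request that gets no response within this bound, so a dead broker
// cannot leave a request pending forever.
props.put("request.timeout.ms", "30000");
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
{code}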
[jira] [Updated] (KAFKA-2120) Add a request timeout to NetworkClient
[ https://issues.apache.org/jira/browse/KAFKA-2120?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mayuresh Gharat updated KAFKA-2120: --- Status: Patch Available (was: Open) Add a request timeout to NetworkClient -- Key: KAFKA-2120 URL: https://issues.apache.org/jira/browse/KAFKA-2120 Project: Kafka Issue Type: New Feature Reporter: Jiangjie Qin Assignee: Mayuresh Gharat Attachments: KAFKA-2120.patch Currently NetworkClient does not have a timeout setting for requests. So if no response is received for a request, due to reasons such as the broker being down, the request will never be completed. The request timeout will also be used as an implicit timeout for some methods such as KafkaProducer.flush() and KafkaProducer.close(). KIP-19 is created for this public interface change. https://cwiki.apache.org/confluence/display/KAFKA/KIP-19+-+Add+a+request+timeout+to+NetworkClient -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-2120) Add a request timeout to NetworkClient
[ https://issues.apache.org/jira/browse/KAFKA-2120?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mayuresh Gharat updated KAFKA-2120: --- Attachment: KAFKA-2120.patch Add a request timeout to NetworkClient -- Key: KAFKA-2120 URL: https://issues.apache.org/jira/browse/KAFKA-2120 Project: Kafka Issue Type: New Feature Reporter: Jiangjie Qin Assignee: Mayuresh Gharat Attachments: KAFKA-2120.patch Currently NetworkClient does not have a timeout setting for requests. So if no response is received for a request, due to reasons such as the broker being down, the request will never be completed. The request timeout will also be used as an implicit timeout for some methods such as KafkaProducer.flush() and KafkaProducer.close(). KIP-19 is created for this public interface change. https://cwiki.apache.org/confluence/display/KAFKA/KIP-19+-+Add+a+request+timeout+to+NetworkClient -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: Kafka Consumer thoughts
Hey Kartik, Totally agree we don't want people tuning timeouts in the common case. However there are two ways to avoid this: 1. Default the timeout high 2. Put the heartbeat in a separate thread When we were doing the consumer design we discussed this tradeoff and I think the conclusion we came to was that defaulting to a high timeout was actually better. This means it takes a little longer to detect a failure, but usually that is not a big problem, and people who want faster failure detection can tune it down. This seemed better than having the failure detection not really cover the consumption and just be a background ping. The two reasons were (a) you still have the GC problem even for the background thread, and (b) consumption is in some sense a better definition of an active healthy consumer, and a lot of problems crop up when you have an inactive consumer with an active background thread (as today). When we had the discussion I think what we realized was that most people who were worried about the timeout were imagining a very low default (say 500ms). But in fact just setting this to 60 seconds or higher as a default would be okay; this adds to the failure detection time, but only apps that care about this need to tune it. This should largely eliminate false positives since, after all, if you disappear for 60 seconds that actually starts to be more of a true positive, even if you come back... :-) -Jay On Mon, Jul 27, 2015 at 1:05 PM, Kartik Paramasivam kparamasi...@linkedin.com wrote: adding the open source alias. This email started off as a broader discussion around the new consumer. I was zooming into only the aspect of poll() being the only mechanism for driving the heartbeats. Yes, the lag is the effect of the problem (not the problem). Monitoring the lag is important as it is the primary way to tell if the application is wedged. There might be other metrics which can possibly capture the same essence. Yes, the lag is at the consumer group level, but you can tell that one of the consumers is messed up if, for example, one of the partitions in the application starts generating lag while the others are fine. Monitoring aside, I think the main point of concern is that in the old consumer most customers don't have to worry about unnecessary rebalances, and most of the things that they do in their app don't have an impact on the session timeout (i.e. the only thing that causes rebalances is when the GC is out of whack). For the handful of customers who are impacted by GC related rebalances, I would imagine that all of them would really want us to make the system more resilient. I agree that the GC problem can't be solved easily in the Java client, however it appears that now we would be expecting the consuming applications to be even more careful with ongoing tuning of the timeouts. At LinkedIn, we have seen that most Kafka applications don't have much of a clue about configuring the timeouts and just end up calling the Kafka team when their application sees rebalances. The other side effect of poll driving the heartbeats is that we have to make sure that people don't set a poll timeout that is larger than the session timeout. If we had a notion of implicit heartbeats, then we could also automatically make this work for consumers by sending heartbeats at the appropriate interval even though the customers want to do a long poll. We could surely work around this in LinkedIn if either we have the Pause() API or an explicit HeartBeat() API on the consumer.
Would love to hear how other people think about this subject? Thanks Kartik On Sat, Jul 25, 2015 at 7:41 PM, Neha Narkhede n...@confluent.io wrote: Agree with the dilemma you are pointing out, which is that there are many ways the application's message processing could fail and we wouldn't be able to model all of those in the consumer's failure detection mechanism. So we should try to model as much of it as we can so the consumer's failure detection is meaningful. Point being that the only absolute way to really detect that an app is healthy is to monitor lag. If the lag increases then for sure something is wrong. The lag is merely the effect of the problem, not the problem itself. Lag is also a consumer group level concept, and the problem we have is being able to detect failures at the level of individual consumer instances. As you pointed out, a consumer that calls poll() is a stronger indicator of whether the consumer is alive or not. The dilemma then is who defines what a healthy poll() frequency is. No one else but the application owner can define what a normal processing latency is for their application. Now the question is what's the easiest way for the user to define this without having to tune and fine-tune this too often. The heartbeat interval certainly does not have to be *exactly* the 99th percentile of processing latency but could be in the ballpark + an error delta. The error delta is the
Re: [DISCUSS] Partitioning in Kafka
Gwen - this is really like two steps of map-reduce though, right? The first step does the partial shuffle to two partitions per key, the second step does a partial reduce + final full shuffle, and the final step does the final reduce. This strikes me as similar to partition assignment strategies in the consumer, in that there will probably be a small handful of commonly used strategies that we can just maintain as part of Kafka. A few people will need more obscure strategies and they can maintain those implementations themselves. For reference, a quick grep of Spark shows 5 partitioners: Hash and RangePartitioner, which are in core, PythonPartitioner, GridPartitioner for partitioning matrices, and ShuffleRowRDD for their SQL implementation. So I don't think it would be a big deal to include it here, although I'm not really sure how often it's useful -- compared to normal partitioning or just doing two steps by starting with unpartitioned data, you need to be performing an aggregation, the key set needs to be large enough for memory usage to be a problem (i.e. you don't want each consumer to have to maintain a map with every key in it), and you need a sufficiently skewed distribution (i.e. not just 1 or 2 very hot keys). The key set constraint, in particular, is the one I'm not convinced by, since in practice if you have a skewed distribution, you probably also won't actually see every key in every partition; each worker actually only needs to maintain a subset of the key set (and associated aggregate data) in memory. On Mon, Jul 27, 2015 at 12:56 PM, Gwen Shapira gshap...@cloudera.com wrote: If you are used to map-reduce patterns, this sounds like a perfectly natural way to process streams of data. Call the first consumer map-combine-log, the topic shuffle-log and the second consumer reduce-log :) I like that a lot. It works well for either embarrassingly parallel cases, or so much data that more parallelism is worth the extra overhead cases. I personally don't care if it's in core Kafka, KIP-28 or a github project elsewhere, but I find it useful and non-esoteric. On Mon, Jul 27, 2015 at 12:51 PM, Jason Gustafson ja...@confluent.io wrote: For a little background, the difference between this partitioner and the default one is that it breaks the deterministic mapping from key to partition. Instead, messages for a given key can end up in either of two partitions. This means that the consumer generally won't see all messages for a given key. Instead the consumer would compute an aggregate for each key on the partitions it consumes and write them to a separate topic. For example, if you are writing log messages to a logs topic with the hostname as the key, you could use this partitioning strategy to compute message counts for each host in each partition and write them to a log-counts topic. Then a consumer of the log-counts topic would compute total aggregates based on the two intermediate aggregates. The benefit is that you are generally going to get better load balancing across partitions than if you used the default partitioner. (Please correct me if my understanding is incorrect, Gianmarco) So I think the question is whether this is a useful primitive for Kafka to provide out of the box? I was a little concerned that this use case is a little esoteric for a core feature, but it may make more sense in the context of KIP-28, which would provide some higher-level processing capabilities (though it doesn't seem like the KStream abstraction would provide a direct way to leverage this partitioner without custom logic).
Thanks, Jason On Wed, Jul 22, 2015 at 12:14 AM, Gianmarco De Francisci Morales g...@apache.org wrote: Hello folks, I'd like to ask the community about its opinion on the partitioning functions in Kafka. With KAFKA-2091 https://issues.apache.org/jira/browse/KAFKA-2091 integrated, we are now able to have custom partitioners in the producer. The question now becomes *which* partitioners should ship with Kafka? This issue arose in the context of KAFKA-2092 https://issues.apache.org/jira/browse/KAFKA-2092, which implements a specific load-balanced partitioning. This partitioner however assumes some stages of processing on top of it to make proper use of the data, i.e., it envisions Kafka as a substrate for stream processing, and not only as the I/O component. Is this a direction that Kafka wants to go towards? Or is this a role better left to the internal communication systems of other stream processing engines (e.g., Storm)? And if the answer is the latter, how would something such as Samza (which relies mostly on Kafka as its communication substrate) be able to implement advanced partitioning schemes? Cheers, -- Gianmarco -- Thanks, Ewen
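A rough sketch of the kind of load-balancing partitioner being discussed ("two choices": each key hashes to two candidate partitions and the producer picks the one it has loaded less). This is an illustration of the idea only, not the KAFKA-2092 patch, and the load tracking is a deliberately naive local estimate.
{code}
import java.util.Map;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

public class TwoChoicesPartitioner implements Partitioner {
    private long[] bytesSent; // naive local load estimate, illustrative only

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        int numPartitions = cluster.partitionsForTopic(topic).size();
        if (bytesSent == null || bytesSent.length != numPartitions)
            bytesSent = new long[numPartitions];
        // Two independent hashes give two candidate partitions for the key.
        int h = (key == null) ? 0 : key.hashCode();
        int first = (h & 0x7fffffff) % numPartitions;
        int second = ((h * 31 + 17) & 0x7fffffff) % numPartitions;
        // Send to whichever candidate has received less data so far.
        int choice = bytesSent[first] <= bytesSent[second] ? first : second;
        bytesSent[choice] += (valueBytes == null) ? 1 : valueBytes.length;
        return choice;
    }

    @Override
    public void close() {}

    @Override
    public void configure(Map<String, ?> configs) {}
}
{code}
A producer would opt in through the pluggable partitioner configuration that KAFKA-2091 introduced; the consumer side then aggregates per key across the two candidate partitions, as described above.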
[jira] [Commented] (KAFKA-2350) Add KafkaConsumer pause capability
[ https://issues.apache.org/jira/browse/KAFKA-2350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14643349#comment-14643349 ] Jason Gustafson commented on KAFKA-2350: There's one interesting implementation note that [~yasuhiro.matsuda] and [~guozhang] brought up. When a partition is unpaused, there may be an active fetch which is parked on the broker. In the current implementation, the consumer will not initiate any new fetches until that fetch has completed. This means that the consumer will not be able to immediately process messages from the unpaused partition even if it has some available. There are a couple of ways this could be handled. We could issue the new fetch from a different socket on the client. We could also implement a way to cancel or override the active fetch on the broker. Since both of these make this patch significantly more complex, I think we should just note this limitation in the documentation and address it later if it becomes a larger problem. Add KafkaConsumer pause capability -- Key: KAFKA-2350 URL: https://issues.apache.org/jira/browse/KAFKA-2350 Project: Kafka Issue Type: Improvement Reporter: Jason Gustafson Assignee: Jason Gustafson There are some use cases in stream processing where it is helpful to be able to pause consumption of a topic. For example, when joining two topics, you may need to delay processing of one topic while you wait for the consumer of the other topic to catch up. The new consumer currently doesn't provide a nice way to do this. If you skip calls to poll() or if you unsubscribe, then a rebalance will be triggered and your partitions will be reassigned to another consumer. The desired behavior is instead that you keep the partition assigned and simply stop fetching from it. One way to achieve this would be to add two new methods to KafkaConsumer:
{code}
void pause(TopicPartition... partitions);
void resume(TopicPartition... partitions);
{code}
Here is the expected behavior of pause/resume:
* When a partition is paused, calls to KafkaConsumer.poll will not initiate any new fetches for that partition.
* After the partition is resumed, fetches will begin again.
* While a partition is paused, seek() and position() can still be used to advance or query the current position.
* Rebalance does not preserve pause/resume state.
-- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: Review Request 36858: Patch for KAFKA-2120
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/36858/ --- (Updated July 27, 2015, 9:09 p.m.) Review request for kafka. Bugs: KAFKA-2120 https://issues.apache.org/jira/browse/KAFKA-2120 Repository: kafka Description (updated) --- KIP-19: Added RequestTimeout and MaxBlockTimeout as per the KIP. Diffs - clients/src/main/java/org/apache/kafka/clients/ClientRequest.java ed4c0d98596cc294757f35df8c8cbc8e36ff42de clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java 2c421f42ed3fc5d61cf9c87a7eaa7bb23e26f63b clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java 15d00d4e484bb5d51a9ae6857ed6e024a2cc1820 clients/src/main/java/org/apache/kafka/clients/KafkaClient.java 7ab2503794ff3aab39df881bd9fbae6547827d3b clients/src/main/java/org/apache/kafka/clients/NetworkClient.java 48fe7961e2215372d8033ece4af739ea06c6457b clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java 70377ae2fa46deb381139d28590ce6d4115e1adc clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java bea3d737c51be77d5b5293cdd944d33b905422ba clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 03b8dd23df63a8d8a117f02eabcce4a2d48c44f7 clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java aa264202f2724907924985a5ecbe74afc4c6c04b clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java 4cb1e50d6c4ed55241aeaef1d3af09def5274103 clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java a152bd7697dca55609a9ec4cfe0a82c10595fbc3 clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java 06182db1c3a5da85648199b4c0c98b80ea7c6c0c clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 0baf16e55046a2f49f6431e01d52c323c95eddf0 clients/src/main/java/org/apache/kafka/common/network/Selector.java aaf60c98c2c0f4513a8d65ee0db67953a529d598 clients/src/test/java/org/apache/kafka/clients/MockClient.java d9c97e966c0e2fb605b67285f4275abb89f8813e clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java 43238ceaad0322e39802b615bb805b895336a009 clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java 2c693824fa53db1e38766b8c66a0ef42ef9d0f3a clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java 5b2e4ffaeab7127648db608c179703b27b577414 clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java 8b1805d3d2bcb9fe2bacb37d870c3236aa9532c4 Diff: https://reviews.apache.org/r/36858/diff/ Testing --- Thanks, Mayuresh Gharat
[jira] [Commented] (KAFKA-2350) Add KafkaConsumer pause capability
[ https://issues.apache.org/jira/browse/KAFKA-2350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14643418#comment-14643418 ] Jiangjie Qin commented on KAFKA-2350: - [~hachikuji], I am with [~guozhang] that it is much clearer if we only support either auto partition assignment or manual partition assignment, but not mixed mode. I assumed pause/unpause will only be used when auto partition assignment is used, so we can check the assigned partition set. If it is under manual partition assignment, for the seek(), I assume that you will start consuming after seek()? If so, it might be the same as:
{code}
subscribe(tp)
seek(tp, offset)
poll()
{code}
[~guozhang], I see your point. I am convinced that for people who are using auto partition assignment, pause/unpause is more intuitive than using partition level sub/unsub. I am not really opposed to having them. What I was worried about is that we are adding methods that are intuitive to some particular use case, but potentially open the door for adding APIs that only have subtle differences. If we take a closer look at our API, there are potentially some other cases we could argue warrant a new API, e.g. a user might want to have auto commit turned on only for some but not all of the partitions they subscribed to. A user might want to find a list of offsets of a partition within a time range. These are all different use cases, but they can likely be solved with some lower level API calls instead of having a dedicated intuitive API for each of them. I kind of feel the dilemma we are facing now is that in the new consumer, we try to address both the high level consumer and low level consumer use cases. pause/unpause looks to me like a medium-to-low level use case. As the higher level requirements can vary a lot and have subtle differences from one to another, the question to be answered is: should we expose a high level interface for each of the high level use cases? Or should we just ask users to use a lower level API as long as we support the functionality? My understanding is that for high level consumer use cases, hopefully we don't need users to care too much about the underlying mechanism. For people who want to deal with lower level concepts such as offsets, partition assignment, or temporary consumption suspension, instead of having one high level API written for each of the use cases, letting them use a lower level API makes sense to me. In terms of the example you mentioned, can we solve them by throwing appropriate exceptions?
{code}
// Auto partition assignment
subscribe(topic1)
// assuming only topic1-partition0 is assigned to this consumer.
subscribe(topic1-partition1)   // throws IllegalStateException("Cannot subscribe to topic1-partition1 because topic1 is managed by the consumer coordinator")
unsubscribe(topic1-partition1) // throws IllegalStateException("topic1-partition1 is managed by the consumer coordinator, and topic1-partition1 is not assigned to this consumer.")
{code}
{code}
subscribe(topic1-partition0)
subscribe(topic1)              // throws IllegalStateException("Cannot subscribe to topic1 because the assignment of topic1 is manually managed")
unsubscribe(topic1-partition1) // throws IllegalStateException("Cannot unsubscribe topic1-partition1 because it is not subscribed")
{code}
[~nehanarkhede], what you said makes a lot of sense. I guess I'm just looking at the problem from a different angle - a user either wants to consume from a topic or not at a certain point, whether temporarily or permanently. So the state I see is only CONSUME/NOT_CONSUME.
The PAUSE state for the consumer would be pretty much the same as NOT_CONSUME. I might have missed some use case like the one [~hachikuji] mentioned - seek() on a PAUSED partition, but that can be solved by calling subscribe() first. [~gwenshap], good point about heartbeat. We actually got some feedback from users at LinkedIn and found that putting the responsibility of sending heartbeats on the user might be a problem in the first place... We may have pause/unpause as a workaround, but the ultimate issue is that maybe we are asking too much from the user to maintain the heartbeat... Add KafkaConsumer pause capability -- Key: KAFKA-2350 URL: https://issues.apache.org/jira/browse/KAFKA-2350 Project: Kafka Issue Type: Improvement Reporter: Jason Gustafson Assignee: Jason Gustafson There are some use cases in stream processing where it is helpful to be able to pause consumption of a topic. For example, when joining two topics, you may need to delay processing of one topic while you wait for the consumer of the other topic to catch up. The new consumer currently doesn't provide a nice way to do this. If you skip calls to poll() or if you unsubscribe, then a rebalance will be triggered and your partitions will be reassigned to another consumer.
Re: Kafka Consumer thoughts
Kartik, on your second point about timeouts with poll() and heartbeats, the consumer now handles this properly. KAFKA-2123 introduced a DelayedTaskQueue and that is used internally to handle processing events at the right time even if poll() is called with a large timeout. The same mechanism is used to handle auto commit, which should also occur in a timely fashion even if poll() is called with a large timeout. On Mon, Jul 27, 2015 at 1:25 PM, Jay Kreps j...@confluent.io wrote: Hey Kartik, Totally agree we don't want people tuning timeouts in the common case. However there are two ways to avoid this: 1. Default the timeout high 2. Put the heartbeat in a separate thread When we were doing the consumer design we discussed this tradeoff and I think the conclusion we came to was that defaulting to a high timeout was actually better. This means it takes a little longer to detect a failure, but usually that is not a big problem, and people who want faster failure detection can tune it down. This seemed better than having the failure detection not really cover the consumption and just be a background ping. The two reasons were (a) you still have the GC problem even for the background thread, and (b) consumption is in some sense a better definition of an active healthy consumer, and a lot of problems crop up when you have an inactive consumer with an active background thread (as today). When we had the discussion I think what we realized was that most people who were worried about the timeout were imagining a very low default (say 500ms). But in fact just setting this to 60 seconds or higher as a default would be okay; this adds to the failure detection time, but only apps that care about this need to tune it. This should largely eliminate false positives since, after all, if you disappear for 60 seconds that actually starts to be more of a true positive, even if you come back... :-) -Jay On Mon, Jul 27, 2015 at 1:05 PM, Kartik Paramasivam kparamasi...@linkedin.com wrote: adding the open source alias. This email started off as a broader discussion around the new consumer. I was zooming into only the aspect of poll() being the only mechanism for driving the heartbeats. Yes, the lag is the effect of the problem (not the problem). Monitoring the lag is important as it is the primary way to tell if the application is wedged. There might be other metrics which can possibly capture the same essence. Yes, the lag is at the consumer group level, but you can tell that one of the consumers is messed up if, for example, one of the partitions in the application starts generating lag while the others are fine. Monitoring aside, I think the main point of concern is that in the old consumer most customers don't have to worry about unnecessary rebalances, and most of the things that they do in their app don't have an impact on the session timeout (i.e. the only thing that causes rebalances is when the GC is out of whack). For the handful of customers who are impacted by GC related rebalances, I would imagine that all of them would really want us to make the system more resilient. I agree that the GC problem can't be solved easily in the Java client, however it appears that now we would be expecting the consuming applications to be even more careful with ongoing tuning of the timeouts. At LinkedIn, we have seen that most Kafka applications don't have much of a clue about configuring the timeouts and just end up calling the Kafka team when their application sees rebalances.
The other side effect of poll driving the heartbeats is that we have to make sure that people don't set a poll timeout that is larger than the session timeout. If we had a notion of implicit heartbeats, then we could also automatically make this work for consumers by sending heartbeats at the appropriate interval even though the customers want to do a long poll. We could surely work around this in LinkedIn if either we have the Pause() API or an explicit HeartBeat() API on the consumer. Would love to hear how other people think about this subject? Thanks Kartik On Sat, Jul 25, 2015 at 7:41 PM, Neha Narkhede n...@confluent.io wrote: Agree with the dilemma you are pointing out, which is that there are many ways the application's message processing could fail and we wouldn't be able to model all of those in the consumer's failure detection mechanism. So we should try to model as much of it as we can so the consumer's failure detection is meaningful. Point being that the only absolute way to really detect that an app is healthy is to monitor lag. If the lag increases then for sure something is wrong. The lag is merely the effect of the problem, not the problem itself. Lag is also a consumer group level concept, and the problem we have is being able to detect failures at the level of individual consumer instances. As you pointed out, a consumer that calls poll() is a stronger indicator
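To illustrate the mechanism Jason describes above, here is a sketch of the idea only (not the KAFKA-2123 code; the task-queue and network-poll names are hypothetical): every internal poll is bounded by the delay of the next scheduled task, so heartbeats and auto-commits fire on time even during a very long poll.
{code}
public void pollOnce(long userTimeoutMs) {
    long deadline = System.currentTimeMillis() + userTimeoutMs;
    long now;
    while ((now = System.currentTimeMillis()) < deadline) {
        delayedTasks.runDueTasks(now);                        // hypothetical name
        long msUntilNextTask = delayedTasks.msUntilNext(now); // hypothetical name
        // Never sleep past the next heartbeat/auto-commit deadline.
        networkPoll(Math.min(deadline - now, msUntilNextTask));
    }
}
{code}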
[jira] [Comment Edited] (KAFKA-2350) Add KafkaConsumer pause capability
[ https://issues.apache.org/jira/browse/KAFKA-2350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14643418#comment-14643418 ] Jiangjie Qin edited comment on KAFKA-2350 at 7/27/15 9:45 PM: -- [~hachikuji], I am with [~guozhang] that it is much clearer if we only support either auto partition assignment or manual partition assignment, but not mixed mode. I assumed pause/unpause will only be used when auto partition assignment is used, so we can check the assigned partition set. If it is under manual partition assignment, for the seek(), I assume that you will start consuming after seek()? If so, it might be the same as:
{code}
subscribe(tp)
seek(tp, offset)
poll()
{code}
[~guozhang], I see your point. I am convinced that for people who are using auto partition assignment, pause/unpause is more intuitive than using partition level sub/unsub. I am not really opposed to having them. What I was worried about is that we are adding methods that are intuitive to some particular use case, but potentially open the door for adding APIs that only have subtle differences. If we take a closer look at our API, there are potentially some other cases we could argue warrant a new API, e.g. a user might want to have auto commit turned on only for some but not all of the partitions they subscribed to. A user might want to find a list of offsets of a partition within a time range. These are all different use cases, but they can likely be solved with some lower level API calls instead of having a dedicated intuitive API for each of them. I kind of feel the dilemma we are facing now is that in the new consumer, we try to address both the high level consumer and low level consumer use cases. pause/unpause looks to me like a medium-to-low level use case. As the higher level requirements can vary a lot and have subtle differences from one to another, the question to be answered is: should we expose a high level interface for each of the high level use cases? Or should we just ask users to use a lower level API as long as we support the functionality? My understanding is that for high level consumer use cases, hopefully we don't need users to care too much about the underlying mechanism. For people who want to deal with lower level concepts such as offsets, partition assignment, or temporary consumption suspension, instead of having one high level API written for each of the use cases, letting them use a lower level API makes sense to me. In terms of the example you mentioned, can we solve them by throwing appropriate exceptions?
{code}
// Auto partition assignment
subscribe(topic1)
// assuming only topic1-partition0 is assigned to this consumer.
subscribe(topic1-partition1)   // throws IllegalStateException("Cannot subscribe to topic1-partition1 because topic1 is managed by the consumer coordinator")
unsubscribe(topic1-partition1) // throws IllegalStateException("topic1-partition1 is managed by the consumer coordinator, and topic1-partition1 is not assigned to this consumer.")
{code}
{code}
// Manual partition assignment
subscribe(topic1-partition0)
subscribe(topic1)              // throws IllegalStateException("Cannot subscribe to topic1 because the assignment of topic1 is manually managed")
unsubscribe(topic1-partition1) // throws IllegalStateException("Cannot unsubscribe topic1-partition1 because it is not subscribed")
{code}
[~nehanarkhede], what you said makes a lot of sense. I guess I'm just looking at the problem from a different angle - a user either wants to consume from a topic or not at a certain point, whether temporarily or permanently.
So the state I see is only CONSUME/NOT_CONSUME. The PAUSE state for the consumer would be pretty much the same as NOT_CONSUME. I might have missed some use case like the one [~hachikuji] mentioned - seek() on a PAUSED partition, but that can be solved by calling subscribe() first. [~gwenshap], good point about heartbeat. We actually got some feedback from users at LinkedIn and found that putting the responsibility of sending heartbeats on the user might be a problem in the first place... We may have pause/unpause as a workaround, but the ultimate issue is that maybe we are asking too much from the user to maintain the heartbeat... was (Author: becket_qin): [~hachikuji], I am with [~guozhang] that it is much clearer if we only support either auto partition assignment or manual partition assignment, but not mixed mode. I assumed pause/unpause will only be used when auto partition assignment is used, so we can check the assigned partition set. If it is under manual partition assignment, for the seek(), I assume that you will start consuming after seek()? If so, it might be the same as:
{code}
subscribe(tp)
seek(tp, offset)
poll()
{code}
[~guozhang], I see your point. I am convinced that for people who are using auto partition assignment, pause/unpause is more intuitive than using partition level sub/unsub. I am not really opposed to
Re: error while high level consumer
This is because the ZooKeeper path storing the previous owner info hasn't been deleted yet. If the rebalance completes after a retry, it should be fine. Jiangjie (Becket) Qin On Fri, Jul 24, 2015 at 6:54 PM, Kris K squareksc...@gmail.com wrote: Hi, I started seeing these errors in the logs continuously when I try to bring the High Level Consumer up. Please help. ZookeeperConsumerConnector [INFO] [XXX], waiting for the partition ownership to be deleted: 1 ZookeeperConsumerConnector [INFO] [XXX], end rebalancing consumer XXX try #0 ZookeeperConsumerConnector [INFO] [XXX], Rebalancing attempt failed. Clearing the cache before the next rebalancing operation is triggered Thanks, Kris
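When these rebalance failures persist, the knobs usually involved are the old consumer's retry settings; a hedged example with property names from the 0.8.x consumer configuration (values illustrative only):
{code}
import java.util.Properties;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.javaapi.consumer.ConsumerConnector;

Properties props = new Properties();
props.put("zookeeper.connect", "localhost:2181");
props.put("group.id", "my-group");
// Allow more attempts and a longer backoff so the previous owner's ZooKeeper
// node has time to be deleted before the rebalance gives up.
props.put("rebalance.max.retries", "10");
props.put("rebalance.backoff.ms", "2000");
ConsumerConnector connector =
    Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
{code}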
[jira] [Commented] (KAFKA-2268) New producer logs WARN if serializer supplied directly to constructor
[ https://issues.apache.org/jira/browse/KAFKA-2268?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14643573#comment-14643573 ] Xuan Gong commented on KAFKA-2268: -- Looks like this is a duplicate of https://issues.apache.org/jira/browse/KAFKA-2289 New producer logs WARN if serializer supplied directly to constructor - Key: KAFKA-2268 URL: https://issues.apache.org/jira/browse/KAFKA-2268 Project: Kafka Issue Type: Bug Components: producer Affects Versions: 0.8.2.1 Reporter: Morten Lied Johansen Assignee: Jun Rao Priority: Trivial When creating a new KafkaProducer by passing in a configuration map, a key serializer and a value serializer, two warnings are logged:
{noformat}
[WARN ] [2015-06-15T14:26:36,271] The configuration value.serializer = class no.finntech.commons.kafka.ThriftSerializer was supplied but isn't a known config. [org.apache.kafka.common.config.AbstractConfig.logUnused(AbstractConfig.java:121)] [RMI TCP Connection(2)-127.0.0.1]
[WARN ] [2015-06-15T14:26:36,271] The configuration key.serializer = class org.apache.kafka.common.serialization.StringSerializer was supplied but isn't a known config. [org.apache.kafka.common.config.AbstractConfig.logUnused(AbstractConfig.java:121)] [RMI TCP Connection(2)-127.0.0.1]
{noformat}
This happens because this constructor adds the serializers to the config (KafkaProducer.java:108), and then ignores that configuration since the instances are already present (KafkaProducer.java:215). Finally, it checks which parts of the configuration were used, and logs warnings for the remaining keys (KafkaProducer.java:230). -- This message was sent by Atlassian JIRA (v6.3.4#6332)
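For reference, the constructor usage that triggers the warnings looks like this; the serializer choices are just examples.
{code}
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.serialization.StringSerializer;

Map<String, Object> configs = new HashMap<>();
configs.put("bootstrap.servers", "localhost:9092");
// Serializer instances passed directly: per the description above, the
// constructor copies their class names into the config map, never reads those
// entries back (the instances are used instead), and then logs them as unused.
KafkaProducer<String, String> producer =
    new KafkaProducer<>(configs, new StringSerializer(), new StringSerializer());
{code}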
[jira] [Commented] (KAFKA-2130) Resource leakage in AppInfo.scala during initialization
[ https://issues.apache.org/jira/browse/KAFKA-2130?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14643625#comment-14643625 ] Xuan Gong commented on KAFKA-2130: -- Move {code}stream.close();{code} to the finally block? Resource leakage in AppInfo.scala during initialization --- Key: KAFKA-2130 URL: https://issues.apache.org/jira/browse/KAFKA-2130 Project: Kafka Issue Type: Bug Reporter: Sebastien Zimmer Priority: Trivial Labels: patch Attachments: patch.diff Minor InputStream leakage during the server initialization in AppInfo.scala. Patch attached. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
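The shape of the suggested fix, shown here as a generic Java illustration (AppInfo.scala itself is Scala, and the resource path is hypothetical):
{code}
import java.io.InputStream;
import java.util.Properties;

Properties props = new Properties();
InputStream stream =
    AppInfo.class.getResourceAsStream("/version.properties"); // hypothetical path
try {
    props.load(stream);
} finally {
    // Closing in the finally block releases the stream even if load() throws.
    if (stream != null)
        stream.close();
}
{code}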
[jira] [Commented] (KAFKA-1690) new java producer needs ssl support as a client
[ https://issues.apache.org/jira/browse/KAFKA-1690?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14643532#comment-14643532 ] Sourabh Chandak commented on KAFKA-1690: [~sriharsha] Will this patch unblock the entire SSL authentication issue? new java producer needs ssl support as a client --- Key: KAFKA-1690 URL: https://issues.apache.org/jira/browse/KAFKA-1690 Project: Kafka Issue Type: Sub-task Reporter: Joe Stein Assignee: Sriharsha Chintalapani Fix For: 0.8.3 Attachments: KAFKA-1690.patch, KAFKA-1690.patch, KAFKA-1690_2015-05-10_23:20:30.patch, KAFKA-1690_2015-05-10_23:31:42.patch, KAFKA-1690_2015-05-11_16:09:36.patch, KAFKA-1690_2015-05-12_16:20:08.patch, KAFKA-1690_2015-05-15_07:18:21.patch, KAFKA-1690_2015-05-20_14:54:35.patch, KAFKA-1690_2015-05-21_10:37:08.patch, KAFKA-1690_2015-06-03_18:52:29.patch, KAFKA-1690_2015-06-23_13:18:20.patch, KAFKA-1690_2015-07-20_06:10:42.patch, KAFKA-1690_2015-07-20_11:59:57.patch, KAFKA-1690_2015-07-25_12:10:55.patch -- This message was sent by Atlassian JIRA (v6.3.4#6332)
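For context, a client configuration under the proposed SSL support would look roughly like this; the property names are illustrative of the patch series, not final.
{code}
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;

Properties props = new Properties();
props.put("bootstrap.servers", "broker1:9093");
props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
// Illustrative names only -- the final configs depend on the committed patch.
props.put("security.protocol", "SSL");
props.put("ssl.truststore.location", "/path/to/client.truststore.jks");
props.put("ssl.truststore.password", "changeit");
KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props);
{code}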
Re: Review Request 36858: Patch for KAFKA-2120
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/36858/ --- (Updated July 27, 2015, 10:31 p.m.) Review request for kafka. Bugs: KAFKA-2120 https://issues.apache.org/jira/browse/KAFKA-2120 Repository: kafka Description (updated) --- Solved compile error Diffs (updated) - clients/src/main/java/org/apache/kafka/clients/ClientRequest.java ed4c0d98596cc294757f35df8c8cbc8e36ff42de clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java 2c421f42ed3fc5d61cf9c87a7eaa7bb23e26f63b clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java 15d00d4e484bb5d51a9ae6857ed6e024a2cc1820 clients/src/main/java/org/apache/kafka/clients/KafkaClient.java 7ab2503794ff3aab39df881bd9fbae6547827d3b clients/src/main/java/org/apache/kafka/clients/NetworkClient.java 48fe7961e2215372d8033ece4af739ea06c6457b clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java 70377ae2fa46deb381139d28590ce6d4115e1adc clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java bea3d737c51be77d5b5293cdd944d33b905422ba clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java 9517d9d0cd480d5ba1d12f1fde7963e60528d2f8 clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 03b8dd23df63a8d8a117f02eabcce4a2d48c44f7 clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java aa264202f2724907924985a5ecbe74afc4c6c04b clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java 4cb1e50d6c4ed55241aeaef1d3af09def5274103 clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java a152bd7697dca55609a9ec4cfe0a82c10595fbc3 clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java 06182db1c3a5da85648199b4c0c98b80ea7c6c0c clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 0baf16e55046a2f49f6431e01d52c323c95eddf0 clients/src/main/java/org/apache/kafka/common/network/Selector.java aaf60c98c2c0f4513a8d65ee0db67953a529d598 clients/src/test/java/org/apache/kafka/clients/MockClient.java d9c97e966c0e2fb605b67285f4275abb89f8813e clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java 43238ceaad0322e39802b615bb805b895336a009 clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java 2c693824fa53db1e38766b8c66a0ef42ef9d0f3a clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java 5b2e4ffaeab7127648db608c179703b27b577414 clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java 8b1805d3d2bcb9fe2bacb37d870c3236aa9532c4 Diff: https://reviews.apache.org/r/36858/diff/ Testing --- Thanks, Mayuresh Gharat
Re: Review Request 36858: Patch for KAFKA-2120
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/36858/#review93189 --- Looks pretty good overall. Found mostly trivial stuff. clients/src/main/java/org/apache/kafka/clients/ClientRequest.java (line 26) https://reviews.apache.org/r/36858/#comment147486 Should ClientResponse.requestLatencyMs be updated to use sendMs instead of createdMs? Or perhaps a new method like wireLatencyMs() can record responseMs - sendMs? Both seem like useful metrics. clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java (line 48) https://reviews.apache.org/r/36858/#comment147491 It seems like it might be more natural to set this in NetworkClient.send. clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java (line 142) https://reviews.apache.org/r/36858/#comment147492 Probably better practice to use !requests.isEmpty(). clients/src/main/java/org/apache/kafka/clients/KafkaClient.java (line 72) https://reviews.apache.org/r/36858/#comment147493 Missing @param for the now parameter. clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java (line 538) https://reviews.apache.org/r/36858/#comment147497 You don't necessarily need to fix this in this patch, but this setting could conflict with FETCH_MAX_WAIT_MS_CONFIG. clients/src/main/java/org/apache/kafka/common/network/Selector.java (line 86) https://reviews.apache.org/r/36858/#comment147510 Would it make sense to try to consolidate clientDisconnects and failedSends into the same collection? Unfortunately I can't think of a good name. Maybe nextDisconnects? clients/src/test/java/org/apache/kafka/clients/MockClient.java (line 94) https://reviews.apache.org/r/36858/#comment147498 Do we need to set sendMs in this method for consistency? - Jason Gustafson On July 27, 2015, 10:32 p.m., Mayuresh Gharat wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/36858/ --- (Updated July 27, 2015, 10:32 p.m.) Review request for kafka.
Bugs: KAFKA-2120 https://issues.apache.org/jira/browse/KAFKA-2120 Repository: kafka Description --- KIP-19: Added RequestTimeout and MaxBlockTimeout as per the KIP Diffs - clients/src/main/java/org/apache/kafka/clients/ClientRequest.java ed4c0d98596cc294757f35df8c8cbc8e36ff42de clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java 2c421f42ed3fc5d61cf9c87a7eaa7bb23e26f63b clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java 15d00d4e484bb5d51a9ae6857ed6e024a2cc1820 clients/src/main/java/org/apache/kafka/clients/KafkaClient.java 7ab2503794ff3aab39df881bd9fbae6547827d3b clients/src/main/java/org/apache/kafka/clients/NetworkClient.java 48fe7961e2215372d8033ece4af739ea06c6457b clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java 70377ae2fa46deb381139d28590ce6d4115e1adc clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java bea3d737c51be77d5b5293cdd944d33b905422ba clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java 9517d9d0cd480d5ba1d12f1fde7963e60528d2f8 clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 03b8dd23df63a8d8a117f02eabcce4a2d48c44f7 clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java aa264202f2724907924985a5ecbe74afc4c6c04b clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java 4cb1e50d6c4ed55241aeaef1d3af09def5274103 clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java a152bd7697dca55609a9ec4cfe0a82c10595fbc3 clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java 06182db1c3a5da85648199b4c0c98b80ea7c6c0c clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 0baf16e55046a2f49f6431e01d52c323c95eddf0 clients/src/main/java/org/apache/kafka/common/network/Selector.java aaf60c98c2c0f4513a8d65ee0db67953a529d598 clients/src/test/java/org/apache/kafka/clients/MockClient.java d9c97e966c0e2fb605b67285f4275abb89f8813e clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java 43238ceaad0322e39802b615bb805b895336a009 clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java 2c693824fa53db1e38766b8c66a0ef42ef9d0f3a clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java 5b2e4ffaeab7127648db608c179703b27b577414 clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java 8b1805d3d2bcb9fe2bacb37d870c3236aa9532c4
[jira] [Commented] (KAFKA-1690) new java producer needs ssl support as a client
[ https://issues.apache.org/jira/browse/KAFKA-1690?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14643597#comment-14643597 ] Sriharsha Chintalapani commented on KAFKA-1690: --- [~sourabh0612] Yes. It includes broker, producer (new API), and consumer (new API) changes. new java producer needs ssl support as a client --- Key: KAFKA-1690 URL: https://issues.apache.org/jira/browse/KAFKA-1690 Project: Kafka Issue Type: Sub-task Reporter: Joe Stein Assignee: Sriharsha Chintalapani Fix For: 0.8.3 Attachments: KAFKA-1690.patch, KAFKA-1690.patch, KAFKA-1690_2015-05-10_23:20:30.patch, KAFKA-1690_2015-05-10_23:31:42.patch, KAFKA-1690_2015-05-11_16:09:36.patch, KAFKA-1690_2015-05-12_16:20:08.patch, KAFKA-1690_2015-05-15_07:18:21.patch, KAFKA-1690_2015-05-20_14:54:35.patch, KAFKA-1690_2015-05-21_10:37:08.patch, KAFKA-1690_2015-06-03_18:52:29.patch, KAFKA-1690_2015-06-23_13:18:20.patch, KAFKA-1690_2015-07-20_06:10:42.patch, KAFKA-1690_2015-07-20_11:59:57.patch, KAFKA-1690_2015-07-25_12:10:55.patch -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: [DISCUSS] KIP-27 - Conditional Publish
@Ewen, good point about batching. Yes, it would be tricky if we want to do a per-key conditional produce. My understanding is that the prerequisites of this KIP are: 1. A single producer for each partition. 2. acks=-1, max.in.flight.requests.per.connection=1, retries=SOME_BIG_NUMBER The major problem it tries to solve is exactly-once produce, i.e. eliminating duplicates from the producer side. In that case, a batch will be considered atomic. The only way a batch should get rejected is if it was already appended, so the producer should just move on. It looks to me like even a transient multiple-producer scenario will cause issues, because the user needs to think about what to do if a request gets rejected, and the answer varies for different use cases. Thanks, Jiangjie (Becket) Qin On Sun, Jul 26, 2015 at 11:54 AM, Ben Kirwin b...@kirw.in wrote: So I had another look at the 'Idempotent Producer' proposal this afternoon, and made a few notes on how I think they compare; if I've made any mistakes, I'd be delighted if someone with more context on the idempotent producer design would correct me. As a first intuition, you can think of the 'conditional publish' proposal as the special case of the 'idempotent producer' idea where there's just a single producer per partition. The key observation here is: if there's only one producer, you can conflate the 'sequence number' and the expected offset. The conditional publish proposal uses existing Kafka offset APIs for roughly the same things as the idempotent producer proposal uses sequence numbers for -- e.g. instead of having a lease PID API that returns the current sequence number, we can use the existing offset API to retrieve the upcoming offset. Both proposals attempt to deal with the situation where there are transiently multiple publishers for the same partition (and PID). The idempotent producer setup tracks a generation id for each PID, and discards any writes with a generation id smaller than the latest value. Conditional publish is 'first write wins' -- and instead of dropping duplicates on the server, it returns an error to the client. The duplicate-handling behaviour (dropping vs. erroring) has some interesting consequences: - If all producers are producing the same stream of messages, silently dropping duplicates on the server is more convenient. (Suppose we have a batch of messages 0-9, and the high-water mark on the server is 7. The idempotent producer, as I read it, would append 7-9 to the partition and return success; meanwhile, conditional publish would fail the entire batch.) - If producers might be writing different streams of messages, the proposed behaviour of the idempotent producer is probably worse -- since it can silently interleave messages from two different producers. This can be a problem for some commit-log style use-cases, since it can transform a valid series of operations into an invalid one. - Given the error-on-duplicate behaviour, it's possible to implement deduplication on the client. (Sketch: if a publish returns an error for some partition, fetch the upcoming offset / sequence number for that partition, and discard all messages with a smaller offset on the client before republishing.) I think this makes the erroring behaviour more general, though deduplicating saves a roundtrip or two at conflict time. I'm less clear about the behaviour of the generation id, or what happens when (say) two producers with the same generation id are spun up at the same time. I'd be interested in hearing other folks' comments on this.
Ewen: I'm not sure I understand the questions well enough to answer properly, but some quick notes: - I don't think it makes sense to assign an expected offset without already having assigned a partition. If the producer code is doing the partition assignment, it should probably do the offset assignment too... or we could just let application code handle both. - I'm not aware of any case where reassigning offsets to messages automatically after an offset mismatch makes sense: in the cases we've discussed, it seems like either it's safe to drop duplicates, or we want to handle the error at the application level. I'm going to try and come up with an idempotent-producer-type example that works with the draft patch in the next few days, so hopefully we'll have something more concrete to discuss. Otherwise -- if you have a clear idea of how e.g. sequence number assignment would work in the idempotent-producer proposal, we could probably translate that over to get the equivalent for the conditional publish API. On Fri, Jul 24, 2015 at 2:16 AM, Ewen Cheslack-Postava e...@confluent.io wrote: @Becket - for compressed batches, I think this just works out given the KIP as described. Without the change you're referring to, it still only makes sense to batch messages with this KIP if all the expected offsets are sequential (else some messages are guaranteed to
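A sketch of the client-side deduplication Ben describes above. All names here are hypothetical -- the KIP is a draft and defines no final API -- and the sketch assumes this producer wrote the already-appended prefix.
{code}
// Hypothetical sketch: on an offset-mismatch error, re-fetch the upcoming
// offset and drop the already-appended prefix before republishing.
void publishWithDedup(TopicPartition tp, List<byte[]> batch) {
    long expected = fetchUpcomingOffset(tp);      // hypothetical helper
    while (!batch.isEmpty()) {
        try {
            conditionalSend(tp, expected, batch); // hypothetical conditional publish
            return;                               // whole batch appended
        } catch (OffsetMismatchException e) {     // hypothetical error type
            long upcoming = fetchUpcomingOffset(tp);
            // Messages below the broker's upcoming offset were already
            // appended by an earlier attempt: discard them, retry the rest.
            batch = batch.subList((int) (upcoming - expected), batch.size());
            expected = upcoming;
        }
    }
}
{code}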
Re: [DISCUSS] Partitioning in Kafka
I guess it depends on whether the original producer did any map tasks or simply wrote raw data. We usually advocate writing raw data, and since we need to write it anyway, the partitioner doesn't introduce any extra hops. It's definitely useful to look at use-cases, and I need to think a bit more on whether huge-key-space-with-large-skew is the only one. I think that there are use-cases that are not pure-aggregate and therefore keeping the key list in memory won't help, and scaling to a large number of partitions is still required (and therefore skew is a critical problem). However, I may be making stuff up, so need to double check. Gwen On Mon, Jul 27, 2015 at 2:20 PM, Ewen Cheslack-Postava e...@confluent.io wrote: Gwen - this is really like two steps of map-reduce though, right? The first step does the partial shuffle to two partitions per key, the second step does a partial reduce + final full shuffle, and the final step does the final reduce. This strikes me as similar to partition assignment strategies in the consumer, in that there will probably be a small handful of commonly used strategies that we can just maintain as part of Kafka. A few people will need more obscure strategies and they can maintain those implementations themselves. For reference, a quick grep of Spark shows 5 partitioners: Hash and RangePartitioner, which are in core, PythonPartitioner, GridPartitioner for partitioning matrices, and ShuffleRowRDD for their SQL implementation. So I don't think it would be a big deal to include it here, although I'm not really sure how often it's useful -- compared to normal partitioning or just doing two steps by starting with unpartitioned data, you need to be performing an aggregation, the key set needs to be large enough for memory usage to be a problem (i.e. you don't want each consumer to have to maintain a map with every key in it), and you need a sufficiently skewed distribution (i.e. not just 1 or 2 very hot keys). The key set constraint, in particular, is the one I'm not convinced by, since in practice if you have a skewed distribution, you probably also won't actually see every key in every partition; each worker actually only needs to maintain a subset of the key set (and associated aggregate data) in memory. On Mon, Jul 27, 2015 at 12:56 PM, Gwen Shapira gshap...@cloudera.com wrote: If you are used to map-reduce patterns, this sounds like a perfectly natural way to process streams of data. Call the first consumer map-combine-log, the topic shuffle-log and the second consumer reduce-log :) I like that a lot. It works well for either embarrassingly parallel cases, or so much data that more parallelism is worth the extra overhead cases. I personally don't care if it's in core Kafka, KIP-28 or a github project elsewhere, but I find it useful and non-esoteric. On Mon, Jul 27, 2015 at 12:51 PM, Jason Gustafson ja...@confluent.io wrote: For a little background, the difference between this partitioner and the default one is that it breaks the deterministic mapping from key to partition. Instead, messages for a given key can end up in either of two partitions. This means that the consumer generally won't see all messages for a given key. Instead the consumer would compute an aggregate for each key on the partitions it consumes and write them to a separate topic. For example, if you are writing log messages to a logs topic with the hostname as the key, you could use this partitioning strategy to compute message counts for each host in each partition and write them to a log-counts topic.
Then a consumer of the log-counts topic would compute total aggregates based on the two intermediate aggregates. The benefit is that you generally get better load balancing across partitions than with the default partitioner. (Please correct me if my understanding is incorrect, Gianmarco.) So I think the question is whether this is a useful primitive for Kafka to provide out of the box. I was a little concerned that this use case is a bit too esoteric for a core feature, but it may make more sense in the context of KIP-28, which would provide some higher-level processing capabilities (though it doesn't seem like the KStream abstraction would provide a direct way to leverage this partitioner without custom logic). Thanks, Jason On Wed, Jul 22, 2015 at 12:14 AM, Gianmarco De Francisci Morales g...@apache.org wrote: Hello folks, I'd like to ask the community for its opinion on the partitioning functions in Kafka. With KAFKA-2091 https://issues.apache.org/jira/browse/KAFKA-2091 integrated, we are now able to have custom partitioners in the producer. The question now becomes *which* partitioners should ship with Kafka? This issue arose in the context of KAFKA-2092 https://issues.apache.org/jira/browse/KAFKA-2092, which implements a specific load-balanced partitioning. This partitioner however assumes some
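To make the two-choices idea concrete, below is a minimal sketch of such a partitioner against the pluggable producer Partitioner interface that KAFKA-2091 introduced. The salt byte, the local send-count load signal, and the null-key fallback are illustrative assumptions for this sketch, not what KAFKA-2092 actually implements.
{code}
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.utils.Utils;

// Sketch of a "two choices" partitioner: each key hashes to two candidate
// partitions, and each message is routed to whichever candidate this
// producer has sent fewer messages to so far.
public class TwoChoicesPartitioner implements Partitioner {

    // Local send counts per partition, used as the load signal.
    private final ConcurrentHashMap<Integer, AtomicLong> counts = new ConcurrentHashMap<>();

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        int numPartitions = cluster.partitionsForTopic(topic).size();
        if (keyBytes == null)
            return 0; // illustrative fallback; a real implementation needs a null-key policy

        // Two independent candidates: murmur2 of the key, and of the key
        // with a fixed salt byte appended.
        int c1 = (Utils.murmur2(keyBytes) & 0x7fffffff) % numPartitions;
        byte[] salted = Arrays.copyOf(keyBytes, keyBytes.length + 1);
        salted[keyBytes.length] = (byte) 0x53;
        int c2 = (Utils.murmur2(salted) & 0x7fffffff) % numPartitions;

        int chosen = load(c1) <= load(c2) ? c1 : c2;
        counts.get(chosen).incrementAndGet();
        return chosen;
    }

    private long load(int partition) {
        return counts.computeIfAbsent(partition, p -> new AtomicLong()).get();
    }

    @Override
    public void close() {}

    @Override
    public void configure(Map<String, ?> configs) {}
}
{code}
Because every key lands in at most two partitions, the second-stage consumer merges at most two partial aggregates per key, which is what keeps the downstream state bounded.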
[jira] [Commented] (KAFKA-2120) Add a request timeout to NetworkClient
[ https://issues.apache.org/jira/browse/KAFKA-2120?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14643533#comment-14643533 ] Mayuresh Gharat commented on KAFKA-2120: Updated reviewboard https://reviews.apache.org/r/36858/diff/ against branch origin/trunk Add a request timeout to NetworkClient -- Key: KAFKA-2120 URL: https://issues.apache.org/jira/browse/KAFKA-2120 Project: Kafka Issue Type: New Feature Reporter: Jiangjie Qin Assignee: Mayuresh Gharat Attachments: KAFKA-2120.patch, KAFKA-2120_2015-07-27_15:31:19.patch Currently NetworkClient does not have a timeout setting for requests, so if no response is received for a request, for reasons such as the broker being down, the request will never be completed. The request timeout will also be used as an implicit timeout for some methods such as KafkaProducer.flush() and KafkaProducer.close(). KIP-19 was created for this public interface change: https://cwiki.apache.org/confluence/display/KAFKA/KIP-19+-+Add+a+request+timeout+to+NetworkClient -- This message was sent by Atlassian JIRA (v6.3.4#6332)
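For illustration, producer-side usage once KIP-19 lands might look like the sketch below; the config name request.timeout.ms and the 30-second value are assumptions taken from the KIP discussion, not a committed interface.
{code}
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;

public class RequestTimeoutSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        // Assumed KIP-19 config: fail a request if no response arrives
        // within this bound, instead of waiting forever.
        props.put("request.timeout.ms", "30000");

        KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props);
        // flush() and close() would implicitly be bounded by the same timeout.
        producer.close();
    }
}
{code}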
[jira] [Updated] (KAFKA-2120) Add a request timeout to NetworkClient
[ https://issues.apache.org/jira/browse/KAFKA-2120?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mayuresh Gharat updated KAFKA-2120: --- Attachment: KAFKA-2120_2015-07-27_15:31:19.patch Add a request timeout to NetworkClient -- Key: KAFKA-2120 URL: https://issues.apache.org/jira/browse/KAFKA-2120 Project: Kafka Issue Type: New Feature Reporter: Jiangjie Qin Assignee: Mayuresh Gharat Attachments: KAFKA-2120.patch, KAFKA-2120_2015-07-27_15:31:19.patch Currently NetworkClient does not have a timeout setting for requests, so if no response is received for a request, for reasons such as the broker being down, the request will never be completed. The request timeout will also be used as an implicit timeout for some methods such as KafkaProducer.flush() and KafkaProducer.close(). KIP-19 was created for this public interface change: https://cwiki.apache.org/confluence/display/KAFKA/KIP-19+-+Add+a+request+timeout+to+NetworkClient -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: error while high level consumer
Try bouncing the host that appears in the stored data section. Thanks, Mayuresh On Mon, Jul 27, 2015 at 3:41 PM, Jiangjie Qin j...@linkedin.com.invalid wrote: This is because the ZooKeeper path storing the previous owner info hasn't been deleted yet. If the rebalance completes after a retry, it should be fine. Jiangjie (Becket) Qin On Fri, Jul 24, 2015 at 6:54 PM, Kris K squareksc...@gmail.com wrote: Hi, I started seeing these errors continuously in the logs when I try to bring the high-level consumer up. Please help. ZookeeperConsumerConnector [INFO] [XXX], waiting for the partition ownership to be deleted: 1 ZookeeperConsumerConnector [INFO] [XXX], end rebalancing consumer XXX try #0 ZookeeperConsumerConnector [INFO] [XXX], Rebalancing attempt failed. Clearing the cache before the next rebalancing operation is triggered Thanks, Kris -- -Regards, Mayuresh R. Gharat (862) 250-7125
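For anyone debugging this, one way to find the host to bounce is to read the owner znodes directly. Below is a rough diagnostic sketch with the plain ZooKeeper client; the connect string, group, and topic are placeholders, and it assumes the old high-level consumer's /consumers/<group>/owners/<topic>/<partition> layout, where each znode's data is the owning consumer id.
{code}
import java.util.List;
import org.apache.zookeeper.ZooKeeper;

// Diagnostic sketch: print which consumer id still owns each partition of a
// topic, so you can tell which host to bounce.
public class OwnerCheck {
    public static void main(String[] args) throws Exception {
        ZooKeeper zk = new ZooKeeper("zkhost:2181", 30000, event -> { });
        String ownersPath = "/consumers/my-group/owners/my-topic";
        List<String> partitions = zk.getChildren(ownersPath, false);
        for (String partition : partitions) {
            byte[] data = zk.getData(ownersPath + "/" + partition, false, null);
            System.out.println(partition + " owned by " + new String(data, "UTF-8"));
        }
        zk.close();
    }
}
{code}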
[jira] [Created] (KAFKA-2381) Possible ConcurrentModificationException while unsubscribing from a topic in new consumer
Ashish K Singh created KAFKA-2381: - Summary: Possible ConcurrentModificationException while unsubscribing from a topic in new consumer Key: KAFKA-2381 URL: https://issues.apache.org/jira/browse/KAFKA-2381 Project: Kafka Issue Type: Bug Components: consumer Reporter: Ashish K Singh Assignee: Ashish K Singh Possible ConcurrentModificationException while unsubscribing from a topic in new consumer. Attempt is made to modify AssignedPartitions while looping over it. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
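The failure mode here is the standard one for mutating a collection inside a for-each loop over it. The snippet below is a generic illustration of the bug and the iterator-based fix, not the actual SubscriptionState code:
{code}
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;

public class UnsubscribeDemo {
    public static void main(String[] args) {
        Set<String> assigned = new HashSet<>();
        assigned.add("topic-0");
        assigned.add("topic-1");

        // Broken: structurally modifying the set while a for-each iterates it
        // throws ConcurrentModificationException on the next iteration step.
        // for (String tp : assigned)
        //     if (tp.startsWith("topic")) assigned.remove(tp);

        // Fix: remove through the iterator (or iterate over a copy).
        for (Iterator<String> it = assigned.iterator(); it.hasNext(); ) {
            if (it.next().startsWith("topic"))
                it.remove();
        }
        System.out.println(assigned); // prints []
    }
}
{code}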
[jira] [Commented] (KAFKA-2381) Possible ConcurrentModificationException while unsubscribing from a topic in new consumer
[ https://issues.apache.org/jira/browse/KAFKA-2381?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14643703#comment-14643703 ] Ashish K Singh commented on KAFKA-2381: --- [~gwenshap] could you take a look when you get a chance. Possible ConcurrentModificationException while unsubscribing from a topic in new consumer - Key: KAFKA-2381 URL: https://issues.apache.org/jira/browse/KAFKA-2381 Project: Kafka Issue Type: Bug Components: consumer Reporter: Ashish K Singh Assignee: Ashish K Singh Attachments: KAFKA-2381.patch, KAFKA-2381_2015-07-27_17:56:00.patch Possible ConcurrentModificationException while unsubscribing from a topic in new consumer. Attempt is made to modify AssignedPartitions while looping over it. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-2381) Possible ConcurrentModificationException while unsubscribing from a topic in new consumer
[ https://issues.apache.org/jira/browse/KAFKA-2381?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ashish K Singh updated KAFKA-2381: -- Attachment: KAFKA-2381_2015-07-27_17:56:00.patch Possible ConcurrentModificationException while unsubscribing from a topic in new consumer - Key: KAFKA-2381 URL: https://issues.apache.org/jira/browse/KAFKA-2381 Project: Kafka Issue Type: Bug Components: consumer Reporter: Ashish K Singh Assignee: Ashish K Singh Attachments: KAFKA-2381.patch, KAFKA-2381_2015-07-27_17:56:00.patch Possible ConcurrentModificationException while unsubscribing from a topic in new consumer. Attempt is made to modify AssignedPartitions while looping over it. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-2381) Possible ConcurrentModificationException while unsubscribing from a topic in new consumer
[ https://issues.apache.org/jira/browse/KAFKA-2381?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14643699#comment-14643699 ] Ashish K Singh commented on KAFKA-2381: --- Updated reviewboard https://reviews.apache.org/r/36871/ against branch trunk Possible ConcurrentModificationException while unsubscribing from a topic in new consumer - Key: KAFKA-2381 URL: https://issues.apache.org/jira/browse/KAFKA-2381 Project: Kafka Issue Type: Bug Components: consumer Reporter: Ashish K Singh Assignee: Ashish K Singh Attachments: KAFKA-2381.patch, KAFKA-2381_2015-07-27_17:56:00.patch Possible ConcurrentModificationException while unsubscribing from a topic in new consumer. Attempt is made to modify AssignedPartitions while looping over it. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: Review Request 36871: Patch for KAFKA-2381
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/36871/#review93215 --- core/src/test/scala/integration/kafka/api/ConsumerTest.scala (line 233) https://reviews.apache.org/r/36871/#comment147535 consider closing this in a finally. A failing test can cause incorrect tear down of the test - Aditya Auradkar On July 28, 2015, 12:56 a.m., Ashish Singh wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/36871/ --- (Updated July 28, 2015, 12:56 a.m.) Review request for kafka. Bugs: KAFKA-2381 https://issues.apache.org/jira/browse/KAFKA-2381 Repository: kafka Description --- KAFKA-2381: Possible ConcurrentModificationException while unsubscribing from a topic in new consumer Diffs - clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java 4d9a425201115a66b457b58d670992b279091f5a core/src/test/scala/integration/kafka/api/ConsumerTest.scala 3eb5f95731a3f06f662b334ab2b3d0ad7fa9e1ca Diff: https://reviews.apache.org/r/36871/diff/ Testing --- Thanks, Ashish Singh
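The pattern the review is asking for, sketched with illustrative names rather than the real ConsumerTest code: close the consumer in a finally block so a failed assertion still releases it and teardown stays correct.
{code}
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class TeardownSketch {
    // props is assumed to carry bootstrap servers and deserializer configs.
    static void runTest(Properties props) {
        KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props);
        try {
            // ... exercise the consumer and make assertions that may throw ...
        } finally {
            consumer.close(); // runs even when an assertion above fails
        }
    }
}
{code}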
[jira] [Updated] (KAFKA-2381) Possible ConcurrentModificationException while unsubscribing from a topic in new consumer
[ https://issues.apache.org/jira/browse/KAFKA-2381?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ashish K Singh updated KAFKA-2381: -- Status: Patch Available (was: Open) Possible ConcurrentModificationException while unsubscribing from a topic in new consumer - Key: KAFKA-2381 URL: https://issues.apache.org/jira/browse/KAFKA-2381 Project: Kafka Issue Type: Bug Components: consumer Reporter: Ashish K Singh Assignee: Ashish K Singh Attachments: KAFKA-2381.patch Possible ConcurrentModificationException while unsubscribing from a topic in new consumer. Attempt is made to modify AssignedPartitions while looping over it. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-2381) Possible ConcurrentModificationException while unsubscribing from a topic in new consumer
[ https://issues.apache.org/jira/browse/KAFKA-2381?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ashish K Singh updated KAFKA-2381: -- Attachment: KAFKA-2381.patch Possible ConcurrentModificationException while unsubscribing from a topic in new consumer - Key: KAFKA-2381 URL: https://issues.apache.org/jira/browse/KAFKA-2381 Project: Kafka Issue Type: Bug Components: consumer Reporter: Ashish K Singh Assignee: Ashish K Singh Attachments: KAFKA-2381.patch Possible ConcurrentModificationException while unsubscribing from a topic in new consumer. Attempt is made to modify AssignedPartitions while looping over it. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Review Request 36871: Patch for KAFKA-2381
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/36871/ --- Review request for kafka. Bugs: KAFKA-2381 https://issues.apache.org/jira/browse/KAFKA-2381 Repository: kafka Description --- KAFKA-2381: Possible ConcurrentModificationException while unsubscribing from a topic in new consumer Diffs - clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java 4d9a425201115a66b457b58d670992b279091f5a core/src/test/scala/integration/kafka/api/ConsumerTest.scala 3eb5f95731a3f06f662b334ab2b3d0ad7fa9e1ca Diff: https://reviews.apache.org/r/36871/diff/ Testing --- Thanks, Ashish Singh
[jira] [Commented] (KAFKA-313) Add JSON/CSV output and looping options to ConsumerGroupCommand
[ https://issues.apache.org/jira/browse/KAFKA-313?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14643716#comment-14643716 ] Ashish K Singh commented on KAFKA-313: -- [~gwenshap] I need help with getting this KIP to a done state :). It has been quite some time. Add JSON/CSV output and looping options to ConsumerGroupCommand --- Key: KAFKA-313 URL: https://issues.apache.org/jira/browse/KAFKA-313 Project: Kafka Issue Type: Improvement Reporter: Dave DeMaagd Assignee: Ashish K Singh Priority: Minor Labels: newbie, patch Fix For: 0.8.3 Attachments: KAFKA-313-2012032200.diff, KAFKA-313.1.patch, KAFKA-313.patch, KAFKA-313_2015-02-23_18:11:32.patch, KAFKA-313_2015-06-24_11:14:24.patch Adds:
* '--loop N' - causes the program to loop forever, sleeping for up to N seconds between loops (loop time minus collection time, unless that's less than 0, at which point it will just run again immediately)
* '--asjson' - display as a JSON string instead of the more human-readable output format.
Neither of the above depends on the other (you can loop in the human-readable output, or do a single-shot execution with JSON output). Existing behavior/output is maintained if neither of the above is used. Diff attached. Impacted files: core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala -- This message was sent by Atlassian JIRA (v6.3.4#6332)
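The '--loop N' timing described in the ticket (sleep for the interval minus collection time, never less than zero) reduces to a few lines; here is a sketch in which collectAndPrint is a placeholder for gathering and printing the group offsets:
{code}
public class LoopTimingSketch {
    public static void main(String[] args) throws InterruptedException {
        long loopSeconds = 30; // stand-in for the --loop N argument
        while (true) {
            long start = System.currentTimeMillis();
            collectAndPrint();
            long elapsedMs = System.currentTimeMillis() - start;
            // Sleep for the remainder of the interval; if collection
            // overran it, run again immediately.
            Thread.sleep(Math.max(0, loopSeconds * 1000 - elapsedMs));
        }
    }

    static void collectAndPrint() {
        // placeholder for the actual offset collection and output
    }
}
{code}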
[jira] [Updated] (KAFKA-2360) The kafka-consumer-perf-test.sh script help information print useless parameters.
[ https://issues.apache.org/jira/browse/KAFKA-2360?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bo Wang updated KAFKA-2360: --- Description: Run kafka-consumer-perf-test.sh --help to show help information, but found 3 useless parameters: --batch-size and --batch-size. Those are producer parameters.
bin]# ./kafka-consumer-perf-test.sh --help
Missing required argument [topic]
Option                             Description
------                             -----------
--batch-size <Integer: size>       Number of messages to write in a single batch. (default: 200)
--compression-codec <Integer: supported codec: NoCompressionCodec as 0, GZIPCompressionCodec as 1, SnappyCompressionCodec as 2, LZ4CompressionCodec as 3>  If set, messages are sent compressed (default: 0)
--date-format <date format>        The date format to use for formatting the time field. See java.text.SimpleDateFormat for options. (default: yyyy-MM-dd HH:mm:ss:SSS)
--fetch-size <Integer: size>       The amount of data to fetch in a single request. (default: 1048576)
was: Run kafka-consumer-perf-test.sh --help to show help information, but found two useless parameters: --batch-size and --batch-size. Those are producer parameters. [same help output as above]
The kafka-consumer-perf-test.sh script help information print useless parameters. - Key: KAFKA-2360 URL: https://issues.apache.org/jira/browse/KAFKA-2360 Project: Kafka Issue Type: Bug Components: tools Affects Versions: 0.8.2.1 Environment: Linux Reporter: Bo Wang Priority: Minor Original Estimate: 24h Remaining Estimate: 24h
[jira] [Commented] (KAFKA-1690) new java producer needs ssl support as a client
[ https://issues.apache.org/jira/browse/KAFKA-1690?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14643658#comment-14643658 ] Sourabh Chandak commented on KAFKA-1690: Awesome! When will this be integrated into the main branch? new java producer needs ssl support as a client --- Key: KAFKA-1690 URL: https://issues.apache.org/jira/browse/KAFKA-1690 Project: Kafka Issue Type: Sub-task Reporter: Joe Stein Assignee: Sriharsha Chintalapani Fix For: 0.8.3 Attachments: KAFKA-1690.patch, KAFKA-1690.patch, KAFKA-1690_2015-05-10_23:20:30.patch, KAFKA-1690_2015-05-10_23:31:42.patch, KAFKA-1690_2015-05-11_16:09:36.patch, KAFKA-1690_2015-05-12_16:20:08.patch, KAFKA-1690_2015-05-15_07:18:21.patch, KAFKA-1690_2015-05-20_14:54:35.patch, KAFKA-1690_2015-05-21_10:37:08.patch, KAFKA-1690_2015-06-03_18:52:29.patch, KAFKA-1690_2015-06-23_13:18:20.patch, KAFKA-1690_2015-07-20_06:10:42.patch, KAFKA-1690_2015-07-20_11:59:57.patch, KAFKA-1690_2015-07-25_12:10:55.patch -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: Review Request 36871: Patch for KAFKA-2381
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/36871/ --- (Updated July 28, 2015, 12:56 a.m.) Review request for kafka. Bugs: KAFKA-2381 https://issues.apache.org/jira/browse/KAFKA-2381 Repository: kafka Description --- KAFKA-2381: Possible ConcurrentModificationException while unsubscribing from a topic in new consumer Diffs (updated) - clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java 4d9a425201115a66b457b58d670992b279091f5a core/src/test/scala/integration/kafka/api/ConsumerTest.scala 3eb5f95731a3f06f662b334ab2b3d0ad7fa9e1ca Diff: https://reviews.apache.org/r/36871/diff/ Testing --- Thanks, Ashish Singh
[jira] [Commented] (KAFKA-2381) Possible ConcurrentModificationException while unsubscribing from a topic in new consumer
[ https://issues.apache.org/jira/browse/KAFKA-2381?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14643693#comment-14643693 ] Ashish K Singh commented on KAFKA-2381: --- Created reviewboard https://reviews.apache.org/r/36871/ against branch trunk Possible ConcurrentModificationException while unsubscribing from a topic in new consumer - Key: KAFKA-2381 URL: https://issues.apache.org/jira/browse/KAFKA-2381 Project: Kafka Issue Type: Bug Components: consumer Reporter: Ashish K Singh Assignee: Ashish K Singh Attachments: KAFKA-2381.patch Possible ConcurrentModificationException while unsubscribing from a topic in new consumer. Attempt is made to modify AssignedPartitions while looping over it. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-2275) Add a ListTopics() API to the new consumer
[ https://issues.apache.org/jira/browse/KAFKA-2275?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14643710#comment-14643710 ] Ashish K Singh commented on KAFKA-2275: --- [~guozhang] I think this is in good shape now with two +1s. Could you help with taking a final look and committing it? Add a ListTopics() API to the new consumer -- Key: KAFKA-2275 URL: https://issues.apache.org/jira/browse/KAFKA-2275 Project: Kafka Issue Type: Sub-task Components: consumer Reporter: Guozhang Wang Assignee: Ashish K Singh Priority: Critical Fix For: 0.8.3 Attachments: KAFKA-2275.patch, KAFKA-2275.patch, KAFKA-2275_2015-07-17_21:39:27.patch, KAFKA-2275_2015-07-20_10:44:19.patch, KAFKA-2275_2015-07-22_16:09:34.patch, KAFKA-2275_2015-07-23_09:34:41.patch With regex subscription like {code} consumer.subscribe(topic*) {code} The partition assignment is automatically done at the Kafka side, while there are some use cases where consumers want regex subscriptions but not Kafka-side partition assignment, rather with their own specific partition assignment. With ListTopics() they can periodically check for topic list changes and specifically subscribe to the partitions of the new topics. For implementation, it involves sending a TopicMetadataRequest to a random broker and parse the response. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
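Assuming listTopics() ends up returning a map from topic name to partition metadata, as the ticket proposes, the use case would look roughly like the sketch below; the regex filter and the manual-assignment flow are assumptions about how a client would wire this together, not a committed API.
{code}
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;

public class RegexAssignSketch {
    // Call periodically to pick up newly created topics matching the pattern.
    static void refreshAssignment(KafkaConsumer<byte[], byte[]> consumer, Pattern pattern) {
        Map<String, List<PartitionInfo>> topics = consumer.listTopics();
        List<TopicPartition> assignment = new ArrayList<>();
        for (Map.Entry<String, List<PartitionInfo>> e : topics.entrySet())
            if (pattern.matcher(e.getKey()).matches())
                for (PartitionInfo p : e.getValue())
                    assignment.add(new TopicPartition(p.topic(), p.partition()));
        // Client-side partition assignment instead of Kafka-side group management.
        consumer.assign(assignment);
    }
}
{code}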
[jira] [Commented] (KAFKA-2301) Deprecate ConsumerOffsetChecker
[ https://issues.apache.org/jira/browse/KAFKA-2301?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14643714#comment-14643714 ] Ashish K Singh commented on KAFKA-2301: --- [~junrao], [~gwenshap] can either of you help in getting this done? Thanks! Deprecate ConsumerOffsetChecker --- Key: KAFKA-2301 URL: https://issues.apache.org/jira/browse/KAFKA-2301 Project: Kafka Issue Type: Sub-task Components: tools Reporter: Ashish K Singh Assignee: Ashish K Singh Fix For: 0.8.3 Attachments: KAFKA-2301.patch, KAFKA-2301_2015-07-01_17:46:34.patch, KAFKA-2301_2015-07-02_09:04:35.patch -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: Review Request 36871: Patch for KAFKA-2381
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/36871/#review93213 --- Ouch. Hard to believe this wasn't caught yet. core/src/test/scala/integration/kafka/api/ConsumerTest.scala (line 220) https://reviews.apache.org/r/36871/#comment147533 Could we catch this issue more directly with a unit test for SubscriptionState? - Jason Gustafson On July 28, 2015, 12:56 a.m., Ashish Singh wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/36871/ --- (Updated July 28, 2015, 12:56 a.m.) Review request for kafka. Bugs: KAFKA-2381 https://issues.apache.org/jira/browse/KAFKA-2381 Repository: kafka Description --- KAFKA-2381: Possible ConcurrentModificationException while unsubscribing from a topic in new consumer Diffs - clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java 4d9a425201115a66b457b58d670992b279091f5a core/src/test/scala/integration/kafka/api/ConsumerTest.scala 3eb5f95731a3f06f662b334ab2b3d0ad7fa9e1ca Diff: https://reviews.apache.org/r/36871/diff/ Testing --- Thanks, Ashish Singh
[jira] [Updated] (KAFKA-2360) The kafka-consumer-perf-test.sh script help information print useless parameters.
[ https://issues.apache.org/jira/browse/KAFKA-2360?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bo Wang updated KAFKA-2360: --- Description: Run kafka-consumer-perf-test.sh --help to show help information, but found 3 useless parameters: --batch-size and --batch-size --messages. Those are producer parameters.
bin]# ./kafka-consumer-perf-test.sh --help
Missing required argument [topic]
Option                             Description
------                             -----------
--batch-size <Integer: size>       Number of messages to write in a single batch. (default: 200)
--compression-codec <Integer: supported codec: NoCompressionCodec as 0, GZIPCompressionCodec as 1, SnappyCompressionCodec as 2, LZ4CompressionCodec as 3>  If set, messages are sent compressed (default: 0)
--date-format <date format>        The date format to use for formatting the time field. See java.text.SimpleDateFormat for options. (default: yyyy-MM-dd HH:mm:ss:SSS)
--fetch-size <Integer: size>       The amount of data to fetch in a single request. (default: 1048576)
--messages <Long: count>           The number of messages to send or consume (default: 9223372036854775807)
was: Run kafka-consumer-perf-test.sh --help to show help information, but found 3 useless parameters: --batch-size and --batch-size. Those are producer parameters. [same help output as above, without the --messages line]
The kafka-consumer-perf-test.sh script help information print useless parameters. - Key: KAFKA-2360 URL: https://issues.apache.org/jira/browse/KAFKA-2360 Project: Kafka Issue Type: Bug Components: tools Affects Versions: 0.8.2.1 Environment: Linux Reporter: Bo Wang Priority: Minor Original Estimate: 24h Remaining Estimate: 24h
[jira] [Created] (KAFKA-2380) Publish Kafka snapshot Maven artifacts
Stevo Slavic created KAFKA-2380: --- Summary: Publish Kafka snapshot Maven artifacts Key: KAFKA-2380 URL: https://issues.apache.org/jira/browse/KAFKA-2380 Project: Kafka Issue Type: Task Affects Versions: 0.8.2.1 Reporter: Stevo Slavic Priority: Minor Please have Kafka snapshot Maven artifacts published regularly (e.g. either after every successful CI job run, or after successful nightly CI job run) to [Apache snapshots repository|http://repository.apache.org/content/groups/snapshots/org/apache/kafka/]. It will be very helpful for and promote early integration efforts, of patches/fixes to issues or of brand new Kafka clients/server versions. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-2381) Possible ConcurrentModificationException while unsubscribing from a topic in new consumer
[ https://issues.apache.org/jira/browse/KAFKA-2381?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14643843#comment-14643843 ] Ashish K Singh commented on KAFKA-2381: --- Updated reviewboard https://reviews.apache.org/r/36871/ against branch trunk Possible ConcurrentModificationException while unsubscribing from a topic in new consumer - Key: KAFKA-2381 URL: https://issues.apache.org/jira/browse/KAFKA-2381 Project: Kafka Issue Type: Bug Components: consumer Reporter: Ashish K Singh Assignee: Ashish K Singh Attachments: KAFKA-2381.patch, KAFKA-2381_2015-07-27_17:56:00.patch, KAFKA-2381_2015-07-27_21:56:06.patch Possible ConcurrentModificationException while unsubscribing from a topic in new consumer. Attempt is made to modify AssignedPartitions while looping over it. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: Review Request 36871: Patch for KAFKA-2381
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/36871/ --- (Updated July 28, 2015, 4:56 a.m.) Review request for kafka. Bugs: KAFKA-2381 https://issues.apache.org/jira/browse/KAFKA-2381 Repository: kafka Description (updated) --- Add a more specific unit test KAFKA-2381: Possible ConcurrentModificationException while unsubscribing from a topic in new consumer Diffs (updated) - clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java 4d9a425201115a66b457b58d670992b279091f5a clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java 319751c374ccdc7e7d7d74bcd01bc279b1bdb26e core/src/test/scala/integration/kafka/api/ConsumerTest.scala 3eb5f95731a3f06f662b334ab2b3d0ad7fa9e1ca Diff: https://reviews.apache.org/r/36871/diff/ Testing --- Thanks, Ashish Singh