Re: [DISCUSS] KIP-28 - Add a transform client for data processing

2015-07-27 Thread Aditya Auradkar
+1 on a comparison with existing solutions. At a high level, it seems nice to
have a transform library inside Kafka: a lot of the building blocks are
already there to build a stream processing framework. However, the details
are tricky to get right, and I think this discussion will get a lot more
interesting when we have something concrete to look at. I'm +1 for the
general idea.
How far away are we from having a prototype patch to play with?

Couple of observations:
- Since the input source for each processor is always Kafka, you get basic
client-side partition management out of the box if it uses the high-level
consumer.
- The KIP states that cmd line tools will be provided to deploy this as a
separate service. Is the proposed scope limited to providing a library that
makes it possible to build stream-processing-as-a-service, or to providing
such a service within Kafka itself?

Aditya
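
For concreteness, here is a minimal sketch of the "plain consumer + custom code +
producer" pattern being discussed, pairing the 0.8-era high-level consumer (which
gives partition management via the consumer group) with the new Java producer.
Topic names, connection strings, and the transform itself are illustrative placeholders.

    import java.util.Collections;
    import java.util.Map;
    import java.util.Properties;
    import kafka.consumer.Consumer;
    import kafka.consumer.ConsumerConfig;
    import kafka.consumer.KafkaStream;
    import kafka.javaapi.consumer.ConsumerConnector;
    import kafka.message.MessageAndMetadata;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class SimpleTransformer {
        public static void main(String[] args) {
            Properties cProps = new Properties();
            cProps.put("zookeeper.connect", "localhost:2181");
            cProps.put("group.id", "transform-group");   // group membership gives partition management for free
            ConsumerConnector consumer =
                Consumer.createJavaConsumerConnector(new ConsumerConfig(cProps));

            Properties pProps = new Properties();
            pProps.put("bootstrap.servers", "localhost:9092");
            pProps.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
            pProps.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
            KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(pProps);

            Map<String, Integer> topicCountMap = Collections.singletonMap("input-topic", 1);
            KafkaStream<byte[], byte[]> stream =
                consumer.createMessageStreams(topicCountMap).get("input-topic").get(0);

            // Per-message transform: fine for stateless filtering/enrichment, but joins,
            // windowing, and other stateful operations are where this pattern breaks down.
            for (MessageAndMetadata<byte[], byte[]> record : stream) {
                byte[] transformed = transform(record.message());
                producer.send(new ProducerRecord<byte[], byte[]>("output-topic", record.key(), transformed));
            }
        }

        private static byte[] transform(byte[] value) {
            return value; // placeholder for user logic
        }
    }

Once the logic needs state shared across messages or partitions, this loop stops
being sufficient, which is the gap the KIP is aiming at.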

On Mon, Jul 27, 2015 at 8:20 PM, Gwen Shapira gshap...@cloudera.com wrote:

 Hi,

 Since we will be discussing KIP-28 in the call tomorrow, can you
 update the KIP with the feature-comparison with  existing solutions?
 I admit that I do not see a need for single-event-producer-consumer
 pair (AKA Flume Interceptor). I've seen tons of people implement such
 apps in the last year, and it seemed easy. Now, perhaps we were doing
 it all wrong... but I'd like to know how :)

 If we are talking about a bigger story (i.e. DSL, real
 stream-processing, etc), that's a different discussion. I've seen a
 bunch of misconceptions about Spark Streaming in this discussion, and I
 have some thoughts in that regard, but I'd rather not go into that if
 that's outside the scope of this KIP.

 Gwen


 On Fri, Jul 24, 2015 at 9:48 AM, Guozhang Wang wangg...@gmail.com wrote:
  Hi Ewen,
 
  Replies inlined.
 
  On Thu, Jul 23, 2015 at 10:25 PM, Ewen Cheslack-Postava 
 e...@confluent.io
  wrote:
 
  Just some notes on the KIP doc itself:
 
  * It'd be useful to clarify at what point the plain consumer + custom
 code
  + producer breaks down. I think trivial filtering and aggregation on a
  single stream usually work fine with this model. Anything where you need
  more complex joins, windowing, etc. are where it breaks down. I think
 most
  interesting applications require that functionality, but it's helpful to
  make this really clear in the motivation -- right now, Kafka only
 provides
  the lowest level plumbing for stream processing applications, so most
  interesting apps require very heavyweight frameworks.
 
 
  I think for users to efficiently express complex logic like joins,
  windowing, etc., a higher-level programming interface beyond the process()
  interface would definitely be better, but that does not necessarily require
  a heavyweight framework, which usually includes more than just the
  high-level functional programming model. I would argue that an alternative
  solution -- the processor library plus another DSL layer on top of it --
  would be better for users who want some high-level programming interface
  but not a heavyweight stream processing framework.
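
To make the lower end of that spectrum concrete, below is a purely hypothetical
sketch of what a process()-style callback could look like; the names Processor and
ProcessorContext and their methods are invented for illustration and are not the
KIP's actual API.

    // Hypothetical shape of a low-level processor callback; names are illustrative only.
    interface ProcessorContext {
        void forward(Object key, Object value); // hand a record to the downstream stage/topic
        void commit();                          // request that consumed offsets be committed
    }

    interface Processor<K, V> {
        void init(ProcessorContext context);    // called once before any records arrive
        void process(K key, V value);           // called for every consumed record
        void close();                           // called on shutdown to release resources
    }

A DSL layer would then be a thin set of convenience builders that compose such
processors, rather than a separate heavyweight runtime.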
 
 
 
  * I think the feature comparison of plain producer/consumer, stream
  processing frameworks, and this new library is a good start, but we
 might
  want something more thorough and structured, like a feature matrix.
 Right
  now it's hard to figure out exactly how they relate to each other.
 
 
  Cool, I can do that.
 
 
  * I'd personally push the library vs. framework story very strongly --
 the
  total buy-in and weak integration story of stream processing frameworks
 is
  a big downside and makes a library a really compelling (and currently
  unavailable, as far as I am aware) alternative.
 
 
  Are you suggesting there is still some content missing about the
  motivations for adding the proposed library in the wiki page?
 
 
  * Comment about in-memory storage of other frameworks is interesting --
 it
  is specific to the framework, but is supposed to also give performance
  benefits. The high-level functional processing interface would allow for
  combining multiple operations when there's no shuffle, but when there
 is a
  shuffle, we'll always be writing to Kafka, right? Spark (and presumably
  spark streaming) is supposed to get a big win by handling shuffles such
  that the data just stays in cache and never actually hits disk, or at
 least
  hits disk in the background. Will we take a hit because we always write
 to
  Kafka?
 
 
  I agree with Neha's comments here. One more point I want to make is that
  materializing to Kafka is not necessarily much worse than keeping data in
  memory if the downstream consumption is caught up such that most of the
  reads will be hitting file cache. I remember Samza has illustrated that
  under such scenarios its throughput is actually quite comparable to Spark
  Streaming / Storm.
 
 
  * I really struggled with the structure of the KIP template with Copycat
  because the flow doesn't work well for proposals like 

Number of kafka topics/partitions supported per cluster of n nodes

2015-07-27 Thread Prabhjot Bharaj
Hi,

I'm looking forward to a benchmark which can explain how many topics and
partitions in total can be created in a cluster of n nodes, given that the
message size varies between x and y bytes, how that number varies with
different heap sizes, and how it affects system performance.

e.g. the result should look like: t topics with p partitions each can be
supported in a cluster of n nodes with a heap size of h MB, before the
cluster sees things like JVM crashes or high mem usage or system slowdown
etc.

I think such benchmarks must exist so that we can make better decisions on
the ops side.
If these details don't exist, I'll be doing this test myself, varying the
values of the parameters described above. I would be happy to share the
numbers with the community.

Thanks,
prabcs


Re: [DISCUSS] KIP-28 - Add a transform client for data processing

2015-07-27 Thread Neha Narkhede
Gwen,

We have a compilation of notes from comparison with other systems. They
might be missing details that folks who worked on that system might be able
to point out. We can share that and discuss further on the KIP call.

We do hope to include a DSL since that is the most natural way of
expressing stream processing operations on top of the processor client. The
DSL layer should be equivalent to that provided by Spark streaming or Flink
in terms of expressiveness though there will be differences in
implementation. Our client is intended to be simpler, with minimal external
dependencies, since it integrates closely with Kafka. This is really what
most application developers are hoping to get - a lightweight library on
top of Kafka that allows them to process streams of data.

Thanks
Neha
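
As a point of reference for the kind of expressiveness being described, the snippet
below uses plain Java 8 streams (not any proposed Kafka API) to show the chained
filter/map/aggregate style such a DSL would offer on top of the processor client;
the data and field names are made up.

    import java.util.Arrays;
    import java.util.List;

    // Plain Java 8 streams, used only to illustrate the chained style of a stream-processing DSL.
    public class DslStyleIllustration {
        public static void main(String[] args) {
            List<String> events = Arrays.asList("user1:home", "user2:checkout", "user1:checkout");

            long checkouts = events.stream()
                    .map(line -> line.split(":")[1])          // project out the page name
                    .filter(page -> page.equals("checkout"))  // keep only checkout events
                    .count();                                 // aggregate

            System.out.println("checkout events: " + checkouts);
        }
    }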

On Mon, Jul 27, 2015 at 8:20 PM, Gwen Shapira gshap...@cloudera.com wrote:

 Hi,

 Since we will be discussing KIP-28 in the call tomorrow, can you
 update the KIP with the feature-comparison with  existing solutions?
 I admit that I do not see a need for single-event-producer-consumer
 pair (AKA Flume Interceptor). I've seen tons of people implement such
 apps in the last year, and it seemed easy. Now, perhaps we were doing
 it all wrong... but I'd like to know how :)

 If we are talking about a bigger story (i.e. DSL, real
 stream-processing, etc), that's a different discussion. I've seen a
 bunch of misconceptions about Spark Streaming in this discussion, and I
 have some thoughts in that regard, but I'd rather not go into that if
 that's outside the scope of this KIP.

 Gwen


 On Fri, Jul 24, 2015 at 9:48 AM, Guozhang Wang wangg...@gmail.com wrote:
  Hi Ewen,
 
  Replies inlined.
 
  On Thu, Jul 23, 2015 at 10:25 PM, Ewen Cheslack-Postava 
 e...@confluent.io
  wrote:
 
  Just some notes on the KIP doc itself:
 
  * It'd be useful to clarify at what point the plain consumer + custom
 code
  + producer breaks down. I think trivial filtering and aggregation on a
  single stream usually work fine with this model. Anything where you need
  more complex joins, windowing, etc. are where it breaks down. I think
 most
  interesting applications require that functionality, but it's helpful to
  make this really clear in the motivation -- right now, Kafka only
 provides
  the lowest level plumbing for stream processing applications, so most
  interesting apps require very heavyweight frameworks.
 
 
  I think for users to efficiently express complex logic like joins,
  windowing, etc., a higher-level programming interface beyond the process()
  interface would definitely be better, but that does not necessarily require
  a heavyweight framework, which usually includes more than just the
  high-level functional programming model. I would argue that an alternative
  solution -- the processor library plus another DSL layer on top of it --
  would be better for users who want some high-level programming interface
  but not a heavyweight stream processing framework.
 
 
 
  * I think the feature comparison of plain producer/consumer, stream
  processing frameworks, and this new library is a good start, but we
 might
  want something more thorough and structured, like a feature matrix.
 Right
  now it's hard to figure out exactly how they relate to each other.
 
 
  Cool, I can do that.
 
 
  * I'd personally push the library vs. framework story very strongly --
 the
  total buy-in and weak integration story of stream processing frameworks
 is
  a big downside and makes a library a really compelling (and currently
  unavailable, as far as I am aware) alternative.
 
 
  Are you suggesting there is still some content missing about the
  motivations for adding the proposed library in the wiki page?
 
 
  * Comment about in-memory storage of other frameworks is interesting --
 it
  is specific to the framework, but is supposed to also give performance
  benefits. The high-level functional processing interface would allow for
  combining multiple operations when there's no shuffle, but when there
 is a
  shuffle, we'll always be writing to Kafka, right? Spark (and presumably
  spark streaming) is supposed to get a big win by handling shuffles such
  that the data just stays in cache and never actually hits disk, or at
 least
  hits disk in the background. Will we take a hit because we always write
 to
  Kafka?
 
 
  I agree with Neha's comments here. One more point I want to make is that
  materializing to Kafka is not necessarily much worse than keeping data in
  memory if the downstream consumption is caught up such that most of the
  reads will be hitting file cache. I remember Samza has illustrated that
  under such scenarios its throughput is actually quite comparable to Spark
  Streaming / Storm.
 
 
  * I really struggled with the structure of the KIP template with Copycat
  because the flow doesn't work well for proposals like this. They aren't
 as
  concrete changes as the KIP template was designed for. I'd completely
  

Re: Review Request 33620: Patch for KAFKA-1690

2015-07-27 Thread Ismael Juma

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/33620/#review93110
---


I did an initial pass over the code (excluding tests) and left some comments. 
Mostly trivial fixes, I hope.


build.gradle (lines 247 - 249)
https://reviews.apache.org/r/33620/#comment147361

KAFKA-2348 was merged recently and if/else for 2.9 support was removed 
there. It should not be restored here.



clients/src/main/java/org/apache/kafka/clients/ClientUtils.java (lines 80 - 86)
https://reviews.apache.org/r/33620/#comment147362

Code convention: no braces for single statement.



clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java (line 
106)
https://reviews.apache.org/r/33620/#comment147363

Why not pass `values` to the copy constructor of `HashMap`? Then the 
implementation is simply `return new HashMap<String, Object>(values);`.



clients/src/main/java/org/apache/kafka/common/config/SSLConfigs.java (line 29)
https://reviews.apache.org/r/33620/#comment147364

SSL is deprecated 
(https://ma.ttias.be/rfc-7568-ssl-3-0-is-now-officially-deprecated/), should we 
be supporting it at all? If we do support it, then we should at least include a 
warning.



clients/src/main/java/org/apache/kafka/common/config/SSLConfigs.java (line 37)
https://reviews.apache.org/r/33620/#comment147365

Should we support all ciphers by default, or should we be excluding ciphers 
that are known to be broken?
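
If the defaults were tightened, the mechanics would be the standard JSSE calls
sketched below; the chosen protocol and cipher suite names are examples, not a
proposed default list.

    import javax.net.ssl.SSLContext;
    import javax.net.ssl.SSLEngine;

    // Sketch of narrowing the enabled protocols/ciphers on a JSSE SSLEngine.
    // The protocol and cipher suite names are examples only.
    public class RestrictedSslEngineExample {
        public static SSLEngine newRestrictedEngine() throws Exception {
            SSLContext context = SSLContext.getInstance("TLS");
            context.init(null, null, null);              // default key/trust managers, for illustration
            SSLEngine engine = context.createSSLEngine();
            engine.setEnabledProtocols(new String[] {"TLSv1.2"});
            engine.setEnabledCipherSuites(new String[] {"TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256"});
            return engine;
        }
    }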



clients/src/main/java/org/apache/kafka/common/network/ByteBufferSend.java 
(lines 64 - 66)
https://reviews.apache.org/r/33620/#comment147366

Code convention: no braces used for single statement (as can be seen in the 
`if` a few lines above).



clients/src/main/java/org/apache/kafka/common/network/DefaultAuthenticator.java 
(line 34)
https://reviews.apache.org/r/33620/#comment147368

Why not create the principal at this point instead of in the `principal()` 
method? Is it a costly operation?



clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java (lines 
35 - 39)
https://reviews.apache.org/r/33620/#comment147370

As far as I can see, these fields can be final. Also, is it intentional for 
`transportLayer` to be `public`?



clients/src/main/java/org/apache/kafka/common/network/PlainTextChannelBuilder.java
 (line 27)
https://reviews.apache.org/r/33620/#comment147371

Isn't `plaintext` a single word in a security context (i.e. 
https://en.wikipedia.org/wiki/Plaintext)? This also affects other classes like 
`PlainTextTransportLayer`, etc.



clients/src/main/java/org/apache/kafka/common/network/SSLFactory.java (line 49)
https://reviews.apache.org/r/33620/#comment147382

Can be final?



clients/src/main/java/org/apache/kafka/common/network/SSLTransportLayer.java 
(lines 46 - 49)
https://reviews.apache.org/r/33620/#comment147383

Can some of these be final?



clients/src/main/java/org/apache/kafka/common/network/Selector.java (line 461)
https://reviews.apache.org/r/33620/#comment147384

Should this be using `channelForId`?



core/src/main/scala/kafka/api/FetchResponse.scala (line 82)
https://reviews.apache.org/r/33620/#comment147373

Casts are to be avoided in Scala, pattern matching is a better way to do 
this:

`channel match {
  case tl: TransportLayer => pending = tl.hasPendingWrites
  case _ =>
}`

However, I see that this pattern is repeated in many classes, which is not 
good. Assuming that we can't change `Send.writeTo` to take a `TransportLayer` 
(either for compatibility or because there are implementations that don't use a 
`TransportLayer`), we should consider introducing a utility method 
`hasPendingWrites(channel)` that calls `hasPendingWrites` or returns false.

What do you think?



core/src/main/scala/kafka/network/SocketServer.scala (lines 52 - 63)
https://reviews.apache.org/r/33620/#comment147375

Are all these vals needed at class scope?



core/src/main/scala/kafka/network/SocketServer.scala (line 116)
https://reviews.apache.org/r/33620/#comment147374

One typically does not specify the type of the parameter for a simple 
function, i.e.:

`requestChannel.addResponseListener(id => processors(id).wakeup())`

It's also possible to use underscore syntax to avoid declaring `id` but the 
name could aid readability in some cases.



core/src/main/scala/kafka/network/SocketServer.scala (line 232)
https://reviews.apache.org/r/33620/#comment147377

Do we really have to populate slices of the `processors` array in each 
`Acceptor`? Would it not be better to create the full array and then pass it to 
`Acceptor`? Or even perhaps pass the relevant slice.



core/src/main/scala/kafka/network/SocketServer.scala (line 501)
https://reviews.apache.org/r/33620/#comment147379

Code convention: space after `:`




Best practices - Using kafka (with http server) as source-of-truth

2015-07-27 Thread Prabhjot Bharaj
Hi Folks,

I would like to understand the best practices when using Kafka as the
source of truth, given that I want to pump data into Kafka using
HTTP methods.

What are the current production configurations for such a use case:-

1. Kafka-http-client - is it scalable the way Nginx is ??
2. Using Kafka and Nginx together - If anybody has used this, please explain
3. Any other scalable method ?

Regards,
prabcs


Re: Review Request 33620: Patch for KAFKA-1690

2015-07-27 Thread Ismael Juma


 On July 27, 2015, 1:32 p.m., Ismael Juma wrote:
  core/src/main/scala/kafka/api/FetchResponse.scala, line 82
  https://reviews.apache.org/r/33620/diff/13/?file=1021998#file1021998line82
 
  Casts are to be avoided in Scala, pattern matching is a better way to 
  do this:
  
  `channel match {
case tl: TransportLayer => pending = tl.hasPendingWrites
case _ =>
  }`
  
  However, I see that this pattern is repeated in many classes, which is 
  not good. Assuming that we can't change `Send.writeTo` to take a 
  `TransportLayer` (either for compatibility or because there are 
  implementations that don't use a `TransportLayer`), we should consider 
  introducing a utility method `hasPendingWrites(channel)` that calls 
  `hasPendingWrites` or returns false.
  
  What do you think?
 
 Sriharsha Chintalapani wrote:
  The reason for doing this is that the channel here can be a GatheringByteChannel or 
  some other socket channel. As I mentioned in one of the comments, once KafkaChannel 
  is used across the system, especially when we deprecate/remove BlockingChannel, we 
  can go ahead and call channel.hasPendingWrites.

That makes sense. Still, having a utility method would make it easier to make 
that change when that time comes, right?


- Ismael


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/33620/#review93110
---


On July 25, 2015, 7:11 p.m., Sriharsha Chintalapani wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/33620/
 ---
 
 (Updated July 25, 2015, 7:11 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1690
 https://issues.apache.org/jira/browse/KAFKA-1690
 
 
 Repository: kafka
 
 
 Description
 ---
 
 KAFKA-1690. new java producer needs ssl support as a client.
 
 
 KAFKA-1690. new java producer needs ssl support as a client.
 
 
 KAFKA-1690. new java producer needs ssl support as a client.
 
 
 KAFKA-1690. new java producer needs ssl support as a client. SSLFactory tests.
 
 
 KAFKA-1690. new java producer needs ssl support as a client. Added 
 PrincipalBuilder.
 
 
 KAFKA-1690. new java producer needs ssl support as a client. Addressing 
 reviews.
 
 
 KAFKA-1690. new java producer needs ssl support as a client. Addressing 
 reviews.
 
 
 KAFKA-1690. new java producer needs ssl support as a client. Addressing 
 reviews.
 
 
 KAFKA-1690. new java producer needs ssl support as a client. Fixed minor 
 issues with the patch.
 
 
 KAFKA-1690. new java producer needs ssl support as a client. Fixed minor 
 issues with the patch.
 
 
 KAFKA-1690. new java producer needs ssl support as a client.
 
 
 KAFKA-1690. new java producer needs ssl support as a client.
 
 
 Merge remote-tracking branch 'refs/remotes/origin/trunk' into KAFKA-1690-V1
 
 
 KAFKA-1690. Broker side ssl changes.
 
 
 KAFKA-1684. SSL for socketServer.
 
 
 KAFKA-1690. Added SSLProducerSendTest and fixes to get right port for SSL.
 
 
 Merge branch 'trunk' into KAFKA-1690-V1
 
 
 KAFKA-1690. Post merge fixes.
 
 
 KAFKA-1690. Added SSLProducerSendTest.
 
 
 KAFKA-1690. Minor fixes based on patch review comments.
 
 
 Merge commit
 
 
 KAFKA-1690. Added SSL Consumer Test.
 
 
 KAFKA-1690. SSL Support.
 
 
 KAFKA-1690. Addressing reviews.
 
 
 Merge branch 'trunk' into KAFKA-1690-V1
 
 
 Merge branch 'trunk' into KAFKA-1690-V1
 
 
 KAFKA-1690. Addressing reviews. Removed interestOps from SSLTransportLayer.
 
 
 KAFKA-1690. Addressing reviews.
 
 
 KAFKA-1690. added staged receives to selector.
 
 
 KAFKA-1690. Addressing reviews.
 
 
 Merge branch 'trunk' into KAFKA-1690-V1
 
 
 Diffs
 -
 
   build.gradle 0abec26fb2d7be62c8a673f9ec838e926e64b2d1 
   checkstyle/import-control.xml 19e0659ef9385433d9f94dee43cd70a52b18c9e5 
   clients/src/main/java/org/apache/kafka/clients/ClientUtils.java 
 0d68bf1e1e90fe9d5d4397ddf817b9a9af8d9f7a 
   clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java 
 2c421f42ed3fc5d61cf9c87a7eaa7bb23e26f63b 
   clients/src/main/java/org/apache/kafka/clients/NetworkClient.java 
 48fe7961e2215372d8033ece4af739ea06c6457b 
   clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java 
 70377ae2fa46deb381139d28590ce6d4115e1adc 
   clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
 bea3d737c51be77d5b5293cdd944d33b905422ba 
   clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
 03b8dd23df63a8d8a117f02eabcce4a2d48c44f7 
   clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java 
 aa264202f2724907924985a5ecbe74afc4c6c04b 
   clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java 
 bae528d31516679bed88ee61b408f209f185a8cc 
   

[jira] [Commented] (KAFKA-1682) Security for Kafka

2015-07-27 Thread Sriharsha Chintalapani (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1682?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14642865#comment-14642865
 ] 

Sriharsha Chintalapani commented on KAFKA-1682:
---

[~ijuma] Here it is https://issues.apache.org/jira/browse/KAFKA-2162

 Security for Kafka
 --

 Key: KAFKA-1682
 URL: https://issues.apache.org/jira/browse/KAFKA-1682
 Project: Kafka
  Issue Type: New Feature
  Components: security
Affects Versions: 0.9.0
Reporter: Jay Kreps

 Parent ticket for security. Wiki and discussion is here:
 https://cwiki.apache.org/confluence/display/KAFKA/Security



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-2162) Kafka Auditing functionality

2015-07-27 Thread Sriharsha Chintalapani (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2162?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sriharsha Chintalapani updated KAFKA-2162:
--
Issue Type: Sub-task  (was: Bug)
Parent: KAFKA-1682

 Kafka Auditing functionality
 

 Key: KAFKA-2162
 URL: https://issues.apache.org/jira/browse/KAFKA-2162
 Project: Kafka
  Issue Type: Sub-task
Reporter: Sriharsha Chintalapani
Assignee: Parth Brahmbhatt

 During the Kafka authorization discussion thread, concerns were raised 
 about not having auditing. Auditing is important functionality, but it is not 
 part of the authorizer. This JIRA will track adding audit functionality to Kafka.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1682) Security for Kafka

2015-07-27 Thread Ismael Juma (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1682?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14642850#comment-14642850
 ] 

Ismael Juma commented on KAFKA-1682:


One of the in-scope items in the wiki page is Auditing. Is that information 
up to date and if so, is there a ticket for it? All the other items seem to be 
covered by the subtasks associated to this ticket.

 Security for Kafka
 --

 Key: KAFKA-1682
 URL: https://issues.apache.org/jira/browse/KAFKA-1682
 Project: Kafka
  Issue Type: New Feature
  Components: security
Affects Versions: 0.9.0
Reporter: Jay Kreps

 Parent ticket for security. Wiki and discussion is here:
 https://cwiki.apache.org/confluence/display/KAFKA/Security



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Work started] (KAFKA-1686) Implement SASL/Kerberos

2015-07-27 Thread Sriharsha Chintalapani (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1686?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Work on KAFKA-1686 started by Sriharsha Chintalapani.
-
 Implement SASL/Kerberos
 ---

 Key: KAFKA-1686
 URL: https://issues.apache.org/jira/browse/KAFKA-1686
 Project: Kafka
  Issue Type: Sub-task
  Components: security
Affects Versions: 0.9.0
Reporter: Jay Kreps
Assignee: Sriharsha Chintalapani
 Fix For: 0.9.0


 Implement SASL/Kerberos authentication.
 To do this we will need to introduce a new SASLRequest and SASLResponse pair 
 to the client protocol. This request and response will each have only a 
 single byte[] field and will be used to handle the SASL challenge/response 
 cycle. Doing this will initialize the SaslServer instance and associate it 
 with the session in a manner similar to KAFKA-1684.
 When using integrity or encryption mechanisms with SASL we will need to wrap 
 and unwrap bytes as in KAFKA-1684 so the same interface that covers the 
 SSLEngine will need to also cover the SaslServer instance.
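
 For reference, the server side of that challenge/response cycle maps onto the
 standard javax.security.sasl API roughly as sketched below; the Transport interface,
 mechanism name, and host/protocol strings are placeholders rather than the actual
 Kafka wire protocol.

    import javax.security.sasl.Sasl;
    import javax.security.sasl.SaslException;
    import javax.security.sasl.SaslServer;

    // Rough sketch of a broker-side SASL challenge/response loop using the JDK API.
    public class SaslServerLoopSketch {
        interface Transport {            // placeholder for the SASLRequest/SASLResponse exchange
            byte[] receive();
            void send(byte[] bytes);
        }

        static void authenticate(Transport transport) throws SaslException {
            SaslServer server = Sasl.createSaslServer(
                    "GSSAPI", "kafka", "broker.example.com", null, callbacks -> { });
            byte[] response = transport.receive();          // client's initial SASL token
            while (!server.isComplete()) {
                byte[] challenge = server.evaluateResponse(response);
                if (challenge != null)
                    transport.send(challenge);              // would travel back in a SASLResponse
                if (!server.isComplete())
                    response = transport.receive();         // next token from the client
            }
        }
    }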



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 33620: Patch for KAFKA-1690

2015-07-27 Thread Sriharsha Chintalapani


 On July 27, 2015, 1:32 p.m., Ismael Juma wrote:
  core/src/main/scala/kafka/api/FetchResponse.scala, line 82
  https://reviews.apache.org/r/33620/diff/13/?file=1021998#file1021998line82
 
  Casts are to be avoided in Scala, pattern matching is a better way to 
  do this:
  
  `channel match {
case tl: TransportLayer => pending = tl.hasPendingWrites
case _ =>
  }`
  
  However, I see that this pattern is repeated in many classes, which is 
  not good. Assuming that we can't change `Send.writeTo` to take a 
  `TransportLayer` (either for compatibility or because there are 
  implementations that don't use a `TransportLayer`), we should consider 
  introducing a utility method `hasPendingWrites(channel)` that calls 
  `hasPendingWrites` or returns false.
  
  What do you think?

The reason for doing this is that the channel here can be a GatheringByteChannel or some 
other socket channel. As I mentioned in one of the comments, once KafkaChannel is used 
across the system, especially when we deprecate/remove BlockingChannel, we can go 
ahead and call channel.hasPendingWrites.


- Sriharsha


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/33620/#review93110
---


On July 25, 2015, 7:11 p.m., Sriharsha Chintalapani wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/33620/
 ---
 
 (Updated July 25, 2015, 7:11 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1690
 https://issues.apache.org/jira/browse/KAFKA-1690
 
 
 Repository: kafka
 
 
 Description
 ---
 
 KAFKA-1690. new java producer needs ssl support as a client.
 
 
 KAFKA-1690. new java producer needs ssl support as a client.
 
 
 KAFKA-1690. new java producer needs ssl support as a client.
 
 
 KAFKA-1690. new java producer needs ssl support as a client. SSLFactory tests.
 
 
 KAFKA-1690. new java producer needs ssl support as a client. Added 
 PrincipalBuilder.
 
 
 KAFKA-1690. new java producer needs ssl support as a client. Addressing 
 reviews.
 
 
 KAFKA-1690. new java producer needs ssl support as a client. Addressing 
 reviews.
 
 
 KAFKA-1690. new java producer needs ssl support as a client. Addressing 
 reviews.
 
 
 KAFKA-1690. new java producer needs ssl support as a client. Fixed minor 
 issues with the patch.
 
 
 KAFKA-1690. new java producer needs ssl support as a client. Fixed minor 
 issues with the patch.
 
 
 KAFKA-1690. new java producer needs ssl support as a client.
 
 
 KAFKA-1690. new java producer needs ssl support as a client.
 
 
 Merge remote-tracking branch 'refs/remotes/origin/trunk' into KAFKA-1690-V1
 
 
 KAFKA-1690. Broker side ssl changes.
 
 
 KAFKA-1684. SSL for socketServer.
 
 
 KAFKA-1690. Added SSLProducerSendTest and fixes to get right port for SSL.
 
 
 Merge branch 'trunk' into KAFKA-1690-V1
 
 
 KAFKA-1690. Post merge fixes.
 
 
 KAFKA-1690. Added SSLProducerSendTest.
 
 
 KAFKA-1690. Minor fixes based on patch review comments.
 
 
 Merge commit
 
 
 KAFKA-1690. Added SSL Consumer Test.
 
 
 KAFKA-1690. SSL Support.
 
 
 KAFKA-1690. Addressing reviews.
 
 
 Merge branch 'trunk' into KAFKA-1690-V1
 
 
 Merge branch 'trunk' into KAFKA-1690-V1
 
 
 KAFKA-1690. Addressing reviews. Removed interestOps from SSLTransportLayer.
 
 
 KAFKA-1690. Addressing reviews.
 
 
 KAFKA-1690. added staged receives to selector.
 
 
 KAFKA-1690. Addressing reviews.
 
 
 Merge branch 'trunk' into KAFKA-1690-V1
 
 
 Diffs
 -
 
   build.gradle 0abec26fb2d7be62c8a673f9ec838e926e64b2d1 
   checkstyle/import-control.xml 19e0659ef9385433d9f94dee43cd70a52b18c9e5 
   clients/src/main/java/org/apache/kafka/clients/ClientUtils.java 
 0d68bf1e1e90fe9d5d4397ddf817b9a9af8d9f7a 
   clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java 
 2c421f42ed3fc5d61cf9c87a7eaa7bb23e26f63b 
   clients/src/main/java/org/apache/kafka/clients/NetworkClient.java 
 48fe7961e2215372d8033ece4af739ea06c6457b 
   clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java 
 70377ae2fa46deb381139d28590ce6d4115e1adc 
   clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
 bea3d737c51be77d5b5293cdd944d33b905422ba 
   clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
 03b8dd23df63a8d8a117f02eabcce4a2d48c44f7 
   clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java 
 aa264202f2724907924985a5ecbe74afc4c6c04b 
   clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java 
 bae528d31516679bed88ee61b408f209f185a8cc 
   clients/src/main/java/org/apache/kafka/common/config/SSLConfigs.java 
 PRE-CREATION 
   clients/src/main/java/org/apache/kafka/common/network/Authenticator.java 
 PRE-CREATION 
   

[jira] [Commented] (KAFKA-1690) new java producer needs ssl support as a client

2015-07-27 Thread Rajasekar Elango (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1690?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14642821#comment-14642821
 ] 

Rajasekar Elango commented on KAFKA-1690:
-

[~harsha_ch] Is there any documentation on how to use the SSL feature? E.g. how to 
enable SSL on the broker, and how to make the producer/consumer talk to the broker via SSL?

 new java producer needs ssl support as a client
 ---

 Key: KAFKA-1690
 URL: https://issues.apache.org/jira/browse/KAFKA-1690
 Project: Kafka
  Issue Type: Sub-task
Reporter: Joe Stein
Assignee: Sriharsha Chintalapani
 Fix For: 0.8.3

 Attachments: KAFKA-1690.patch, KAFKA-1690.patch, 
 KAFKA-1690_2015-05-10_23:20:30.patch, KAFKA-1690_2015-05-10_23:31:42.patch, 
 KAFKA-1690_2015-05-11_16:09:36.patch, KAFKA-1690_2015-05-12_16:20:08.patch, 
 KAFKA-1690_2015-05-15_07:18:21.patch, KAFKA-1690_2015-05-20_14:54:35.patch, 
 KAFKA-1690_2015-05-21_10:37:08.patch, KAFKA-1690_2015-06-03_18:52:29.patch, 
 KAFKA-1690_2015-06-23_13:18:20.patch, KAFKA-1690_2015-07-20_06:10:42.patch, 
 KAFKA-1690_2015-07-20_11:59:57.patch, KAFKA-1690_2015-07-25_12:10:55.patch






--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1690) new java producer needs ssl support as a client

2015-07-27 Thread Sriharsha Chintalapani (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1690?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14642829#comment-14642829
 ] 

Sriharsha Chintalapani commented on KAFKA-1690:
---

[~erajasekar] I am putting together instructions on how to enable SSL. Will 
post the details on wiki in a day or two. 

 new java producer needs ssl support as a client
 ---

 Key: KAFKA-1690
 URL: https://issues.apache.org/jira/browse/KAFKA-1690
 Project: Kafka
  Issue Type: Sub-task
Reporter: Joe Stein
Assignee: Sriharsha Chintalapani
 Fix For: 0.8.3

 Attachments: KAFKA-1690.patch, KAFKA-1690.patch, 
 KAFKA-1690_2015-05-10_23:20:30.patch, KAFKA-1690_2015-05-10_23:31:42.patch, 
 KAFKA-1690_2015-05-11_16:09:36.patch, KAFKA-1690_2015-05-12_16:20:08.patch, 
 KAFKA-1690_2015-05-15_07:18:21.patch, KAFKA-1690_2015-05-20_14:54:35.patch, 
 KAFKA-1690_2015-05-21_10:37:08.patch, KAFKA-1690_2015-06-03_18:52:29.patch, 
 KAFKA-1690_2015-06-23_13:18:20.patch, KAFKA-1690_2015-07-20_06:10:42.patch, 
 KAFKA-1690_2015-07-20_11:59:57.patch, KAFKA-1690_2015-07-25_12:10:55.patch






--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1682) Security for Kafka

2015-07-27 Thread Ismael Juma (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1682?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14642868#comment-14642868
 ] 

Ismael Juma commented on KAFKA-1682:


[~harsha_ch], thanks for the link and for making KAFKA-2162 a sub-task of this 
ticket.

 Security for Kafka
 --

 Key: KAFKA-1682
 URL: https://issues.apache.org/jira/browse/KAFKA-1682
 Project: Kafka
  Issue Type: New Feature
  Components: security
Affects Versions: 0.9.0
Reporter: Jay Kreps

 Parent ticket for security. Wiki and discussion is here:
 https://cwiki.apache.org/confluence/display/KAFKA/Security



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [DISCUSS] KIP-28 - Add a transform client for data processing

2015-07-27 Thread Jay Kreps
Hey Yi,

Great points. I think for some of this the most useful thing would be to
get a wip prototype out that we could discuss concretely. I think Yasuhiro
and Guozhang took that prototype I had done, and had some improvements.
Give us a bit to get that into understandable shape so we can discuss.

To address a few of your other points:
1. Yeah we are going to try to generalize the partition management stuff.
We'll get a wiki/JIRA up for that. I think that gives what you want in
terms of moving partitioning to the client side.
2. I think consuming from a different cluster than the one you produce to will
be easy. More than that is more complex, though I agree the pluggable
partitioning makes it theoretically possible. Let's try to get something that
works for the first case; it sounds like that solves the use case you describe
of wanting to directly transform from a given cluster but produce back to a
different cluster. I think the key observation is that the whole reason
LinkedIn split data over clusters to begin with was the lack of quotas, which
are in any case getting implemented.

-Jay
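
The consume-from-one-cluster, produce-to-another case really does reduce to pointing
the two clients at different connection configs; a minimal sketch follows, with all
hostnames and the group name as illustrative placeholders.

    import java.util.Properties;

    // The consumer and producer simply point at different clusters; hostnames are illustrative.
    public class CrossClusterConfigSketch {
        static Properties consumerProps() {
            Properties p = new Properties();
            p.put("zookeeper.connect", "zk.cluster-a.example.com:2181"); // high-level consumer reads cluster A
            p.put("group.id", "transform-group");
            return p;
        }

        static Properties producerProps() {
            Properties p = new Properties();
            p.put("bootstrap.servers", "kafka.cluster-b.example.com:9092"); // new producer writes to cluster B
            p.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
            p.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
            return p;
        }
    }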

On Sun, Jul 26, 2015 at 11:31 PM, Yi Pan nickpa...@gmail.com wrote:

 Hi, Jay and all,

 Thanks for all your quick responses. I tried to summarize my thoughts here:

 - ConsumerRecord as stream processor API:

* This KafkaProcessor API is targeted to receive the message from Kafka.
 So, to Yasuhiro's join/transformation example, any join/transformation
 results that are materialized in Kafka should have ConsumerRecord format
 (i.e. w/ topic and offsets). Any non-materialized join/transformation
 results should not be processed by this KafkaProcessor API. One example is
 the in-memory operators API in Samza, which is designed to handle the
 non-materialized join/transformation results. And yes, in this case, a more
 abstract data model is needed.

* Just to support Jay's point of a general
 ConsumerRecord/ProducerRecord, general stream processing on more than one
 data source would need at least the following info: data source
 description (i.e. which topic/table), and actual data (i.e. key-value
 pairs). It would make sense to have the data source name as part of the
 general metadata in stream processing (think about it as the table name for
 records in standard SQL).

 - SQL/DSL

* I think that this topic itself is worthy of another KIP discussion. I
 would prefer to leave it out of scope in KIP-28.

 - Client-side pluggable partition manager

* Given the use cases we have seen with large-scale deployment of
 Samza/Kafka in LinkedIn, I would argue that we should make it as the
 first-class citizen in this KIP. The use cases include:

   * multi-cluster Kafka

   * host-affinity (i.e. local-state associated w/ certain partitions on
 client)

 - Multi-cluster scenario

* Although I originally just brought it up as a use case that requires
 client-side partition manager, reading Jay’s comments, I realized that I
 have one fundamental issue w/ the current copycat + transformation model.
 If I interpret Jay’s comment correctly, the proposed copycat+transformation
 plays out in the following way: i) copycat takes all data from sources (no
 matter it is Kafka or non-Kafka) into *one single Kafka cluster*; ii)
 transformation is only restricted to take data sources in *this single
 Kafka cluster* to perform aggregate/join etc. This is different from my
 original understanding of the copycat. The main issue I have with this
 model is: huge data-copy between Kafka clusters. In LinkedIn, we used to
 follow this model that uses MirrorMaker to map topics from tracking
 clusters to Samza-specific Kafka cluster and only do stream processing in
 the Samza-specific Kafka cluster. We moved away from this model and started
 allowing users to directly consume from tracking Kafka clusters due to the
 overhead of copying huge amount of traffic between Kafka clusters. I agree
 that the initial design of KIP-28 would probably need a smaller scope of
 problem to solve, hence, limiting to solving partition management in a
 single cluster. However, I would really hope the design won’t prevent the
 use case of processing data directly from multiple clusters. In my opinion,
 making the partition manager as a client-side pluggable logic would allow
 us to achieve these goals.

 Thanks a lot in advance!

 -Yi

 On Fri, Jul 24, 2015 at 11:13 AM, Jay Kreps j...@confluent.io wrote:

  Hey Yi,
 
  For your other two points:
 
  - This definitely doesn't cover any kind of SQL or anything like this.
 
  - The prototype we started with just had process() as a method but
 Yasuhiro
  had some ideas of adding additional filter/aggregate convenience methods.
  We should discuss how this would fit with the operator work you were
 doing
  in Samza. Probably the best way is just to get the code out there in its
  current state and start talking about it?
 
  - Your point about multiple clusters. We actually have a proposed
 extension
  

[jira] [Commented] (KAFKA-2365) Copycat checklist

2015-07-27 Thread Gwen Shapira (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2365?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14643016#comment-14643016
 ] 

Gwen Shapira commented on KAFKA-2365:
-

BTW. Two connectors that appeared in the KIP discussion but are not in the JIRA 
are JDBC and HDFS. 
Is the idea to leave them out for now?

 Copycat checklist
 -

 Key: KAFKA-2365
 URL: https://issues.apache.org/jira/browse/KAFKA-2365
 Project: Kafka
  Issue Type: New Feature
Reporter: Ewen Cheslack-Postava
Assignee: Ewen Cheslack-Postava
  Labels: feature
 Fix For: 0.8.3


 This covers the development plan for 
 [KIP-26|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=58851767].
  There are a number of features that can be developed in sequence to make 
 incremental progress, and often in parallel:
 * Initial patch - connector API and core implementation
 * Runtime data API
 * Standalone CLI
 * REST API
 * Distributed copycat - CLI
 * Distributed copycat - coordinator
 * Distributed copycat - config storage
 * Distributed copycat - offset storage
 * Log/file connector (sample source/sink connector)
 * Elasticsearch sink connector (sample sink connector for a full log -> Kafka 
 -> Elasticsearch sample pipeline)
 * Copycat metrics
 * System tests (including connector tests)
 * Mirrormaker connector
 * Copycat documentation
 This is an initial list, but it might need refinement to allow for more 
 incremental progress and may be missing features we find we want before the 
 initial release.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-2321) Introduce CONTRIBUTING.md

2015-07-27 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2321?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-2321:
-
   Resolution: Fixed
Fix Version/s: 0.8.3
   Status: Resolved  (was: Patch Available)

Committed to trunk

 Introduce CONTRIBUTING.md
 -

 Key: KAFKA-2321
 URL: https://issues.apache.org/jira/browse/KAFKA-2321
 Project: Kafka
  Issue Type: Improvement
Reporter: Ismael Juma
Assignee: Ismael Juma
 Fix For: 0.8.3


 This file is displayed when people create a pull request in GitHub. It should 
 link to the relevant pages in the wiki and website.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-2321) Introduce CONTRIBUTING.md

2015-07-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2321?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14643107#comment-14643107
 ] 

ASF GitHub Bot commented on KAFKA-2321:
---

Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/97


 Introduce CONTRIBUTING.md
 -

 Key: KAFKA-2321
 URL: https://issues.apache.org/jira/browse/KAFKA-2321
 Project: Kafka
  Issue Type: Improvement
Reporter: Ismael Juma
Assignee: Ismael Juma
 Fix For: 0.8.3


 This file is displayed when people create a pull request in GitHub. It should 
 link to the relevant pages in the wiki and website.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-2349) `contributing` website page should link to Contributing Code Changes wiki page

2015-07-27 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2349?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14643129#comment-14643129
 ] 

Guozhang Wang commented on KAFKA-2349:
--

Committed to the repo, thanks!

 `contributing` website page should link to Contributing Code Changes wiki 
 page
 

 Key: KAFKA-2349
 URL: https://issues.apache.org/jira/browse/KAFKA-2349
 Project: Kafka
  Issue Type: Task
Reporter: Ismael Juma
Assignee: Ismael Juma
 Attachments: KAFKA-2349.patch


 This should be merged at the same time as 
 https://issues.apache.org/jira/browse/KAFKA-2321 and only after a vote takes 
 place in the mailing list.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-2367) Add Copycat runtime data API

2015-07-27 Thread Ewen Cheslack-Postava (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2367?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ewen Cheslack-Postava updated KAFKA-2367:
-
Summary: Add Copycat runtime data API  (was: Add Copycat runtime API)

 Add Copycat runtime data API
 

 Key: KAFKA-2367
 URL: https://issues.apache.org/jira/browse/KAFKA-2367
 Project: Kafka
  Issue Type: Sub-task
Reporter: Ewen Cheslack-Postava
Assignee: Ewen Cheslack-Postava
 Fix For: 0.8.3


 Design the API used for runtime data in Copycat. This API is used to 
 construct schemas and records that Copycat processes. This needs to be a 
 fairly general data model (think Avro, JSON, Protobufs, Thrift) in order to 
 support complex, varied data types that may be input from/output to many data 
 systems.
 This issue should also address the serialization interfaces used 
 within Copycat, which translate the runtime data into serialized byte[] form. 
 It is important that these be considered together because the data format can 
 be used in multiple ways (records, partition IDs, partition offsets), so it 
 and the corresponding serializers must be sufficient for all these use cases.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: New Producer and acks configuration

2015-07-27 Thread Guozhang Wang
I think there is still a subtle difference between async with acks = 0
and async with callback: when the max in-flight requests limit has been
reached, subsequent requests cannot be sent until previous responses are
returned (which could happen, for example, when the broker is slow or a
network issue occurs) in the second case but not in the first.

Given this difference, I feel there are still scenarios, though probably
rare, where users would like to use acks = 0 even with the new producer's
callbacks.

Guozhang
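
Concretely, the scenario is the new producer configured with acks=0 while still
registering a per-record callback; a minimal sketch, with broker address and topic
as placeholders:

    import java.util.Properties;
    import org.apache.kafka.clients.producer.Callback;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;

    public class AcksZeroWithCallback {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("acks", "0"); // fire-and-forget: no broker response is awaited
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

            KafkaProducer<String, String> producer = new KafkaProducer<>(props);
            producer.send(new ProducerRecord<String, String>("test-topic", "key", "value"),
                    new Callback() {
                        @Override
                        public void onCompletion(RecordMetadata metadata, Exception exception) {
                            // With acks=0 this fires once the request hits the socket buffer,
                            // not when the broker has acknowledged the write.
                        }
                    });
            producer.close();
        }
    }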

On Mon, Jul 27, 2015 at 9:25 AM, Mayuresh Gharat gharatmayures...@gmail.com
 wrote:

 So basically this means that with acks = 0, there is no guarantee that the
 message has been received by the Kafka broker. I am just wondering why
 anyone would be using acks = 0, since anyone using Kafka and calling
 producer.send() would want their message to get to the Kafka brokers. Also,
 as Jay said, with the new producer in async mode, clients will not have to
 wait for the response since it will be handled in callbacks. So the use of
 acks = 0 sounds very rare to me and I am not able to think of a use case
 around it.

 Thanks,

 Mayuresh

 On Sun, Jul 26, 2015 at 2:40 PM, Gwen Shapira gshap...@cloudera.com
 wrote:

  Aha! Yes, I was missing the part with the dummy response.
  Thank you!
 
  Gwen
 
 
  On Sun, Jul 26, 2015 at 2:17 PM, Ewen Cheslack-Postava
  e...@confluent.io wrote:
   It's different because it changes whether the client waits for the
  response
   from the broker at all. Take a look at
  NetworkClient.handleCompletedSends,
   which fills in dummy responses when a response is not expected (and
 that
   flag gets set via Sender.produceRequest using acks != 0 as a flag to
    ClientRequest). This means that the producer will invoke the callback and
   resolve the future as soon as the request hits the TCP buffer on the
   client. At that point, the behavior of the broker wrt generating a
  response
   doesn't matter -- the client isn't waiting on that response anyway.
  
   This definitely is faster since you aren't waiting for the round trip,
  but
   it seems like it is of questionable value with the new producer as Jay
   explained. It is slightly better than just assuming records have been
  sent
    as soon as you call Producer.send(), in that this shouldn't trigger a
 callback
   until the records have made it through the internal KafkaProducer
   buffering. But since it still has to make it through the TCP buffers it
   doesn't really guarantee anything that useful.
  
   -Ewen
  
  
   On Sun, Jul 26, 2015 at 1:40 PM, Gwen Shapira gshap...@cloudera.com
  wrote:
  
   What bugs me is that even with acks = 0, the broker will append to
   local log before responding (unless I'm misreading the code), so I
   don't see why a client with acks = 0 will be any faster. Unless the
   client chooses to not wait for response, which is orthogonal to acks
   parameter.
  
   On Mon, Jul 20, 2015 at 8:52 AM, Jay Kreps j...@confluent.io wrote:
acks=0 is a one-way send, the client doesn't need to wait on the
   response.
Whether this is useful sort of depends on the client implementation.
  The
new java producer does all sends async so waiting on a response
  isn't
really a thing. For a client that lacks this, though, as some of
 them
  do,
acks=0 will be a lot faster.
   
It also makes some sense in terms of what is completed when the
  request
   is
considered satisfied
  acks = 0 - message is written to the network (buffer)
  acks = 1 - message is written to the leader log
  acks = -1 - message is committed
   
-Jay
   
On Sat, Jul 18, 2015 at 10:50 PM, Gwen Shapira 
 gshap...@cloudera.com
  
wrote:
   
Hi,
   
 I was looking into the difference between acks = 0 and acks = 1 in
 the
new producer, and was a bit surprised at what I found.
   
Basically, if I understand correctly, the only difference is that
  with
acks = 0, if the leader fails to append locally, it closes the
  network
connection silently and with acks = 1, it sends an actual error
message.
   
Which seems to mean that with acks = 0, any failed produce will
 lead
to metadata refresh and a retry (because network error), while
 acks =
1 will lead to either retries or abort, depending on the error.
   
Not only this doesn't match the documentation, it doesn't even make
much sense...
acks = 0 was supposed to somehow makes things less safe but
faster, and it doesn't seem to be doing that any more. I'm not
 even
sure there's any case where the acks = 0 behavior is desirable.
   
Is it my misunderstanding, or did we somehow screw up the logic
 here?
   
Gwen
   
  
  
  
  
   --
   Thanks,
   Ewen
 



 --
 -Regards,
 Mayuresh R. Gharat
 (862) 250-7125




-- 
-- Guozhang


[jira] [Updated] (KAFKA-2368) Add Copycat standalone CLI

2015-07-27 Thread Ewen Cheslack-Postava (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2368?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ewen Cheslack-Postava updated KAFKA-2368:
-
Component/s: copycat

 Add Copycat standalone CLI
 --

 Key: KAFKA-2368
 URL: https://issues.apache.org/jira/browse/KAFKA-2368
 Project: Kafka
  Issue Type: Sub-task
  Components: copycat
Reporter: Ewen Cheslack-Postava
Assignee: Ewen Cheslack-Postava
 Fix For: 0.8.3


 Define and add the CLI for running Copycat in standalone mode. KAFKA-2366 may 
 add a simple version, but this JIRA should address agreeing on the final 
 interface for the functionality defined in KIP-26, which proposed a basic API 
 but may need further extension.
 Since the standalone mode is not meant to be persistent, this may end up 
 being a pretty straightforward API. However, it should cleanly handle both 
 worker-level configs and multiple connector configs.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-2366) Initial patch for Copycat

2015-07-27 Thread Ewen Cheslack-Postava (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2366?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ewen Cheslack-Postava updated KAFKA-2366:
-
Component/s: copycat

 Initial patch for Copycat
 -

 Key: KAFKA-2366
 URL: https://issues.apache.org/jira/browse/KAFKA-2366
 Project: Kafka
  Issue Type: Sub-task
  Components: copycat
Reporter: Ewen Cheslack-Postava
Assignee: Ewen Cheslack-Postava
 Fix For: 0.8.3


 This covers the initial patch for Copycat. The goal here is to get some 
 baseline code in place, not necessarily the finalized implementation.
 The key thing we'll want here is the connector/task API, which defines how 
 third parties write connectors.
 Beyond that the goal is to have a basically functional standalone Copycat 
 implementation -- enough that we can run and test any connector code with 
 reasonable coverage of functionality; specifically, it's important that core 
 concepts like offset commit and resuming connector tasks function properly. 
 These two things obviously interact, so development of the standalone worker 
 may affect the design of connector APIs.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-2376) Add Copycat metrics

2015-07-27 Thread Ewen Cheslack-Postava (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2376?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ewen Cheslack-Postava updated KAFKA-2376:
-
Component/s: copycat

 Add Copycat metrics
 ---

 Key: KAFKA-2376
 URL: https://issues.apache.org/jira/browse/KAFKA-2376
 Project: Kafka
  Issue Type: Sub-task
  Components: copycat
Reporter: Ewen Cheslack-Postava
 Fix For: 0.8.3


 Copycat needs good metrics for monitoring since that will be the primary 
 insight into the health of connectors as they copy data.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-2373) Copycat distributed offset storage

2015-07-27 Thread Ewen Cheslack-Postava (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2373?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ewen Cheslack-Postava updated KAFKA-2373:
-
Component/s: copycat

 Copycat distributed offset storage
 --

 Key: KAFKA-2373
 URL: https://issues.apache.org/jira/browse/KAFKA-2373
 Project: Kafka
  Issue Type: Sub-task
  Components: copycat
Reporter: Ewen Cheslack-Postava
Assignee: Ewen Cheslack-Postava
 Fix For: 0.8.3


 Add offset storage for Copycat that works in distributed mode, which likely 
 means storing the data in a Kafka topic. Copycat workers will use this by 
 default.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: New Producer and acks configuration

2015-07-27 Thread Mayuresh Gharat
So basically this means that with acks = 0, there is no guarantee that the
message has been received by the Kafka broker. I am just wondering why
anyone would be using acks = 0, since anyone using Kafka and calling
producer.send() would want their message to get to the Kafka brokers. Also,
as Jay said, with the new producer in async mode, clients will not have to
wait for the response since it will be handled in callbacks. So the use of
acks = 0 sounds very rare to me and I am not able to think of a use case
around it.

Thanks,

Mayuresh

On Sun, Jul 26, 2015 at 2:40 PM, Gwen Shapira gshap...@cloudera.com wrote:

 Aha! Yes, I was missing the part with the dummy response.
 Thank you!

 Gwen


 On Sun, Jul 26, 2015 at 2:17 PM, Ewen Cheslack-Postava
 e...@confluent.io wrote:
  It's different because it changes whether the client waits for the
 response
  from the broker at all. Take a look at
 NetworkClient.handleCompletedSends,
  which fills in dummy responses when a response is not expected (and that
  flag gets set via Sender.produceRequest using acks != 0 as a flag to
   ClientRequest). This means that the producer will invoke the callback and
  resolve the future as soon as the request hits the TCP buffer on the
  client. At that point, the behavior of the broker wrt generating a
 response
  doesn't matter -- the client isn't waiting on that response anyway.
 
  This definitely is faster since you aren't waiting for the round trip,
 but
  it seems like it is of questionable value with the new producer as Jay
  explained. It is slightly better than just assuming records have been
 sent
   as soon as you call Producer.send(), in that this shouldn't trigger a callback
  until the records have made it through the internal KafkaProducer
  buffering. But since it still has to make it through the TCP buffers it
  doesn't really guarantee anything that useful.
 
  -Ewen
 
 
  On Sun, Jul 26, 2015 at 1:40 PM, Gwen Shapira gshap...@cloudera.com
 wrote:
 
  What bugs me is that even with acks = 0, the broker will append to
  local log before responding (unless I'm misreading the code), so I
  don't see why a client with acks = 0 will be any faster. Unless the
  client chooses to not wait for response, which is orthogonal to acks
  parameter.
 
  On Mon, Jul 20, 2015 at 8:52 AM, Jay Kreps j...@confluent.io wrote:
   acks=0 is a one-way send, the client doesn't need to wait on the
  response.
   Whether this is useful sort of depends on the client implementation.
 The
   new java producer does all sends async so waiting on a response
 isn't
   really a thing. For a client that lacks this, though, as some of them
 do,
   acks=0 will be a lot faster.
  
   It also makes some sense in terms of what is completed when the
 request
  is
   considered satisfied
 acks = 0 - message is written to the network (buffer)
 acks = 1 - message is written to the leader log
 acks = -1 - message is committed
  
   -Jay
  
   On Sat, Jul 18, 2015 at 10:50 PM, Gwen Shapira gshap...@cloudera.com
 
   wrote:
  
   Hi,
  
    I was looking into the difference between acks = 0 and acks = 1 in the
   new producer, and was a bit surprised at what I found.
  
   Basically, if I understand correctly, the only difference is that
 with
   acks = 0, if the leader fails to append locally, it closes the
 network
   connection silently and with acks = 1, it sends an actual error
   message.
  
   Which seems to mean that with acks = 0, any failed produce will lead
   to metadata refresh and a retry (because network error), while acks =
   1 will lead to either retries or abort, depending on the error.
  
   Not only this doesn't match the documentation, it doesn't even make
   much sense...
   acks = 0 was supposed to somehow makes things less safe but
   faster, and it doesn't seem to be doing that any more. I'm not even
   sure there's any case where the acks = 0 behavior is desirable.
  
   Is it my misunderstanding, or did we somehow screw up the logic here?
  
   Gwen
  
 
 
 
 
  --
  Thanks,
  Ewen




-- 
-Regards,
Mayuresh R. Gharat
(862) 250-7125


Re: New Producer and acks configuration

2015-07-27 Thread Gwen Shapira
Yeah, using acks=0 should result in higher throughput since we are not
limited by the roundtrip time to the broker.

Btw, regarding in-flight requests: with acks = 1 (or -1), can we send
a message batch to a partition before the broker has acked a previous
request? Doesn't it risk getting messages out of order?
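
On the ordering point: with retries enabled and more than one in-flight request per
connection, a failed and retried batch can indeed land behind a later one; the usual
mitigation is to cap in-flight requests at one. A sketch of the relevant producer
settings follows (values are illustrative):

    import java.util.Properties;

    // Producer settings relevant to the ordering question; values are illustrative.
    public class OrderingSensitiveProducerConfig {
        static Properties props() {
            Properties p = new Properties();
            p.put("bootstrap.servers", "localhost:9092");
            p.put("acks", "all");                                // wait for the full ISR
            p.put("retries", "3");                               // retries are what can reorder batches...
            p.put("max.in.flight.requests.per.connection", "1"); // ...unless only one request is in flight
            p.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            p.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            return p;
        }
    }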

On Mon, Jul 27, 2015 at 9:41 AM, Guozhang Wang wangg...@gmail.com wrote:
 I think there is still a subtle difference between async with acks = 0
 and async with callback: when the max in-flight requests limit has been
 reached, subsequent requests cannot be sent until previous responses are
 returned (which could happen, for example, when the broker is slow or a
 network issue occurs) in the second case but not in the first.

 Given this difference, I feel there are still scenarios, though probably
 rare, where users would like to use acks = 0 even with the new producer's
 callbacks.

 Guozhang

 On Mon, Jul 27, 2015 at 9:25 AM, Mayuresh Gharat gharatmayures...@gmail.com
 wrote:

 So basically this means that with acks = 0, there is no guarantee that the
 message has been received by the Kafka broker. I am just wondering why
 anyone would be using acks = 0, since anyone using Kafka and calling
 producer.send() would want their message to get to the Kafka brokers. Also,
 as Jay said, with the new producer in async mode, clients will not have to
 wait for the response since it will be handled in callbacks. So the use of
 acks = 0 sounds very rare to me and I am not able to think of a use case
 around it.

 Thanks,

 Mayuresh

 On Sun, Jul 26, 2015 at 2:40 PM, Gwen Shapira gshap...@cloudera.com
 wrote:

  Aha! Yes, I was missing the part with the dummy response.
  Thank you!
 
  Gwen
 
 
  On Sun, Jul 26, 2015 at 2:17 PM, Ewen Cheslack-Postava
  e...@confluent.io wrote:
   It's different because it changes whether the client waits for the
  response
   from the broker at all. Take a look at
  NetworkClient.handleCompletedSends,
   which fills in dummy responses when a response is not expected (and
 that
   flag gets set via Sender.produceRequest using acks != 0 as a flag to
    ClientRequest). This means that the producer will invoke the callback and
    resolve the future as soon as the request hits the TCP buffer on the
   client. At that point, the behavior of the broker wrt generating a
  response
   doesn't matter -- the client isn't waiting on that response anyway.
  
   This definitely is faster since you aren't waiting for the round trip,
  but
   it seems like it is of questionable value with the new producer as Jay
   explained. It is slightly better than just assuming records have been
  sent
    as soon as you call Producer.send(), in that this shouldn't trigger a
 callback
   until the records have made it through the internal KafkaProducer
   buffering. But since it still has to make it through the TCP buffers it
   doesn't really guarantee anything that useful.
  
   -Ewen
  
  
   On Sun, Jul 26, 2015 at 1:40 PM, Gwen Shapira gshap...@cloudera.com
  wrote:
  
   What bugs me is that even with acks = 0, the broker will append to
   local log before responding (unless I'm misreading the code), so I
   don't see why a client with acks = 0 will be any faster. Unless the
   client chooses to not wait for response, which is orthogonal to acks
   parameter.
  
   On Mon, Jul 20, 2015 at 8:52 AM, Jay Kreps j...@confluent.io wrote:
acks=0 is a one-way send, the client doesn't need to wait on the
   response.
Whether this is useful sort of depends on the client implementation.
  The
new java producer does all sends async so waiting on a response
  isn't
really a thing. For a client that lacks this, though, as some of
 them
  do,
acks=0 will be a lot faster.
   
It also makes some sense in terms of what is completed when the
  request
   is
considered satisfied
  acks = 0 - message is written to the network (buffer)
  acks = 1 - message is written to the leader log
  acks = -1 - message is committed
   
-Jay
   
On Sat, Jul 18, 2015 at 10:50 PM, Gwen Shapira 
 gshap...@cloudera.com
  
wrote:
   
Hi,
   
 I was looking into the difference between acks = 0 and acks = 1 in
 the
new producer, and was a bit surprised at what I found.
   
Basically, if I understand correctly, the only difference is that
  with
acks = 0, if the leader fails to append locally, it closes the
  network
connection silently and with acks = 1, it sends an actual error
message.
   
Which seems to mean that with acks = 0, any failed produce will
 lead
to metadata refresh and a retry (because network error), while
 acks =
1 will lead to either retries or abort, depending on the error.
   
Not only this doesn't match the documentation, it doesn't even make
much sense...
 acks = 0 was supposed to somehow make things less safe but
faster, and it doesn't seem to be doing that any more. I'm not
 even
 sure there's any case where the acks = 0 behavior is desirable.

[jira] [Updated] (KAFKA-2350) Add KafkaConsumer pause capability

2015-07-27 Thread Jason Gustafson (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2350?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson updated KAFKA-2350:
---
Description: 
There are some use cases in stream processing where it is helpful to be able to 
pause consumption of a topic. For example, when joining two topics, you may 
need to delay processing of one topic while you wait for the consumer of the 
other topic to catch up. The new consumer currently doesn't provide a nice way 
to do this. If you skip calls to poll() or if you unsubscribe, then a rebalance 
will be triggered and your partitions will be reassigned to another consumer. 
The desired behavior is instead that you keep the partition assigned and simply 
pause fetching until you are ready to resume.

One way to achieve this would be to add two new methods to KafkaConsumer:

{code}
void pause(TopicPartition... partitions);
void resume(TopicPartition... partitions);
{code}

Here is the expected behavior of pause/resume:

* When a partition is paused, calls to KafkaConsumer.poll will not initiate any 
new fetches for that partition.
* After the partition is resumed, fetches will begin again. 
* While a partition is paused, seek() and position() can still be used to 
advance or query the current position.
* Rebalance does not preserve pause/resume state.



  was:
There are some use cases in stream processing where it is helpful to be able to 
pause consumption of a topic. For example, when joining two topics, you may 
need to delay processing of one topic while you wait for the consumer of the 
other topic to catch up. The new consumer currently doesn't provide a nice way 
to do this. If you skip poll() or if you unsubscribe, then a rebalance will be 
triggered and your partitions will be reassigned.

One way to achieve this would be to add two new methods to KafkaConsumer:

{code}
void pause(String... topics);
void unpause(String... topics);
{code}

When a topic is paused, a call to KafkaConsumer.poll will not initiate any new 
fetches for that topic. After it is unpaused, fetches will begin again.


 Add KafkaConsumer pause capability
 --

 Key: KAFKA-2350
 URL: https://issues.apache.org/jira/browse/KAFKA-2350
 Project: Kafka
  Issue Type: Improvement
Reporter: Jason Gustafson
Assignee: Jason Gustafson

 There are some use cases in stream processing where it is helpful to be able 
 to pause consumption of a topic. For example, when joining two topics, you 
 may need to delay processing of one topic while you wait for the consumer of 
 the other topic to catch up. The new consumer currently doesn't provide a 
 nice way to do this. If you skip calls to poll() or if you unsubscribe, then 
 a rebalance will be triggered and your partitions will be reassigned to 
 another consumer. The desired behavior is instead that you keep the partition 
  assigned and simply pause fetching until you are ready to resume. 
 One way to achieve this would be to add two new methods to KafkaConsumer:
 {code}
 void pause(TopicPartition... partitions);
 void resume(TopicPartition... partitions);
 {code}
 Here is the expected behavior of pause/resume:
 * When a partition is paused, calls to KafkaConsumer.poll will not initiate 
 any new fetches for that partition.
 * After the partition is resumed, fetches will begin again. 
 * While a partition is paused, seek() and position() can still be used to 
 advance or query the current position.
 * Rebalance does not preserve pause/resume state.
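 A hypothetical usage sketch of the proposed methods, just to make the join 
 scenario above concrete (pause/resume do not exist yet, and the topic names 
 and the slowSideIsBehind() helper below are made up):
 {code}
 // "clicks" is the fast side of the join, "profiles" the slow side (hypothetical topics)
 TopicPartition fastSide = new TopicPartition("clicks", 0);

 consumer.pause(fastSide);          // proposed API: poll() stops fetching this partition
 while (slowSideIsBehind()) {       // hypothetical application-level check
     consumer.poll(100);            // keeps group membership, still fetches other partitions
 }
 consumer.resume(fastSide);         // proposed API: fetching resumes where it left off
 {code}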



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-2365) Copycat checklist

2015-07-27 Thread Neha Narkhede (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2365?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14643087#comment-14643087
 ] 

Neha Narkhede commented on KAFKA-2365:
--

Worth discussing a process for including a connector in Kafka core, but I think 
we went through this in the KIP discussion and here is what I think. To keep 
packaging, review and code management easier, it is better to just include a 
couple of lightweight connectors, enough to show the usage of the Copycat APIs 
(file in/file out). Any connector that requires depending on an external system 
(HDFS, Elasticsearch) should really live elsewhere. We should also delete the 
ones under contrib, they never ended up getting supported by the community. 

Since there will always be connectors that need to live outside Kafka, I think 
we should instead focus on how to make tooling easier for discovering and using 
these federated connectors. 

 Copycat checklist
 -

 Key: KAFKA-2365
 URL: https://issues.apache.org/jira/browse/KAFKA-2365
 Project: Kafka
  Issue Type: New Feature
  Components: copycat
Reporter: Ewen Cheslack-Postava
Assignee: Ewen Cheslack-Postava
  Labels: feature
 Fix For: 0.8.3


 This covers the development plan for 
 [KIP-26|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=58851767].
  There are a number of features that can be developed in sequence to make 
 incremental progress, and often in parallel:
 * Initial patch - connector API and core implementation
 * Runtime data API
 * Standalone CLI
 * REST API
 * Distributed copycat - CLI
 * Distributed copycat - coordinator
 * Distributed copycat - config storage
 * Distributed copycat - offset storage
 * Log/file connector (sample source/sink connector)
 * Elasticsearch sink connector (sample sink connector for a full log -> Kafka 
 -> Elasticsearch sample pipeline)
 * Copycat metrics
 * System tests (including connector tests)
 * Mirrormaker connector
 * Copycat documentation
 This is an initial list, but it might need refinement to allow for more 
 incremental progress and may be missing features we find we want before the 
 initial release.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-2349) `contributing` website page should link to Contributing Code Changes wiki page

2015-07-27 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2349?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-2349:
-
   Resolution: Fixed
Fix Version/s: 0.8.3
   Status: Resolved  (was: Patch Available)

 `contributing` website page should link to Contributing Code Changes wiki 
 page
 

 Key: KAFKA-2349
 URL: https://issues.apache.org/jira/browse/KAFKA-2349
 Project: Kafka
  Issue Type: Task
Reporter: Ismael Juma
Assignee: Ismael Juma
 Fix For: 0.8.3

 Attachments: KAFKA-2349.patch


 This should be merged at the same time as 
 https://issues.apache.org/jira/browse/KAFKA-2321 and only after a vote takes 
 place in the mailing list.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-2365) Copycat checklist

2015-07-27 Thread Gwen Shapira (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2365?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14643014#comment-14643014
 ] 

Gwen Shapira commented on KAFKA-2365:
-

I added a component, added you as component lead and made sure all new CopyCat 
jiras will automatically be assigned to you.

If you object to the auto-assignment, let me know - I figured that at this 
stage it will save you a click of assigning everything to you... 

 Copycat checklist
 -

 Key: KAFKA-2365
 URL: https://issues.apache.org/jira/browse/KAFKA-2365
 Project: Kafka
  Issue Type: New Feature
Reporter: Ewen Cheslack-Postava
Assignee: Ewen Cheslack-Postava
  Labels: feature
 Fix For: 0.8.3


 This covers the development plan for 
 [KIP-26|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=58851767].
  There are a number of features that can be developed in sequence to make 
 incremental progress, and often in parallel:
 * Initial patch - connector API and core implementation
 * Runtime data API
 * Standalone CLI
 * REST API
 * Distributed copycat - CLI
 * Distributed copycat - coordinator
 * Distributed copycat - config storage
 * Distributed copycat - offset storage
 * Log/file connector (sample source/sink connector)
 * Elasticsearch sink connector (sample sink connector for a full log -> Kafka 
 -> Elasticsearch sample pipeline)
 * Copycat metrics
 * System tests (including connector tests)
 * Mirrormaker connector
 * Copycat documentation
 This is an initial list, but it might need refinement to allow for more 
 incremental progress and may be missing features we find we want before the 
 initial release.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-2371) Add distributed coordinator implementation for Copycat

2015-07-27 Thread Ewen Cheslack-Postava (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2371?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ewen Cheslack-Postava updated KAFKA-2371:
-
Component/s: copycat

 Add distributed coordinator implementation for Copycat
 --

 Key: KAFKA-2371
 URL: https://issues.apache.org/jira/browse/KAFKA-2371
 Project: Kafka
  Issue Type: Sub-task
  Components: copycat
Reporter: Ewen Cheslack-Postava
Assignee: Ewen Cheslack-Postava
 Fix For: 0.8.3


 Copycat needs a Coordinator implementation that handles multiple Workers and 
 automatically manages the distribution of connectors and tasks across them. To 
 start, this implementation should handle any connectors that have been 
 registered via either a CLI or REST interface for starting/stopping 
 connectors.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-2375) Implement elasticsearch Copycat sink connector

2015-07-27 Thread Ewen Cheslack-Postava (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2375?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ewen Cheslack-Postava updated KAFKA-2375:
-
Component/s: copycat

 Implement elasticsearch Copycat sink connector
 --

 Key: KAFKA-2375
 URL: https://issues.apache.org/jira/browse/KAFKA-2375
 Project: Kafka
  Issue Type: Sub-task
  Components: copycat
Reporter: Ewen Cheslack-Postava
 Fix For: 0.8.3


 Implement an elasticsearch sink connector for Copycat. This should send 
 records to elasticsearch with unique document IDs, given appropriate configs 
 to extract IDs from input records.
 The motivation here is to provide a good end-to-end example with built-in 
 connectors that require minimal dependencies. Because Elasticsearch has a 
 very simple REST API, an elasticsearch connector shouldn't require any extra 
 dependencies, and a logs -> Elasticsearch pipeline (in combination with 
 KAFKA-2374) provides a compelling out-of-the-box Copycat use case.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-2369) Add Copycat REST API

2015-07-27 Thread Ewen Cheslack-Postava (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2369?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ewen Cheslack-Postava updated KAFKA-2369:
-
Component/s: copycat

 Add Copycat REST API
 

 Key: KAFKA-2369
 URL: https://issues.apache.org/jira/browse/KAFKA-2369
 Project: Kafka
  Issue Type: Sub-task
  Components: copycat
Reporter: Ewen Cheslack-Postava
Assignee: Ewen Cheslack-Postava
 Fix For: 0.8.3


 Add a REST API for Copycat. At a minimum, for a single worker this should 
 support:
 * add/remove connector
 * connector status
 * task status
 * worker status
 In distributed mode this should handle forwarding if necessary, but it may 
 make sense to defer the distributed support to a later JIRA.
 This will require the addition of new dependencies to support implementing 
 the REST API.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-2374) Implement Copycat log/file connector

2015-07-27 Thread Ewen Cheslack-Postava (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2374?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ewen Cheslack-Postava updated KAFKA-2374:
-
Component/s: copycat

 Implement Copycat log/file connector
 

 Key: KAFKA-2374
 URL: https://issues.apache.org/jira/browse/KAFKA-2374
 Project: Kafka
  Issue Type: Sub-task
  Components: copycat
Reporter: Ewen Cheslack-Postava
Assignee: Ewen Cheslack-Postava
 Fix For: 0.8.3


 This is a good baseline connector that has zero dependencies and works well 
 as both a demonstration and a practical use case for standalone mode.
 Two key features it should ideally support: multiple files and 
 rolling log files.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-2370) Add pause/unpause connector support

2015-07-27 Thread Ewen Cheslack-Postava (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2370?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ewen Cheslack-Postava updated KAFKA-2370:
-
Component/s: copycat

 Add pause/unpause connector support
 ---

 Key: KAFKA-2370
 URL: https://issues.apache.org/jira/browse/KAFKA-2370
 Project: Kafka
  Issue Type: Sub-task
  Components: copycat
Reporter: Ewen Cheslack-Postava
Assignee: Ewen Cheslack-Postava
 Fix For: 0.8.3


 It will sometimes be useful to pause/unpause connectors. For example, if you 
 know planned maintenance will occur on the source/destination system, it 
 would make sense to pause and then resume a connector (rather than delete and 
 then restore it).
 This likely requires support in all Coordinator implementations 
 (standalone/distributed) to trigger the events.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-2379) Add Copycat documentation

2015-07-27 Thread Ewen Cheslack-Postava (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2379?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ewen Cheslack-Postava updated KAFKA-2379:
-
Component/s: copycat

 Add Copycat documentation
 -

 Key: KAFKA-2379
 URL: https://issues.apache.org/jira/browse/KAFKA-2379
 Project: Kafka
  Issue Type: Sub-task
  Components: copycat
Reporter: Ewen Cheslack-Postava
Assignee: Ewen Cheslack-Postava
 Fix For: 0.8.3


 Starting this out pretty broad as it can cover a lot. Some ideas:
 * Normal intro/readme type stuff
 * User guide - how to run in standalone/distributed mode. Connector/tasks 
 concepts and what they mean in practice. Fault tolerance & offsets. REST 
 interface, Copycat as a service, etc.
 * Dev guide - connectors/partitions/records/offsets/tasks. All the APIs, 
 specific examples for implementing APIs, resuming from previous offsets, 
 dynamic sets of partitions, how to work with the runtime data API, etc.
 * System design - KIP-26 + more - why we ended up on the design we did, 
 comparisons to other systems w/ low level details, 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-2378) Add Copycat embedded API

2015-07-27 Thread Ewen Cheslack-Postava (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2378?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ewen Cheslack-Postava updated KAFKA-2378:
-
Component/s: copycat

 Add Copycat embedded API
 

 Key: KAFKA-2378
 URL: https://issues.apache.org/jira/browse/KAFKA-2378
 Project: Kafka
  Issue Type: Sub-task
  Components: copycat
Reporter: Ewen Cheslack-Postava
 Fix For: 0.8.3


 Much of the required Copycat API will exist from previous patches since any 
 main() method will need to do very similar operations. However, integrating 
 with any other Java code may require additional API support.
 For example, when integrating with a stream processing application, one use 
 case will require knowing which topics will be written to. We will 
 need to add APIs to expose the topics a registered connector is writing to so 
 they can be consumed by a stream processing task.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-2377) Add copycat system tests

2015-07-27 Thread Ewen Cheslack-Postava (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2377?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ewen Cheslack-Postava updated KAFKA-2377:
-
Component/s: copycat

 Add copycat system tests
 

 Key: KAFKA-2377
 URL: https://issues.apache.org/jira/browse/KAFKA-2377
 Project: Kafka
  Issue Type: Sub-task
  Components: copycat
Reporter: Ewen Cheslack-Postava
 Fix For: 0.8.3


 Add baseline system tests for Copycat, covering both standalone and 
 distributed mode.
 This should cover basic failure modes and verify at-least-once delivery of 
 data, both from source system -> Kafka and from Kafka -> sink system. This, of 
 course, requires testing the core, built-in connectors provided with Copycat.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-2372) Copycat distributed config storage

2015-07-27 Thread Ewen Cheslack-Postava (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2372?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ewen Cheslack-Postava updated KAFKA-2372:
-
Component/s: copycat

 Copycat distributed config storage
 --

 Key: KAFKA-2372
 URL: https://issues.apache.org/jira/browse/KAFKA-2372
 Project: Kafka
  Issue Type: Sub-task
  Components: copycat
Reporter: Ewen Cheslack-Postava
Assignee: Ewen Cheslack-Postava
 Fix For: 0.8.3


 Add a config storage mechanism to Copycat that works in distributed mode. 
 Copycat workers that start in distributed mode should use this implementation 
 by default.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] kafka pull request: KAFKA-2321; Introduce CONTRIBUTING.md

2015-07-27 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/97


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Updated] (KAFKA-2365) Copycat checklist

2015-07-27 Thread Ewen Cheslack-Postava (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2365?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ewen Cheslack-Postava updated KAFKA-2365:
-
Component/s: copycat

 Copycat checklist
 -

 Key: KAFKA-2365
 URL: https://issues.apache.org/jira/browse/KAFKA-2365
 Project: Kafka
  Issue Type: New Feature
  Components: copycat
Reporter: Ewen Cheslack-Postava
Assignee: Ewen Cheslack-Postava
  Labels: feature
 Fix For: 0.8.3


 This covers the development plan for 
 [KIP-26|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=58851767].
  There are a number of features that can be developed in sequence to make 
 incremental progress, and often in parallel:
 * Initial patch - connector API and core implementation
 * Runtime data API
 * Standalone CLI
 * REST API
 * Distributed copycat - CLI
 * Distributed copycat - coordinator
 * Distributed copycat - config storage
 * Distributed copycat - offset storage
 * Log/file connector (sample source/sink connector)
 * Elasticsearch sink connector (sample sink connector for a full log -> Kafka 
 -> Elasticsearch sample pipeline)
 * Copycat metrics
 * System tests (including connector tests)
 * Mirrormaker connector
 * Copycat documentation
 This is an initial list, but it might need refinement to allow for more 
 incremental progress and may be missing features we find we want before the 
 initial release.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-2367) Add Copycat runtime data API

2015-07-27 Thread Ewen Cheslack-Postava (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2367?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ewen Cheslack-Postava updated KAFKA-2367:
-
Component/s: copycat

 Add Copycat runtime data API
 

 Key: KAFKA-2367
 URL: https://issues.apache.org/jira/browse/KAFKA-2367
 Project: Kafka
  Issue Type: Sub-task
  Components: copycat
Reporter: Ewen Cheslack-Postava
Assignee: Ewen Cheslack-Postava
 Fix For: 0.8.3


 Design the API used for runtime data in Copycat. This API is used to 
 construct schemas and records that Copycat processes. This needs to be a 
 fairly general data model (think Avro, JSON, Protobufs, Thrift) in order to 
 support complex, varied data types that may be input from/output to many data 
 systems.
 This issue should also address the serialization interfaces used 
 within Copycat, which translate the runtime data into serialized byte[] form. 
 It is important that these be considered together because the data format can 
 be used in multiple ways (records, partition IDs, partition offsets), so it 
 and the corresponding serializers must be sufficient for all these use cases.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-2260) Allow specifying expected offset on produce

2015-07-27 Thread Mayuresh Gharat (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2260?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14643065#comment-14643065
 ] 

Mayuresh Gharat commented on KAFKA-2260:


I think, when 2 producers are trying to produce concurrently, the broker log 
will be appended in a specific order, which means the corresponding offsets in 
the respective sub-partitions will also be incremented in that order and the 
client should be able to figure out from ProduceResponse what is the next 
offset it needs to send the data to. This is what I understand from the above 
discussion. 

 Allow specifying expected offset on produce
 ---

 Key: KAFKA-2260
 URL: https://issues.apache.org/jira/browse/KAFKA-2260
 Project: Kafka
  Issue Type: Improvement
Reporter: Ben Kirwin
Assignee: Ewen Cheslack-Postava
Priority: Minor
 Attachments: expected-offsets.patch


 I'd like to propose a change that adds a simple CAS-like mechanism to the 
 Kafka producer. This update has a small footprint, but enables a bunch of 
 interesting uses in stream processing or as a commit log for process state.
 h4. Proposed Change
 In short:
 - Allow the user to attach a specific offset to each message produced.
 - The server assigns offsets to messages in the usual way. However, if the 
 expected offset doesn't match the actual offset, the server should fail the 
 produce request instead of completing the write.
 This is a form of optimistic concurrency control, like the ubiquitous 
 check-and-set -- but instead of checking the current value of some state, it 
 checks the current offset of the log.
 h4. Motivation
 Much like check-and-set, this feature is only useful when there's very low 
 contention. Happily, when Kafka is used as a commit log or as a 
 stream-processing transport, it's common to have just one producer (or a 
 small number) for a given partition -- and in many of these cases, predicting 
 offsets turns out to be quite useful.
 - We get the same benefits as the 'idempotent producer' proposal: a producer 
 can retry a write indefinitely and be sure that at most one of those attempts 
 will succeed; and if two producers accidentally write to the end of the 
 partition at once, we can be certain that at least one of them will fail.
 - It's possible to 'bulk load' Kafka this way -- you can write a list of n 
 messages consecutively to a partition, even if the list is much larger than 
 the buffer size or the producer has to be restarted.
 - If a process is using Kafka as a commit log -- reading from a partition to 
 bootstrap, then writing any updates to that same partition -- it can be sure 
 that it's seen all of the messages in that partition at the moment it does 
 its first (successful) write.
 There's a bunch of other similar use-cases here, but they all have roughly 
 the same flavour.
 h4. Implementation
 The major advantage of this proposal over other suggested transaction / 
 idempotency mechanisms is its minimality: it gives the 'obvious' meaning to a 
 currently-unused field, adds no new APIs, and requires very little new code 
 or additional work from the server.
 - Produced messages already carry an offset field, which is currently ignored 
 by the server. This field could be used for the 'expected offset', with a 
 sigil value for the current behaviour. (-1 is a natural choice, since it's 
 already used to mean 'next available offset'.)
 - We'd need a new error and error code for a 'CAS failure'.
 - The server assigns offsets to produced messages in 
 {{ByteBufferMessageSet.validateMessagesAndAssignOffsets}}. After this 
 change, this method would assign offsets in the same way -- but if they 
 don't match the offset in the message, we'd return an error instead of 
 completing the write.
 - To avoid breaking existing clients, this behaviour would need to live 
 behind some config flag. (Possibly global, but probably more useful 
 per-topic?)
 I understand all this is unsolicited and possibly strange: happy to answer 
 questions, and if this seems interesting, I'd be glad to flesh this out into 
 a full KIP or patch. (And apologies if this is the wrong venue for this sort 
 of thing!)
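 Purely as illustration of the client-side pattern this enables (the 
 expected-offset send and the mismatch exception below are hypothetical names; 
 nothing like them exists in the producer API today):
 {code}
 // Hypothetical: append a record that must land exactly at expectedOffset.
 long expectedOffset = lastOffsetSeenWhileBootstrapping + 1;   // position observed while reading the log
 while (true) {
     try {
         sendWithExpectedOffset(topic, partition, expectedOffset, key, value);  // hypothetical helper
         break;                                      // CAS succeeded: our record owns expectedOffset
     } catch (ExpectedOffsetMismatchException e) {   // hypothetical error for the proposed error code
         // another producer appended first: re-read the new tail, rebuild state, retry
         expectedOffset = e.currentLogEndOffset();
     }
 }
 {code}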



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Best practices - Using kafka (with http server) as source-of-truth

2015-07-27 Thread Ewen Cheslack-Postava
Hi Prabhjot,

Confluent has a REST proxy with docs that may give some guidance:
http://docs.confluent.io/1.0/kafka-rest/docs/intro.html The new producer
that it uses is very efficient, so you should be able to get pretty good
throughput. You take a bit of a hit due to the overhead of sending data
through a proxy, but with appropriate batching you can get about 2/3 of the
performance you would get using the Java producer directly.

There are also a few other proxies you can find here:
https://cwiki.apache.org/confluence/display/KAFKA/Clients#Clients-HTTPREST

You can also put nginx (or HAProxy, or a variety of other solutions) in
front of REST proxies for load balancing, HA, SSL termination, etc. This is
yet another hop, so it might affect throughput and latency.

-Ewen

On Mon, Jul 27, 2015 at 6:55 AM, Prabhjot Bharaj prabhbha...@gmail.com
wrote:

 Hi Folks,

 I would like to understand the best practices when using kafka as the
 source-of-truth, given the fact that I want to pump in data to Kafka using
 http methods.

 What are the current production configurations for such a use case:-

 1. Kafka-http-client - is it scalable the way Nginx is ??
 2. Using Kafka and Nginx together - If anybody has used this, please
 explain
 3. Any other scalable method ?

 Regards,
 prabcs




-- 
Thanks,
Ewen


Re: New Producer and acks configuration

2015-07-27 Thread Ewen Cheslack-Postava
If only we had some sort of system test framework with a producer
performance test that we could parameterize with the different acks
settings to validate these performance differences...

wrt out of order: yes, with > 1 in-flight requests with retries, messages
can get out of order. Becket had a great presentation addressing that and a
bunch of other issues with no data loss pipelines:
http://www.slideshare.net/JiangjieQin/no-data-loss-pipeline-with-apache-kafka-49753844
Short version: as things are today, you have to *really* understand the
producer settings, and some producer internals, to get the exact behavior
you want.
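
(A sketch of the settings this trade-off hinges on; the keys are the standard
new-producer configs and the broker address is a placeholder.)

    import java.util.Properties;

    Properties props = new Properties();
    props.put("bootstrap.servers", "broker1:9092");   // placeholder
    props.put("acks", "1");                           // or "0" / "-1", per the thread above
    props.put("retries", "3");                        // failed batches get re-sent
    // With more than one in-flight request per connection, a retried batch can land
    // after a later batch that already succeeded, i.e. messages end up out of order.
    // Pinning this to 1 preserves ordering at the cost of pipelining.
    props.put("max.in.flight.requests.per.connection", "1");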


On Mon, Jul 27, 2015 at 9:44 AM, Gwen Shapira gshap...@cloudera.com wrote:

 Yeah, using acks=0 should result in higher throughput since we are not
 limited by the roundtrip time to the broker.

 Btw. regarding in-flight requests: With acks = 1 (or -1), can we send
 a message batch to a partition before the brokers acked a previous
 request? Doesn't it risk getting messages out of order?

 On Mon, Jul 27, 2015 at 9:41 AM, Guozhang Wang wangg...@gmail.com wrote:
  I think there is still a subtle difference between async with acks = 0
  and async with callback, that when the #.max-inflight-requests has
  reached the subsequent requests cannot be sent until previous responses
 are
  returned (which could happen, for example, when the broker is slow /
  network issue happens) in the second case but not in the first.
 
  Given this difference, I feel there are still scenarios, though probably
  rare, that users would like to use acks = 0 even with new producer's
  callbacks.
 
  Guozhang
 
  On Mon, Jul 27, 2015 at 9:25 AM, Mayuresh Gharat 
 gharatmayures...@gmail.com
  wrote:
 
  So basically this means that with acks = 0, there is no guarantee that the
  message has been received by the Kafka broker. I am just wondering why
  anyone would be using acks = 0, since anyone using Kafka and calling
  producer.send() would want their message to get to the Kafka brokers. Also,
  as Jay said, with the new producer in async mode, clients will not have to
  wait for the response since it will be handled in callbacks. So the use of
  acks = 0 sounds very rare to me and I am not able to think of a use case
  around it.
 
  Thanks,
 
  Mayuresh
 
  On Sun, Jul 26, 2015 at 2:40 PM, Gwen Shapira gshap...@cloudera.com
  wrote:
 
   Aha! Yes, I was missing the part with the dummy response.
   Thank you!
  
   Gwen
  
  
   On Sun, Jul 26, 2015 at 2:17 PM, Ewen Cheslack-Postava
   e...@confluent.io wrote:
It's different because it changes whether the client waits for the
   response
from the broker at all. Take a look at
   NetworkClient.handleCompletedSends,
which fills in dummy responses when a response is not expected (and
  that
flag gets set via Sender.produceRequest using acks != 0 as a flag to
ClientRequest). This means that the producer will invoke the callback and
resolve the future as soon as the request hits the TCP buffer on the
client. At that point, the behavior of the broker wrt generating a
   response
doesn't matter -- the client isn't waiting on that response anyway.
   
This definitely is faster since you aren't waiting for the round
 trip,
   but
it seems like it is of questionable value with the new producer as
 Jay
explained. It is slightly better than just assuming records have
 been
   sent
as soon as you call Producer.send(), in that this shouldn't trigger a
  callback
until the records have made it through the internal KafkaProducer
buffering. But since it still has to make it through the TCP
 buffers it
doesn't really guarantee anything that useful.
   
-Ewen
   
   
On Sun, Jul 26, 2015 at 1:40 PM, Gwen Shapira 
 gshap...@cloudera.com
   wrote:
   
What bugs me is that even with acks = 0, the broker will append to
local log before responding (unless I'm misreading the code), so I
don't see why a client with acks = 0 will be any faster. Unless the
client chooses to not wait for response, which is orthogonal to
 acks
parameter.
   
On Mon, Jul 20, 2015 at 8:52 AM, Jay Kreps j...@confluent.io
 wrote:
 acks=0 is a one-way send, the client doesn't need to wait on the
response.
 Whether this is useful sort of depends on the client
 implementation.
   The
 new java producer does all sends async so waiting on a response
   isn't
 really a thing. For a client that lacks this, though, as some of
  them
   do,
 acks=0 will be a lot faster.

 It also makes some sense in terms of what is completed when the
   request
is
 considered satisfied
   acks = 0 - message is written to the network (buffer)
   acks = 1 - message is written to the leader log
   acks = -1 - message is committed

 -Jay

 On Sat, Jul 18, 2015 at 10:50 PM, Gwen Shapira 
  gshap...@cloudera.com
   
 wrote:

 Hi,

 I was looking into the difference between 

[jira] [Commented] (KAFKA-2303) Fix for KAFKA-2235 LogCleaner offset map overflow causes another compaction failures

2015-07-27 Thread Alexander Demidko (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2303?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14643136#comment-14643136
 ] 

Alexander Demidko commented on KAFKA-2303:
--

I think in our case we had too many unique keys per partition, so making 
compactions happen more frequently will not ultimately solve the issue.
Increasing the number of partitions should help, but it requires more careful 
planning around the compacted topic's overall data volume.

 Fix for KAFKA-2235 LogCleaner offset map overflow causes another compaction 
 failures
 

 Key: KAFKA-2303
 URL: https://issues.apache.org/jira/browse/KAFKA-2303
 Project: Kafka
  Issue Type: Bug
  Components: core, log
Affects Versions: 0.8.2.1
Reporter: Alexander Demidko
Assignee: Jay Kreps
 Fix For: 0.8.3


 We have rolled out the patch for KAFKA-2235 to our kafka cluster, and 
 recently instead of 
 {code}
 kafka.log.LogCleaner - [kafka-log-cleaner-thread-0], Error due to
 java.lang.IllegalArgumentException: requirement failed: Attempt to add a new 
 entry to a full offset map. 
 {code}
 we started to see 
 {code}
 kafka.log.LogCleaner - [kafka-log-cleaner-thread-0], Error due to
 java.lang.IllegalArgumentException: requirement failed: 131390902 messages in 
 segment topic-name-cgstate-8/79840768.log but offset map can 
 fit only 80530612. You can increase log.cleaner.dedupe.buffer.size or 
 decrease log.cleaner.threads
 {code}
 So, we had to roll it back to avoid disk depletion although I'm not sure if 
 it needs to be rolled back in trunk. This patch applies more strict checks 
 than were in place before: even if there is only one unique key for a 
 segment, cleanup will fail if this segment is too big. 
 Does it make sense to eliminate a limit for the offset map slots count, for 
 example to use an offset map backed by a memory mapped file?
 The limit of 80530612 slots comes from memory / bytesPerEntry, where memory 
 is Int.MaxValue (we use only one cleaner thread) and bytesPerEntry is 8 + 
 digest hash size. Might be wrong, but it seems if the overall number of 
 unique keys per partition is more than 80M slots in an OffsetMap, compaction 
 will always fail and cleaner thread will die. 
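 For reference, a rough back-of-the-envelope check of that limit, assuming the 
 default 16-byte MD5 digest per entry (the exact reported figure also depends on 
 the cleaner's load factor and thread count):
 {code}
 long memory = Integer.MAX_VALUE;          // 2,147,483,647 bytes with a single cleaner thread
 int bytesPerEntry = 8 + 16;               // 8-byte offset + 16-byte MD5 digest
 long rawSlots = memory / bytesPerEntry;   // ~89.5 million entries
 // with a load factor a bit below 1.0 (0.9 by default, if memory serves),
 // this lands close to the ~80.5M slots reported in the error above
 {code}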



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [DISCUSS] Partitioning in Kafka

2015-07-27 Thread Gwen Shapira
If you are used to map-reduce patterns, this sounds like a perfectly
natural way to process streams of data.

Call the first consumer map-combine-log, the topic shuffle-log and
the second consumer reduce-log :)
I like that a lot. It works well for either "embarrassingly parallel"
cases, or "so much data that more parallelism is worth the extra
overhead" cases.

I personally don't care if its in core-Kafka, KIP-28 or a github
project elsewhere, but I find it useful and non-esoteric.



On Mon, Jul 27, 2015 at 12:51 PM, Jason Gustafson ja...@confluent.io wrote:
 For a little background, the difference between this partitioner and the
 default one is that it breaks the deterministic mapping from key to
 partition. Instead, messages for a given key can end up in either of two
 partitions. This means that the consumer generally won't see all messages
 for a given key. Instead the consumer would compute an aggregate for each
 key on the partitions it consumes and write them to a separate topic. For
 example, if you are writing log messages to a logs topic with the
 hostname as the key, you could use this partitioning strategy to compute
 message counts for each host in each partition and write them to a
 log-counts topic. Then a consumer of the log-counts topic would compute
 total aggregates based on the two intermediate aggregates. The benefit is
 that you are generally going to get better load balancing across partitions
 than if you used the default partitioner. (Please correct me if my
 understanding is incorrect, Gianmarco)

 So I think the question is whether this is a useful primitive for Kafka to
 provide out of the box? I was a little concerned that this use case is a
 little esoteric for a core feature, but it may make more sense in the
 context of KIP-28 which would provide some higher-level processing
 capabilities (though it doesn't seem like the KStream abstraction would
 provide a direct way to leverage this partitioner without custom logic).

 Thanks,
 Jason


 On Wed, Jul 22, 2015 at 12:14 AM, Gianmarco De Francisci Morales 
 g...@apache.org wrote:

 Hello folks,

 I'd like to ask the community about its opinion on the partitioning
 functions in Kafka.

 With KAFKA-2091 https://issues.apache.org/jira/browse/KAFKA-2091
 integrated we are now able to have custom partitioners in the producer.
 The question now becomes *which* partitioners should ship with Kafka?
 This issue arose in the context of KAFKA-2092
 https://issues.apache.org/jira/browse/KAFKA-2092, which implements a
 specific load-balanced partitioning. This partitioner however assumes some
 stages of processing on top of it to make proper use of the data, i.e., it
 envisions Kafka as a substrate for stream processing, and not only as the
 I/O component.
 Is this a direction that Kafka wants to go towards? Or is this a role
 better left to the internal communication systems of other stream
 processing engines (e.g., Storm)?
 And if the answer is the latter, how would something such as Samza (which
 relies mostly on Kafka as its communication substrate) be able to implement
 advanced partitioning schemes?

 Cheers,
 --
 Gianmarco



Jenkins build is back to normal : KafkaPreCommit #164

2015-07-27 Thread Apache Jenkins Server
See https://builds.apache.org/job/KafkaPreCommit/164/changes



[jira] [Commented] (KAFKA-2350) Add KafkaConsumer pause capability

2015-07-27 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14643249#comment-14643249
 ] 

Guozhang Wang commented on KAFKA-2350:
--

[~becket_qin], I was not considering the implementation from a developer point 
of view regarding call trace, but rather from a user point of view. That is, 
with two different names it is more clear about the distinction between topic / 
partition subscriptions. For another example, say you write some applications 
with a consumer client embedded, and very likely call its function from many 
places in your app code / classes. When you saw an exception thrown from 
unsubscribe(partition), you need to possibly look at other places and check 
if it is case 1) you used subscribe(topic), but it is not assigned from Kafka, 
2) you used subscribe(partition) on some other partitions, but you did not 
subscribe this before. Similarly, if you saw an exception thrown on your 
subscribe(partition) call, you need to check if 1) you called poll() in 
between so that partition is no longer assigned; 2) you called subscribe(topic) 
before and that partition is not one of the assigned partitions. I.e., you as 
the developer needs to check what consumer function calls it has done (the 
trace) before in order to trouble-shoot, when the code is not necessarily 
written by yourself. With pause / resume, it will be more clear that the 
consumer is certainly using topic subscriptions, and the partition is no longer 
assigned to you if those functions throw an exception because of a rebalance, 
etc.

 Add KafkaConsumer pause capability
 --

 Key: KAFKA-2350
 URL: https://issues.apache.org/jira/browse/KAFKA-2350
 Project: Kafka
  Issue Type: Improvement
Reporter: Jason Gustafson
Assignee: Jason Gustafson

 There are some use cases in stream processing where it is helpful to be able 
 to pause consumption of a topic. For example, when joining two topics, you 
 may need to delay processing of one topic while you wait for the consumer of 
 the other topic to catch up. The new consumer currently doesn't provide a 
 nice way to do this. If you skip calls to poll() or if you unsubscribe, then 
 a rebalance will be triggered and your partitions will be reassigned to 
 another consumer. The desired behavior is instead that you keep the partition 
  assigned and simply pause fetching until you are ready to resume. 
 One way to achieve this would be to add two new methods to KafkaConsumer:
 {code}
 void pause(TopicPartition... partitions);
 void resume(TopicPartition... partitions);
 {code}
 Here is the expected behavior of pause/resume:
 * When a partition is paused, calls to KafkaConsumer.poll will not initiate 
 any new fetches for that partition.
 * After the partition is resumed, fetches will begin again. 
 * While a partition is paused, seek() and position() can still be used to 
 advance or query the current position.
 * Rebalance does not preserve pause/resume state.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 33620: Patch for KAFKA-1690

2015-07-27 Thread Dong Lin

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/33620/#review93177
---



clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java (line 
108)
https://reviews.apache.org/r/33620/#comment147469

Do you mean return copy?


- Dong Lin


On July 25, 2015, 7:11 p.m., Sriharsha Chintalapani wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/33620/
 ---
 
 (Updated July 25, 2015, 7:11 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1690
 https://issues.apache.org/jira/browse/KAFKA-1690
 
 
 Repository: kafka
 
 
 Description
 ---
 
 KAFKA-1690. new java producer needs ssl support as a client.
 
 
 KAFKA-1690. new java producer needs ssl support as a client.
 
 
 KAFKA-1690. new java producer needs ssl support as a client.
 
 
 KAFKA-1690. new java producer needs ssl support as a client. SSLFactory tests.
 
 
 KAFKA-1690. new java producer needs ssl support as a client. Added 
 PrincipalBuilder.
 
 
 KAFKA-1690. new java producer needs ssl support as a client. Addressing 
 reviews.
 
 
 KAFKA-1690. new java producer needs ssl support as a client. Addressing 
 reviews.
 
 
 KAFKA-1690. new java producer needs ssl support as a client. Addressing 
 reviews.
 
 
 KAFKA-1690. new java producer needs ssl support as a client. Fixed minor 
 issues with the patch.
 
 
 KAFKA-1690. new java producer needs ssl support as a client. Fixed minor 
 issues with the patch.
 
 
 KAFKA-1690. new java producer needs ssl support as a client.
 
 
 KAFKA-1690. new java producer needs ssl support as a client.
 
 
 Merge remote-tracking branch 'refs/remotes/origin/trunk' into KAFKA-1690-V1
 
 
 KAFKA-1690. Broker side ssl changes.
 
 
 KAFKA-1684. SSL for socketServer.
 
 
 KAFKA-1690. Added SSLProducerSendTest and fixes to get right port for SSL.
 
 
 Merge branch 'trunk' into KAFKA-1690-V1
 
 
 KAFKA-1690. Post merge fixes.
 
 
 KAFKA-1690. Added SSLProducerSendTest.
 
 
 KAFKA-1690. Minor fixes based on patch review comments.
 
 
 Merge commit
 
 
 KAFKA-1690. Added SSL Consumer Test.
 
 
 KAFKA-1690. SSL Support.
 
 
 KAFKA-1690. Addressing reviews.
 
 
 Merge branch 'trunk' into KAFKA-1690-V1
 
 
 Merge branch 'trunk' into KAFKA-1690-V1
 
 
 KAFKA-1690. Addressing reviews. Removed interestOps from SSLTransportLayer.
 
 
 KAFKA-1690. Addressing reviews.
 
 
 KAFKA-1690. added staged receives to selector.
 
 
 KAFKA-1690. Addressing reviews.
 
 
 Merge branch 'trunk' into KAFKA-1690-V1
 
 
 Diffs
 -
 
   build.gradle 0abec26fb2d7be62c8a673f9ec838e926e64b2d1 
   checkstyle/import-control.xml 19e0659ef9385433d9f94dee43cd70a52b18c9e5 
   clients/src/main/java/org/apache/kafka/clients/ClientUtils.java 
 0d68bf1e1e90fe9d5d4397ddf817b9a9af8d9f7a 
   clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java 
 2c421f42ed3fc5d61cf9c87a7eaa7bb23e26f63b 
   clients/src/main/java/org/apache/kafka/clients/NetworkClient.java 
 48fe7961e2215372d8033ece4af739ea06c6457b 
   clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java 
 70377ae2fa46deb381139d28590ce6d4115e1adc 
   clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
 bea3d737c51be77d5b5293cdd944d33b905422ba 
   clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
 03b8dd23df63a8d8a117f02eabcce4a2d48c44f7 
   clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java 
 aa264202f2724907924985a5ecbe74afc4c6c04b 
   clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java 
 bae528d31516679bed88ee61b408f209f185a8cc 
   clients/src/main/java/org/apache/kafka/common/config/SSLConfigs.java 
 PRE-CREATION 
   clients/src/main/java/org/apache/kafka/common/network/Authenticator.java 
 PRE-CREATION 
   clients/src/main/java/org/apache/kafka/common/network/ByteBufferSend.java 
 df0e6d5105ca97b7e1cb4d334ffb7b443506bd0b 
   clients/src/main/java/org/apache/kafka/common/network/ChannelBuilder.java 
 PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/common/network/DefaultAuthenticator.java
  PRE-CREATION 
   clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java 
 PRE-CREATION 
   clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java 
 3ca0098b8ec8cfdf81158465b2d40afc47eb6f80 
   
 clients/src/main/java/org/apache/kafka/common/network/PlainTextChannelBuilder.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/common/network/PlainTextTransportLayer.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/common/network/SSLChannelBuilder.java 
 PRE-CREATION 
   clients/src/main/java/org/apache/kafka/common/network/SSLFactory.java 
 PRE-CREATION 
   
 

Re: Review Request 36652: Patch for KAFKA-2351

2015-07-27 Thread Jiangjie Qin


 On July 24, 2015, 4:13 p.m., Jun Rao wrote:
  core/src/main/scala/kafka/network/SocketServer.scala, line 264
  https://reviews.apache.org/r/36652/diff/3/?file=1020607#file1020607line264
 
  Not sure if it's better to keep the thread alive on any throwable. For 
  unexpected exceptions, it seems it's better to just propagate the 
  exception, log it and then kill the thread. This is already done through 
  Utils.newThread. If we want to clean things up (e.g. countdown) before 
  propagating the exception, we can do that in a finally clause.

Hi Jun, I think only letting the acceptor thread die might cause more issues. 
The broker in that case will not serve new connections. This essentially makes 
all the partitions on this broker become unavailable. If we let the acceptor 
thread exit, maybe we should shut down the broker completely.


- Jiangjie


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36652/#review92920
---


On July 24, 2015, 4:36 a.m., Mayuresh Gharat wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/36652/
 ---
 
 (Updated July 24, 2015, 4:36 a.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-2351
 https://issues.apache.org/jira/browse/KAFKA-2351
 
 
 Repository: kafka
 
 
 Description
 ---
 
 Added a try-catch to catch any exceptions thrown by the nioSelector
 
 
 Addressed comments on the Jira ticket
 
 
 Diffs
 -
 
   core/src/main/scala/kafka/network/SocketServer.scala 
 91319fa010b140cca632e5fa8050509bd2295fc9 
 
 Diff: https://reviews.apache.org/r/36652/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Mayuresh Gharat
 




[jira] [Commented] (KAFKA-2364) Improve documentation for contributing to docs

2015-07-27 Thread Ismael Juma (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2364?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14643179#comment-14643179
 ] 

Ismael Juma commented on KAFKA-2364:


Coincidentally a CONTRIBUTING.md was added today and it links to 2 pages that 
have more information. Can you please take a look and see if it's better now?

The website is sadly still in SVN, but we would be happy to move it to Git as 
per https://blogs.apache.org/infra/entry/git_based_websites_available

Help would be welcome as we are currently all busy with other tasks.

 Improve documentation for contributing to docs
 --

 Key: KAFKA-2364
 URL: https://issues.apache.org/jira/browse/KAFKA-2364
 Project: Kafka
  Issue Type: Task
Reporter: Aseem Bansal
Priority: Minor
  Labels: doc

 While reading the documentation for kafka 8 I saw some improvements that can 
 be made. But the docs for contributing are not very good at 
 https://github.com/apache/kafka. It just gives me a URL for svn. But I am not 
 sure what to do. Can the README.MD file be improved for contributing to docs?
  I have submitted patches to Groovy and Grails by sending PRs via GitHub, but 
  looking at the comments on PRs submitted to Kafka it seems PRs via GitHub are 
  not working for Kafka. It would be good to make that work also.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [DISCUSS] Partitioning in Kafka

2015-07-27 Thread Jason Gustafson
For a little background, the difference between this partitioner and the
default one is that it breaks the deterministic mapping from key to
partition. Instead, messages for a given key can end up in either of two
partitions. This means that the consumer generally won't see all messages
for a given key. Instead the consumer would compute an aggregate for each
key on the partitions it consumes and write them to a separate topic. For
example, if you are writing log messages to a logs topic with the
hostname as the key, you could use this partitioning strategy to compute
message counts for each host in each partition and write them to a
log-counts topic. Then a consumer of the log-counts topic would compute
total aggregates based on the two intermediate aggregates. The benefit is
that you are generally going to get better load balancing across partitions
than if you used the default partitioner. (Please correct me if my
understanding is incorrect, Gianmarco)
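
(To make the mechanism concrete, a rough standalone sketch of the two-choices
idea as I understand it; this is not the KAFKA-2092 patch, and the hashing and
bookkeeping below are simplified placeholders.)

    import java.util.Arrays;
    import java.util.concurrent.atomic.AtomicLongArray;

    // Each key hashes to two candidate partitions; the partitioner picks whichever
    // candidate this producer has used less so far. A key's messages are therefore
    // split across (at most) two partitions, which is why a downstream consumer has
    // to merge two partial aggregates per key.
    public class TwoChoicesPartitioner {
        private final AtomicLongArray sentPerPartition;   // local message counts, one per partition

        public TwoChoicesPartitioner(int numPartitions) {
            this.sentPerPartition = new AtomicLongArray(numPartitions);
        }

        public int partition(byte[] keyBytes, int numPartitions) {
            int h = Arrays.hashCode(keyBytes);
            int p1 = Math.abs(h % numPartitions);
            int p2 = Math.abs((h * 31 + 17) % numPartitions);   // second, loosely independent choice
            int chosen = sentPerPartition.get(p1) <= sentPerPartition.get(p2) ? p1 : p2;
            sentPerPartition.incrementAndGet(chosen);
            return chosen;
        }
    }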

So I think the question is whether this is a useful primitive for Kafka to
provide out of the box? I was a little concerned that this use case is a
little esoteric for a core feature, but it may make more sense in the
context of KIP-28 which would provide some higher-level processing
capabilities (though it doesn't seem like the KStream abstraction would
provide a direct way to leverage this partitioner without custom logic).

Thanks,
Jason


On Wed, Jul 22, 2015 at 12:14 AM, Gianmarco De Francisci Morales 
g...@apache.org wrote:

 Hello folks,

 I'd like to ask the community about its opinion on the partitioning
 functions in Kafka.

 With KAFKA-2091 https://issues.apache.org/jira/browse/KAFKA-2091
 integrated we are now able to have custom partitioners in the producer.
 The question now becomes *which* partitioners should ship with Kafka?
 This issue arose in the context of KAFKA-2092
 https://issues.apache.org/jira/browse/KAFKA-2092, which implements a
 specific load-balanced partitioning. This partitioner however assumes some
 stages of processing on top of it to make proper use of the data, i.e., it
 envisions Kafka as a substrate for stream processing, and not only as the
 I/O component.
 Is this a direction that Kafka wants to go towards? Or is this a role
 better left to the internal communication systems of other stream
 processing engines (e.g., Storm)?
 And if the answer is the latter, how would something such as Samza (which
 relies mostly on Kafka as its communication substrate) be able to implement
 advanced partitioning schemes?

 Cheers,
 --
 Gianmarco



Re: Kafka Consumer thoughts

2015-07-27 Thread Kartik Paramasivam
adding the open source alias.  This email started off as a broader
discussion around the new consumer.  I was zooming into only the aspect of
poll() being the only mechanism for driving the heartbeats.

Yes the lag is the effect of the problem (not the problem).  Monitoring the
lag is important as it is the primary way to tell if the application is
wedged.  There might be other metrics which can possibly capture the same
essence. Yes the lag is at the consumer group level, but you can tell that
one of the consumers is messed up if, for example, one of the partitions in the
application starts generating lag while the others are fine.

Monitoring aside, I think the main point of concern is that in the old
consumer most customers don't have to worry about unnecessary rebalances
and most of the things that they do in their app don't have an impact on
the session timeout (i.e. the only thing that causes rebalances is when
the GC is out of whack). For the handful of customers who are impacted
by GC-related rebalances, I would imagine that all of them would really
want us to make the system more resilient. I agree that the GC problem
can't be solved easily in the java client, however it appears that now we
would be expecting the consuming applications to be even more careful with
ongoing tuning of the timeouts.  At LinkedIn, we have seen that most kafka
applications don't have much of a clue about configuring the timeouts and
just end up calling the Kafka team when their application sees rebalances.

The other side effect of poll driving the heartbeats is that we have to
make sure that people don't set a poll timeout that is larger than the
session timeout.   If we had a notion of implicit heartbeats then we could
also automatically make this work for consumers by sending heartbeats at the
appropriate interval even though the customers want to do a long poll.

We could surely work around this in LinkedIn if either we have the Pause()
api or an explicit HeartBeat() api on the consumer.

Would love to hear how other people think about this subject?

Thanks
Kartik



On Sat, Jul 25, 2015 at 7:41 PM, Neha Narkhede n...@confluent.io wrote:

 Agree with the dilemma you are pointing out, which is that there are many
 ways the application's message processing could fail and we wouldn't be
 able to model all of those in the consumer's failure detection mechanism.
 So we should try to model as much of it as we can so the consumer's failure
 detection is meaningful.


 Point being that the only absolute way to really detect that an app is
 healthy is to monitor lag. If the lag increases then for sure something is
 wrong.


 The lag is merely the effect of the problem, not the problem itself. Lag
 is also a consumer group level concept and the problem we have is being
 able to detect failures at the level of individual consumer instances.

 As you pointed out, a consumer that calls poll() is a stronger indicator of
 whether the consumer is alive or not. The dilemma then is who defines what
 a healthy poll() frequency is. No one else but the application owner can
 define what a normal processing latency is for their application. Now the
 question is what's the easiest way for the user to define this without
 having to tune and fine tune this too often. The heartbeat interval
 certainly does not have to be *exactly* the 99th percentile of processing latency but
 could be in the ballpark + an error delta. The error delta is the
 application owner's acceptable risk threshold during which they would be ok
 if the application remains part of the group despite being dead. It is
 ultimately a tradeoff between operational ease and more accurate failure
 detection.

 With quotas the write latencies to Kafka could range from a few
 milliseconds all the way to tens of seconds.


 This is actually no different from the GC problem. Most of the time,
 the normal GC falls in the few ms range and there are many applications
 even at LinkedIn for which the max GC falls in the multiple seconds range.
 Note that it also can't be predicted, so it has to be an observed value. One
 way or the other, you have to observe what this acceptable max is for
 your application and then set the appropriate timeouts.

 Since this is not something that can be automated, this is a config that
 the application owner has to set based on the expected behavior of their
 application. Not wanting to do that leads to ending up with bad consumption
 semantics where the application process continues to be part of a group
 owning partitions but not consuming since it has halted due to a problem.
 The fact that the design requires them to express that in poll() frequency
 or not doesn't change the fact that the application owner has to go through
 the process of measuring and then defining this max.

 The reverse where they don't do this and the application remains in the
 group despite being dead is super confusing and frustrating too. So the due
 diligence up front is actually worth it.

[jira] [Commented] (KAFKA-2350) Add KafkaConsumer pause capability

2015-07-27 Thread Jason Gustafson (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14643442#comment-14643442
 ] 

Jason Gustafson commented on KAFKA-2350:


[~becket_qin] I think that we're on the same page as far as supporting only 
automatic or manual assignment and not trying to mix them. I think my confusion 
is that subscribe(partition) in your proposal is used both a) to subscribe to a 
partition when manual assignment is used, and b) to unpause a partition when 
automatic assignment is used. This leads to the weird ordering problems that we 
have been talking about. By the way, I added the line about seek() and 
position() since it seems like something that intuitively should be supported 
by pause semantics. I think it's debatable whether it's really needed, but I 
think it would cause a bit of a surprise to the user if we didn't allow it.

 Add KafkaConsumer pause capability
 --

 Key: KAFKA-2350
 URL: https://issues.apache.org/jira/browse/KAFKA-2350
 Project: Kafka
  Issue Type: Improvement
Reporter: Jason Gustafson
Assignee: Jason Gustafson

 There are some use cases in stream processing where it is helpful to be able 
 to pause consumption of a topic. For example, when joining two topics, you 
 may need to delay processing of one topic while you wait for the consumer of 
 the other topic to catch up. The new consumer currently doesn't provide a 
 nice way to do this. If you skip calls to poll() or if you unsubscribe, then 
 a rebalance will be triggered and your partitions will be reassigned to 
 another consumer. The desired behavior is instead that you keep the partition 
 assigned and simply stop fetching from it.
 One way to achieve this would be to add two new methods to KafkaConsumer:
 {code}
 void pause(TopicPartition... partitions);
 void resume(TopicPartition... partitions);
 {code}
 Here is the expected behavior of pause/resume:
 * When a partition is paused, calls to KafkaConsumer.poll will not initiate 
 any new fetches for that partition.
 * After the partition is resumed, fetches will begin again. 
 * While a partition is paused, seek() and position() can still be used to 
 advance or query the current position.
 * Rebalance does not preserve pause/resume state.
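
A hypothetical usage sketch for the join scenario described above; the topic
name, the partition, and the catch-up check are assumptions, and the pause/resume
methods themselves are the proposal, not an existing API:

{code}
// Hypothetical sketch of the proposed pause/resume for a two-topic join:
// fetching from "orders" is suspended while the other side catches up, but
// poll() keeps being called so the consumer stays in its group.
void joinLoop(KafkaConsumer<String, String> consumer) {
    TopicPartition orders = new TopicPartition("orders", 0);
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        process(records);            // application-specific join logic
        if (otherSideBehind()) {     // application-specific catch-up check
            consumer.pause(orders);  // no new fetches for "orders"
        } else {
            consumer.resume(orders); // fetches begin again
        }
    }
}
{code}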



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Kafka Consumer thoughts

2015-07-27 Thread Jason Gustafson
I think if we recommend a longer session timeout, then we should expose the
heartbeat frequency in configuration since this generally controls how long
normal rebalances will take. I think it's currently hard-coded to 3
heartbeats per session timeout. It could also be nice to have an explicit
LeaveGroup request to implement clean shutdown of a consumer. Then the
coordinator doesn't have to wait for the timeout to reassign partitions.

-Jason
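
For concreteness, this is roughly the knob being discussed below: a hedged
sketch of an application opting into a generous session timeout via the new
consumer's session.timeout.ms config. The broker address and group id are
placeholders, and the heartbeat interval itself was still hard-coded at this
point.

{code}
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class LongTimeoutConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092"); // placeholder
        props.put("group.id", "my-app");                // placeholder
        // A generous session timeout, in the spirit of "default the timeout high".
        props.put("session.timeout.ms", "60000");
        KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(
                props, new ByteArrayDeserializer(), new ByteArrayDeserializer());
        // ... subscribe and poll as usual; poll() is what drives the heartbeats ...
        consumer.close();
    }
}
{code}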

On Mon, Jul 27, 2015 at 1:25 PM, Jay Kreps j...@confluent.io wrote:

 Hey Kartik,

 Totally agree we don't want people tuning timeouts in the common case.

 However there are two ways to avoid this:
 1. Default the timeout high
 2. Put the heartbeat in a separate thread

 When we were doing the consumer design we discussed this tradeoff and I
 think the conclusion we came to was that defaulting to a high timeout was
 actually better. This means it takes a little longer to detect a failure,
 but usually that is not a big problem and people who want faster failure
 detection can tune it down. This seemed better than having the failure
 detection not really cover the consumption and just be a background ping.
 The two reasons were (a) you still have the GC problem even for the
 background thread, (b) consumption is in some sense a better definition of
 an active healthy consumer and a lot of problems crop up when you have an
 inactive consumer with an active background thread (as today).

 When we had the discussion I think what we realized was that most people
 who were worried about the timeout were imagining a very low default
 (say, 500 ms). But in fact just setting this to 60 seconds or higher as a
 default would be okay, this adds to the failure detection time but only
 apps that care about this need to tune. This should largely eliminate false
 positives since after all if you disappear for 60 seconds that actually
 starts to be more of a true positive, even if you come back... :-)

 -Jay



 On Mon, Jul 27, 2015 at 1:05 PM, Kartik Paramasivam 
 kparamasi...@linkedin.com wrote:

 adding the open source alias.  This email started off as a broader
 discussion around the new consumer.  I was zooming into only the aspect of
 poll() being the only mechanism for driving the heartbeats.

 Yes the lag is the effect of the problem (not the problem).  Monitoring
 the lag is important as it is the primary way to tell if the application is
 wedged.  There might be other metrics which can possibly capture the same
 essence. Yes the lag is at the consumer group level, but you can tell that
 one of the consumers is messed up if one of the partitions in the
 application starts generating lag while the others are fine, for example.

 Monitoring aside, I think the main point of concern is that in the old
 consumer most customers don't have to worry about unnecessary rebalances
 and most of the things that they do in their app don't have an impact on
 the session timeout (i.e. the only thing that causes rebalances is when
 the GC is out of whack). For the handful of customers who are impacted
 by GC related rebalances, I would imagine that all of them would really
 want us to make the system more resilient. I agree that the GC problem
 can't be solved easily in the java client, however it appears that now we
 would be expecting the consuming applications to be even more careful with
 ongoing tuning of the timeouts.  At LinkedIn, we have seen that most kafka
 applications don't have much of a clue about configuring the timeouts and
 just end up calling the Kafka team when their application sees rebalances.

 The other side effect of poll driving the heartbeats is that we have to
 make sure that people don't set a poll timeout that is larger than the
 session timeout.   If we had a notion of implicit heartbeats then we could
 also automatically make this work for consumers by sending heartbeats at the
 appropriate interval even though the customers want to do a long poll.

 We could surely work around this in LinkedIn if either we have the
 Pause() api or an explicit HeartBeat() api on the consumer.

 Would love to hear how other people think about this subject?

 Thanks
 Kartik



 On Sat, Jul 25, 2015 at 7:41 PM, Neha Narkhede n...@confluent.io wrote:

 Agree with the dilemma you are pointing out, which is that there are
 many ways the application's message processing could fail and we wouldn't
 be able to model all of those in the consumer's failure detection
 mechanism. So we should try to model as much of it as we can so the
 consumer's failure detection is meaningful.


 Point being that the only absolute way to really detect that an app is
 healthy is to monitor lag. If the lag increases then for sure something is
 wrong.


 The lag is merely the effect of the problem, not the problem itself. Lag
 is also a consumer group level concept and the problem we have is being
 able to detect failures at the level of individual consumer instances.

 As you pointed out, a consumer that 

Review Request 36858: Patch for KAFKA-2120

2015-07-27 Thread Mayuresh Gharat

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36858/
---

Review request for kafka.


Bugs: KAFKA-2120
https://issues.apache.org/jira/browse/KAFKA-2120


Repository: kafka


Description
---

Addressed Joel's, Adi's, Dong's, and Becket's comments


Diffs
-

  clients/src/main/java/org/apache/kafka/clients/ClientRequest.java 
ed4c0d98596cc294757f35df8c8cbc8e36ff42de 
  clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java 
2c421f42ed3fc5d61cf9c87a7eaa7bb23e26f63b 
  clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java 
15d00d4e484bb5d51a9ae6857ed6e024a2cc1820 
  clients/src/main/java/org/apache/kafka/clients/KafkaClient.java 
7ab2503794ff3aab39df881bd9fbae6547827d3b 
  clients/src/main/java/org/apache/kafka/clients/NetworkClient.java 
48fe7961e2215372d8033ece4af739ea06c6457b 
  clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java 
70377ae2fa46deb381139d28590ce6d4115e1adc 
  clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
bea3d737c51be77d5b5293cdd944d33b905422ba 
  clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
03b8dd23df63a8d8a117f02eabcce4a2d48c44f7 
  clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java 
aa264202f2724907924985a5ecbe74afc4c6c04b 
  
clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java
 4cb1e50d6c4ed55241aeaef1d3af09def5274103 
  
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
 a152bd7697dca55609a9ec4cfe0a82c10595fbc3 
  
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
 06182db1c3a5da85648199b4c0c98b80ea7c6c0c 
  clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 
0baf16e55046a2f49f6431e01d52c323c95eddf0 
  clients/src/main/java/org/apache/kafka/common/network/Selector.java 
aaf60c98c2c0f4513a8d65ee0db67953a529d598 
  clients/src/test/java/org/apache/kafka/clients/MockClient.java 
d9c97e966c0e2fb605b67285f4275abb89f8813e 
  clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java 
43238ceaad0322e39802b615bb805b895336a009 
  
clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java
 2c693824fa53db1e38766b8c66a0ef42ef9d0f3a 
  
clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
 5b2e4ffaeab7127648db608c179703b27b577414 
  
clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
 8b1805d3d2bcb9fe2bacb37d870c3236aa9532c4 

Diff: https://reviews.apache.org/r/36858/diff/


Testing
---


Thanks,

Mayuresh Gharat



[jira] [Commented] (KAFKA-2120) Add a request timeout to NetworkClient

2015-07-27 Thread Mayuresh Gharat (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2120?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14643399#comment-14643399
 ] 

Mayuresh Gharat commented on KAFKA-2120:


Created reviewboard https://reviews.apache.org/r/36858/diff/
 against branch origin/trunk

 Add a request timeout to NetworkClient
 --

 Key: KAFKA-2120
 URL: https://issues.apache.org/jira/browse/KAFKA-2120
 Project: Kafka
  Issue Type: New Feature
Reporter: Jiangjie Qin
Assignee: Mayuresh Gharat
 Attachments: KAFKA-2120.patch


 Currently NetworkClient does not have a timeout setting for requests. So if 
 no response is received for a request due to reasons such as broker is down, 
 the request will never be completed.
 Request timeout will also be used as implicit timeout for some methods such 
 as KafkaProducer.flush() and kafkaProducer.close().
 KIP-19 is created for this public interface change.
 https://cwiki.apache.org/confluence/display/KAFKA/KIP-19+-+Add+a+request+timeout+to+NetworkClient
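
For context, a hedged sketch of how such a timeout would surface to a producer
application; the config key request.timeout.ms follows KIP-19's naming and may
not match what this particular patch revision uses:

{code}
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;

public class TimeoutProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092"); // placeholder
        // Bound how long a request may stay in flight without a response.
        props.put("request.timeout.ms", "30000");       // name per KIP-19; assumed here
        KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(
                props, new ByteArraySerializer(), new ByteArraySerializer());
        producer.send(new ProducerRecord<byte[], byte[]>("my-topic", "hello".getBytes()));
        producer.flush(); // per this JIRA, also implicitly bounded by the request timeout
        producer.close();
    }
}
{code}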



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-2120) Add a request timeout to NetworkClient

2015-07-27 Thread Mayuresh Gharat (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2120?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mayuresh Gharat updated KAFKA-2120:
---
Status: Patch Available  (was: Open)

 Add a request timeout to NetworkClient
 --

 Key: KAFKA-2120
 URL: https://issues.apache.org/jira/browse/KAFKA-2120
 Project: Kafka
  Issue Type: New Feature
Reporter: Jiangjie Qin
Assignee: Mayuresh Gharat
 Attachments: KAFKA-2120.patch


 Currently NetworkClient does not have a timeout setting for requests. So if 
 no response is received for a request due to reasons such as broker is down, 
 the request will never be completed.
 Request timeout will also be used as implicit timeout for some methods such 
 as KafkaProducer.flush() and kafkaProducer.close().
 KIP-19 is created for this public interface change.
 https://cwiki.apache.org/confluence/display/KAFKA/KIP-19+-+Add+a+request+timeout+to+NetworkClient



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-2120) Add a request timeout to NetworkClient

2015-07-27 Thread Mayuresh Gharat (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2120?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mayuresh Gharat updated KAFKA-2120:
---
Attachment: KAFKA-2120.patch

 Add a request timeout to NetworkClient
 --

 Key: KAFKA-2120
 URL: https://issues.apache.org/jira/browse/KAFKA-2120
 Project: Kafka
  Issue Type: New Feature
Reporter: Jiangjie Qin
Assignee: Mayuresh Gharat
 Attachments: KAFKA-2120.patch


 Currently NetworkClient does not have a timeout setting for requests. So if 
 no response is received for a request due to reasons such as broker is down, 
 the request will never be completed.
 Request timeout will also be used as implicit timeout for some methods such 
 as KafkaProducer.flush() and kafkaProducer.close().
 KIP-19 is created for this public interface change.
 https://cwiki.apache.org/confluence/display/KAFKA/KIP-19+-+Add+a+request+timeout+to+NetworkClient



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Kafka Consumer thoughts

2015-07-27 Thread Jay Kreps
Hey Kartik,

Totally agree we don't want people tuning timeouts in the common case.

However there are two ways to avoid this:
1. Default the timeout high
2. Put the heartbeat in a separate thread

When we were doing the consumer design we discussed this tradeoff and I
think the conclusion we came to was that defaulting to a high timeout was
actually better. This means it takes a little longer to detect a failure,
but usually that is not a big problem and people who want faster failure
detection can tune it down. This seemed better than having the failure
detection not really cover the consumption and just be a background ping.
The two reasons were (a) you still have the GC problem even for the
background thread, (b) consumption is in some sense a better definition of
an active healthy consumer and a lot of problems crop up when you have an
inactive consumer with an active background thread (as today).

When we had the discussion I think what we realized was that most people
who were worried about the timeout were imagining a very low default
(say, 500 ms). But in fact just setting this to 60 seconds or higher as a
default would be okay, this adds to the failure detection time but only
apps that care about this need to tune. This should largely eliminate false
positives since after all if you disappear for 60 seconds that actually
starts to be more of a true positive, even if you come back... :-)

-Jay



On Mon, Jul 27, 2015 at 1:05 PM, Kartik Paramasivam 
kparamasi...@linkedin.com wrote:

 adding the open source alias.  This email started off as a broader
 discussion around the new consumer.  I was zooming into only the aspect of
 poll() being the only mechanism for driving the heartbeats.

 Yes the lag is the effect of the problem (not the problem).  Monitoring
 the lag is important as it is the primary way to tell if the application is
 wedged.  There might be other metrics which can possibly capture the same
 essence. Yes the lag is at the consumer group level, but you can tell that
 one of the consumers is messed up if one of the partitions in the
 application starts generating lag while the others are fine, for example.

 Monitoring aside, I think the main point of concern is that in the old
 consumer most customers don't have to worry about unnecessary rebalances
 and most of the things that they do in their app don't have an impact on
 the session timeout (i.e. the only thing that causes rebalances is when
 the GC is out of whack). For the handful of customers who are impacted
 by GC related rebalances, I would imagine that all of them would really
 want us to make the system more resilient. I agree that the GC problem
 can't be solved easily in the java client, however it appears that now we
 would be expecting the consuming applications to be even more careful with
 ongoing tuning of the timeouts.  At LinkedIn, we have seen that most kafka
 applications don't have much of a clue about configuring the timeouts and
 just end up calling the Kafka team when their application sees rebalances.

 The other side effect of poll driving the heartbeats is that we have to
 make sure that people don't set a poll timeout that is larger than the
 session timeout.   If we had a notion of implicit heartbeats then we could
 also automatically make this work for consumers by sending heartbeats at the
 appropriate interval even though the customers want to do a long poll.

 We could surely work around this in LinkedIn if either we have the Pause()
 api or an explicit HeartBeat() api on the consumer.

 Would love to hear how other people think about this subject?

 Thanks
 Kartik



 On Sat, Jul 25, 2015 at 7:41 PM, Neha Narkhede n...@confluent.io wrote:

 Agree with the dilemma you are pointing out, which is that there are many
 ways the application's message processing could fail and we wouldn't be
 able to model all of those in the consumer's failure detection mechanism.
 So we should try to model as much of it as we can so the consumer's failure
 detection is meaningful.


 Point being that the only absolute way to really detect that an app is
 healthy is to monitor lag. If the lag increases then for sure something is
 wrong.


 The lag is merely the effect of the problem, not the problem itself. Lag
 is also a consumer group level concept and the problem we have is being
 able to detect failures at the level of individual consumer instances.

 As you pointed out, a consumer that calls poll() is a stronger indicator of
 whether the consumer is alive or not. The dilemma then is who defines what
 a healthy poll() frequency is. No one else but the application owner can
 define what a normal processing latency is for their application. Now the
 question is what's the easiest way for the user to define this without
 having to tune and fine tune this too often. The heartbeat interval
 certainly does not have to be *exactly* the 99th percentile of processing latency but
 could be in the ballpark + an error delta. The error delta is the
 

Re: [DISCUSS] Partitioning in Kafka

2015-07-27 Thread Ewen Cheslack-Postava
Gwen - this is really like two steps of map reduce though, right? The first
step does the partial shuffle to two partitions per key, the second step does
a partial reduce + final full shuffle, and the final step does the final reduce.

This strikes me as similar to partition assignment strategies in the
consumer in that there will probably be a small handful of commonly used
strategies that we can just maintain as part of Kafka. A few people will
need more obscure strategies and they can maintain those implementations
themselves. For reference, a quick grep of Spark shows 5 partitioners: Hash
and RangePartitioner, which are in core, PythonPartitioner, GridPartitioner
for partitioning matrices, and ShuffleRowRDD for their SQL implementation.
So I don't think it would be a big deal to include it here, although I'm
not really sure how often it's useful -- compared to normal partitioning or
just doing two steps by starting with unpartitioned data, you need to be
performing an aggregation, the key set needs to be large enough for memory
usage to be a problem (i.e. you don't want each consumer to have to
maintain a map with every key in it), and the key distribution needs to be
sufficiently skewed (i.e. not just 1 or 2 very hot keys). The key set constraint,
in particular, is the one I'm not convinced by since in practice if you
have a skewed distribution, you probably also won't actually see every key
in every partition; each worker actually only needs to maintain a subset of
the key set (and associated aggregate data) in memory.
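
To illustrate the combine stage being discussed, here is a rough sketch under
the logs / log-counts example from earlier in the thread; the helper signature,
the periodic flush, and everything else here are assumptions:

{code}
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

// Each instance only ever holds counts for the keys that actually appear in the
// partitions it consumes, which is the memory-usage point made above.
public class PartialCounter {
    static void run(KafkaConsumer<String, String> consumer,
                    KafkaProducer<String, Long> producer) {
        Map<String, Long> counts = new HashMap<>();
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(1000);
            for (ConsumerRecord<String, String> record : records) {
                String host = record.key();
                Long current = counts.get(host);
                counts.put(host, current == null ? 1L : current + 1);
            }
            // Emit partial counts; a consumer of "log-counts" merges the at-most-two
            // partials per host into the final aggregate.
            for (Map.Entry<String, Long> entry : counts.entrySet())
                producer.send(new ProducerRecord<String, Long>("log-counts",
                        entry.getKey(), entry.getValue()));
        }
    }
}
{code}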


On Mon, Jul 27, 2015 at 12:56 PM, Gwen Shapira gshap...@cloudera.com
wrote:

 If you are used to map-reduce patterns, this sounds like a perfectly
 natural way to process streams of data.

 Call the first consumer map-combine-log, the topic shuffle-log and
 the second consumer reduce-log :)
 I like that a lot. It works well either for embarrassingly parallel
 cases, or for cases with so much data that more parallelism is worth the
 extra overhead.

 I personally don't care if it's in core-Kafka, KIP-28 or a github
 project elsewhere, but I find it useful and non-esoteric.



 On Mon, Jul 27, 2015 at 12:51 PM, Jason Gustafson ja...@confluent.io
 wrote:
  For a little background, the difference between this partitioner and the
  default one is that it breaks the deterministic mapping from key to
  partition. Instead, messages for a given key can end up in either of two
  partitions. This means that the consumer generally won't see all messages
  for a given key. Instead the consumer would compute an aggregate for each
  key on the partitions it consumes and write them to a separate topic. For
  example, if you are writing log messages to a logs topic with the
  hostname as the key, you could use this partitioning strategy to compute
  message counts for each host in each partition and write them to a
  log-counts topic. Then a consumer of the log-counts topic would
 compute
  total aggregates based on the two intermediate aggregates. The benefit is
  that you are generally going to get better load balancing across
 partitions
  than if you used the default partitioner. (Please correct me if my
  understanding is incorrect, Gianmarco)
 
  So I think the question is whether this is a useful primitive for Kafka
 to
  provide out of the box? I was a little concerned that this use case is a
  little esoteric for a core feature, but it may make more sense in the
  context of KIP-28 which would provide some higher-level processing
  capabilities (though it doesn't seem like the KStream abstraction would
  provide a direct way to leverage this partitioner without custom logic).
 
  Thanks,
  Jason
 
 
  On Wed, Jul 22, 2015 at 12:14 AM, Gianmarco De Francisci Morales 
  g...@apache.org wrote:
 
  Hello folks,
 
  I'd like to ask the community about its opinion on the partitioning
  functions in Kafka.
 
  With KAFKA-2091 https://issues.apache.org/jira/browse/KAFKA-2091
  integrated we are now able to have custom partitioners in the producer.
  The question now becomes *which* partitioners should ship with Kafka?
  This issue arose in the context of KAFKA-2092
  https://issues.apache.org/jira/browse/KAFKA-2092, which implements a
  specific load-balanced partitioning. This partitioner however assumes
 some
  stages of processing on top of it to make proper use of the data, i.e.,
 it
  envisions Kafka as a substrate for stream processing, and not only as
 the
  I/O component.
  Is this a direction that Kafka wants to go towards? Or is this a role
  better left to the internal communication systems of other stream
  processing engines (e.g., Storm)?
  And if the answer is the latter, how would something such as Samza (which
  relies mostly on Kafka as its communication substrate) be able to
 implement
  advanced partitioning schemes?
 
  Cheers,
  --
  Gianmarco
 




-- 
Thanks,
Ewen


[jira] [Commented] (KAFKA-2350) Add KafkaConsumer pause capability

2015-07-27 Thread Jason Gustafson (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14643349#comment-14643349
 ] 

Jason Gustafson commented on KAFKA-2350:


There's one interesting implementation note that [~yasuhiro.matsuda] and 
[~guozhang] brought up. When a partition is unpaused, there may be an active 
fetch which is parked on the broker. In the current implementation, the 
consumer will not initiate any new fetches until that fetch has completed. This 
means that the consumer will not be able to immediately process messages from 
the unpaused partition even if it has some available. There are a couple of ways this 
could be handled. We could issue the new fetch from a different socket on the 
client. We could also implement a way to cancel or override the active fetch on 
the broker. Since both of these make this patch significantly more complex, I 
think we should just note this limitation in the documentation and address it 
later if it becomes a larger problem.



 Add KafkaConsumer pause capability
 --

 Key: KAFKA-2350
 URL: https://issues.apache.org/jira/browse/KAFKA-2350
 Project: Kafka
  Issue Type: Improvement
Reporter: Jason Gustafson
Assignee: Jason Gustafson

 There are some use cases in stream processing where it is helpful to be able 
 to pause consumption of a topic. For example, when joining two topics, you 
 may need to delay processing of one topic while you wait for the consumer of 
 the other topic to catch up. The new consumer currently doesn't provide a 
 nice way to do this. If you skip calls to poll() or if you unsubscribe, then 
 a rebalance will be triggered and your partitions will be reassigned to 
 another consumer. The desired behavior is instead that you keep the partition 
 assigned and simply stop fetching from it.
 One way to achieve this would be to add two new methods to KafkaConsumer:
 {code}
 void pause(TopicPartition... partitions);
 void resume(TopicPartition... partitions);
 {code}
 Here is the expected behavior of pause/resume:
 * When a partition is paused, calls to KafkaConsumer.poll will not initiate 
 any new fetches for that partition.
 * After the partition is resumed, fetches will begin again. 
 * While a partition is paused, seek() and position() can still be used to 
 advance or query the current position.
 * Rebalance does not preserve pause/resume state.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 36858: Patch for KAFKA-2120

2015-07-27 Thread Mayuresh Gharat

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36858/
---

(Updated July 27, 2015, 9:09 p.m.)


Review request for kafka.


Bugs: KAFKA-2120
https://issues.apache.org/jira/browse/KAFKA-2120


Repository: kafka


Description (updated)
---

KIP-19: Added RequestTimeout and MaxBlockTimeout as per the KIP.


Diffs
-

  clients/src/main/java/org/apache/kafka/clients/ClientRequest.java 
ed4c0d98596cc294757f35df8c8cbc8e36ff42de 
  clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java 
2c421f42ed3fc5d61cf9c87a7eaa7bb23e26f63b 
  clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java 
15d00d4e484bb5d51a9ae6857ed6e024a2cc1820 
  clients/src/main/java/org/apache/kafka/clients/KafkaClient.java 
7ab2503794ff3aab39df881bd9fbae6547827d3b 
  clients/src/main/java/org/apache/kafka/clients/NetworkClient.java 
48fe7961e2215372d8033ece4af739ea06c6457b 
  clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java 
70377ae2fa46deb381139d28590ce6d4115e1adc 
  clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
bea3d737c51be77d5b5293cdd944d33b905422ba 
  clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
03b8dd23df63a8d8a117f02eabcce4a2d48c44f7 
  clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java 
aa264202f2724907924985a5ecbe74afc4c6c04b 
  
clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java
 4cb1e50d6c4ed55241aeaef1d3af09def5274103 
  
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
 a152bd7697dca55609a9ec4cfe0a82c10595fbc3 
  
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
 06182db1c3a5da85648199b4c0c98b80ea7c6c0c 
  clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 
0baf16e55046a2f49f6431e01d52c323c95eddf0 
  clients/src/main/java/org/apache/kafka/common/network/Selector.java 
aaf60c98c2c0f4513a8d65ee0db67953a529d598 
  clients/src/test/java/org/apache/kafka/clients/MockClient.java 
d9c97e966c0e2fb605b67285f4275abb89f8813e 
  clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java 
43238ceaad0322e39802b615bb805b895336a009 
  
clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java
 2c693824fa53db1e38766b8c66a0ef42ef9d0f3a 
  
clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
 5b2e4ffaeab7127648db608c179703b27b577414 
  
clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
 8b1805d3d2bcb9fe2bacb37d870c3236aa9532c4 

Diff: https://reviews.apache.org/r/36858/diff/


Testing
---


Thanks,

Mayuresh Gharat



[jira] [Commented] (KAFKA-2350) Add KafkaConsumer pause capability

2015-07-27 Thread Jiangjie Qin (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14643418#comment-14643418
 ] 

Jiangjie Qin commented on KAFKA-2350:
-

[~hachikuji], I am with [~guozhang] that it is much clearer if we only support 
either auto partition assignment or manual partition assignment, but not mixed 
mode. I assumed pause/unpause will only be used when auto partition assignment 
is used, so we can check the assigned partition set. If it is under manual 
partition assignment, for the seek(), I assume that you will start consuming 
after seek? If so, it might be the same as:
{code}
subscribe(tp)
seek(tp, offset)
poll()
{code}

[~guozhang], I see your point. I am convinced that for people who are using 
auto partition assignment, pause/unpause is more intuitive than using partition 
level sub/unsub. I am not really opposed to having them. What I am worried about is 
that we are adding methods that are intuitive to some particular use case, but 
potentially open the door for adding APIs that only have subtle differences. If 
we take a closer look at our API, there are potentially some other cases we can 
argue for a new API, e.g. a user might want to have auto commit turned on only for 
some but not all of the partitions they subscribed to. User might want to find 
a list of offsets of a partition within a time range. These are all different 
use cases, but likely can be solved with some lower level API calls instead of 
having a dedicated intuitive API for each of them.

I kind of feel the dilemma we are facing now is that in the new consumer, we try to 
address both the high level consumer and low level consumer use cases. 
pause/unpause looks to me like a medium-to-low level use case. As the higher level 
requirements can vary a lot and have subtle differences from one to another, the 
question to be answered is whether we should expose a high level interface for 
each of the high level use cases, or just ask the user to use a lower 
level API as long as we support the functionality.

My understanding is that for high level consumer use cases, hopefully we don't 
need the user to care too much about the underlying mechanism. For people who want 
to deal with lower level concepts such as offsets, partition assignment, and 
temporary consumption suspension, instead of having one high level API written 
for each of the use cases, letting the user use a lower level API makes sense to me.

In terms of the example you mentioned, can we solve them by throwing 
appropriate exceptions?

{code}
// Auto partition assignment
subscribe(topic1) // assuming only topic1-partition0 is assigned to this 
consumer.
subscribe(topic1-partition1) // throw IllegalStateException(Cannot subscribe to 
topic1-partition1 because topic1 is managed by consumer coordinator)
unsubscribe(topic1-partition1) // throw IllegalStateException(Topic1-partition1 
is managed by consumer coordinator, and topic1-partition1 is not assigned to 
this consumer.)
{code}

{code}
subscribe(topic1-partition0)
subscribe(topic1) // throw IllegalStateException(Cannot subscribe to topic1 
because the assignment of topic1 is manually managed)
unsubscribe(topic1-partition1) // throw IllegalStateException(Cannot 
unsubscribe topic1-partition1 because it is not subscribed)
{code}

[~nehanarkhede], what you said makes a lot of sense. I guess I'm just looking 
at the problem from a different angle - a user either wants to consume from a 
topic or not at a certain point, whether temporarily or permanently. So the 
state I see is only CONSUME/NOT_CONSUME. The PAUSE state for consumer would be 
pretty much the same as NOT_CONSUME. I might have missed some use case like 
[~hachikuji] mentioned - seek() on a PAUSE partition, but that can be solved by 
calling subscribe() first.

[~gwenshap], good point about heartbeat. We actually got some feedback from 
users in LinkedIn and found that putting the responsibility of sending 
heartbeats on the user might be a problem in the first place... We may have 
pause/unpause as a workaround, but the ultimate issue is that maybe we are 
asking too much of the user to maintain the heartbeat...

 Add KafkaConsumer pause capability
 --

 Key: KAFKA-2350
 URL: https://issues.apache.org/jira/browse/KAFKA-2350
 Project: Kafka
  Issue Type: Improvement
Reporter: Jason Gustafson
Assignee: Jason Gustafson

 There are some use cases in stream processing where it is helpful to be able 
 to pause consumption of a topic. For example, when joining two topics, you 
 may need to delay processing of one topic while you wait for the consumer of 
 the other topic to catch up. The new consumer currently doesn't provide a 
 nice way to do this. If you skip calls to poll() or if you unsubscribe, then 
 a rebalance will be triggered and your partitions will be reassigned to 
 another consumer.

Re: Kafka Consumer thoughts

2015-07-27 Thread Ewen Cheslack-Postava
Kartik, on your second point about timeouts with poll() and heartbeats, the
consumer now handles this properly. KAFKA-2123 introduced a
DelayedTaskQueue and that is used internally to handle processing events at
the right time even if poll() is called with a large timeout. The same
mechanism is used to handle auto commit, which should also occur in a
timely fashion even if poll() is called with a large timeout.



On Mon, Jul 27, 2015 at 1:25 PM, Jay Kreps j...@confluent.io wrote:

 Hey Kartik,

 Totally agree we don't want people tuning timeouts in the common case.

 However there are two ways to avoid this:
 1. Default the timeout high
 2. Put the heartbeat in a separate thread

 When we were doing the consumer design we discussed this tradeoff and I
 think the conclusion we came to was that defaulting to a high timeout was
 actually better. This means it takes a little longer to detect a failure,
 but usually that is not a big problem and people who want faster failure
 detection can tune it down. This seemed better than having the failure
 detection not really cover the consumption and just be a background ping.
 The two reasons were (a) you still have the GC problem even for the
 background thread, (b) consumption is in some sense a better definition of
 an active healthy consumer and a lot of problems crop up when you have an
 inactive consumer with an active background thread (as today).

 When we had the discussion I think what we realized was that most people
 who were worried about the timeout were imagining a very low default
 (say, 500 ms). But in fact just setting this to 60 seconds or higher as a
 default would be okay, this adds to the failure detection time but only
 apps that care about this need to tune. This should largely eliminate false
 positives since after all if you disappear for 60 seconds that actually
 starts to be more of a true positive, even if you come back... :-)

 -Jay



 On Mon, Jul 27, 2015 at 1:05 PM, Kartik Paramasivam 
 kparamasi...@linkedin.com wrote:

 adding the open source alias.  This email started off as a broader
 discussion around the new consumer.  I was zooming into only the aspect of
 poll() being the only mechanism for driving the heartbeats.

 Yes the lag is the effect of the problem (not the problem).  Monitoring
 the lag is important as it is the primary way to tell if the application is
 wedged.  There might be other metrics which can possibly capture the same
 essence. Yes the lag is at the consumer group level, but you can tell that
 one of the consumers is messed up if one of the partitions in the
 application starts generating lag while the others are fine, for example.

 Monitoring aside, I think the main point of concern is that in the old
 consumer most customers don't have to worry about unnecessary rebalances
 and most of the things that they do in their app don't have an impact on
 the session timeout (i.e. the only thing that causes rebalances is when
 the GC is out of whack). For the handful of customers who are impacted
 by GC related rebalances, I would imagine that all of them would really
 want us to make the system more resilient. I agree that the GC problem
 can't be solved easily in the java client, however it appears that now we
 would be expecting the consuming applications to be even more careful with
 ongoing tuning of the timeouts.  At LinkedIn, we have seen that most kafka
 applications don't have much of a clue about configuring the timeouts and
 just end up calling the Kafka team when their application sees rebalances.

 The other side effect of poll driving the heartbeats is that we have to
 make sure that people don't set a poll timeout that is larger than the
 session timeout.   If we had a notion of implicit heartbeats then we could
 also automatically make this work for consumers by sending heartbeats at the
 appropriate interval even though the customers want to do a long poll.

 We could surely work around this in LinkedIn if either we have the
 Pause() api or an explicit HeartBeat() api on the consumer.

 Would love to hear how other people think about this subject?

 Thanks
 Kartik



 On Sat, Jul 25, 2015 at 7:41 PM, Neha Narkhede n...@confluent.io wrote:

 Agree with the dilemma you are pointing out, which is that there are
 many ways the application's message processing could fail and we wouldn't
 be able to model all of those in the consumer's failure detection
 mechanism. So we should try to model as much of it as we can so the
 consumer's failure detection is meaningful.


 Point being that the only absolute way to really detect that an app is
 healthy is to monitor lag. If the lag increases then for sure something is
 wrong.


 The lag is merely the effect of the problem, not the problem itself. Lag
 is also a consumer group level concept and the problem we have is being
 able to detect failures at the level of individual consumer instances.

 As you pointed out, a consumer that calls poll() is a stronger indicator 

[jira] [Comment Edited] (KAFKA-2350) Add KafkaConsumer pause capability

2015-07-27 Thread Jiangjie Qin (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14643418#comment-14643418
 ] 

Jiangjie Qin edited comment on KAFKA-2350 at 7/27/15 9:45 PM:
--

[~hachikuji], I am with [~guozhang] that it is much clearer if we only support 
either auto partition assignment or manual partition assignment, but not mixed 
mode. I assumed pause/unpause will only be used when auto partition assignment 
is used, so we can check the assigned partition set. If it is under manual 
partition assignment, for the seek(), I assume that you will start consuming 
after seek? If so, it might be the same as:
{code}
subscribe(tp)
seek(tp, offset)
poll()
{code}

[~guozhang], I see your point. I am convinced that for people who are using 
auto partition assignment, pause/unpause is more intuitive than using partition 
level sub/unsub. I am not really opposed to having them. What I am worried about is 
that we are adding methods that are intuitive to some particular use case, but 
potentially open the door for adding APIs that only have subtle differences. If 
we take a closer look at our API, there are potentially some other cases we can 
argue for a new API, e.g. a user might want to have auto commit turned on only for 
some but not all of the partitions they subscribed to. User might want to find 
a list of offsets of a partition within a time range. These are all different 
use cases, but likely can be solved with some lower level API calls instead of 
having a dedicated intuitive API for each of them.

I kind of feel the dilemma we are facing now is that in the new consumer, we try to 
address both the high level consumer and low level consumer use cases. 
pause/unpause looks to me like a medium-to-low level use case. As the higher level 
requirements can vary a lot and have subtle differences from one to another, the 
question to be answered is whether we should expose a high level interface for 
each of the high level use cases, or just ask the user to use a lower 
level API as long as we support the functionality.

My understanding is that for high level consumer use cases, hopefully we don't 
need the user to care too much about the underlying mechanism. For people who want 
to deal with lower level concepts such as offsets, partition assignment, and 
temporary consumption suspension, instead of having one high level API written 
for each of the use cases, letting the user use a lower level API makes sense to me.

In terms of the example you mentioned, can we solve them by throwing 
appropriate exceptions?

{code}
// Auto partition assignment
subscribe(topic1) // assuming only topic1-partition0 is assigned to this 
consumer.
subscribe(topic1-partition1) // throw IllegalStateException(Cannot subscribe to 
topic1-partition1 because topic1 is managed by consumer coordinator)
unsubscribe(topic1-partition1) // throw IllegalStateException(Topic1-partition1 
is managed by consumer coordinator, and topic1-partition1 is not assigned to 
this consumer.)
{code}

{code}
// Manual partition assignment
subscribe(topic1-partition0)
subscribe(topic1) // throw IllegalStateException(Cannot subscribe to topic1 
because the assignment of topic1 is manually managed)
unsubscribe(topic1-partition1) // throw IllegalStateException(Cannot 
unsubscribe topic1-partition1 because it is not subscribed)
{code}

[~nehanarkhede], what you said makes a lot of sense. I guess I'm just looking 
at the problem from a different angle - a user either wants to consume from a 
topic or not at a certain point, whether temporarily or permanently. So the 
state I see is only CONSUME/NOT_CONSUME. The PAUSE state for consumer would be 
pretty much the same as NOT_CONSUME. I might have missed some use case like 
[~hachikuji] mentioned - seek() on a PAUSE partition, but that can be solved by 
calling subscribe() first.

[~gwenshap], good point about heartbeat. We actually got some feedback from 
users in LinkedIn and found that putting the responsibility of sending 
heartbeats on the user might be a problem in the first place... We may have 
pause/unpause as a workaround, but the ultimate issue is that maybe we are 
asking too much of the user to maintain the heartbeat...



Re: error while high level consumer

2015-07-27 Thread Jiangjie Qin
This is because the ZooKeeper path storing the previous owner info hasn't
been deleted yet. If the rebalance completes after a retry, it
should be fine.

Jiangjie (Becket) Qin

On Fri, Jul 24, 2015 at 6:54 PM, Kris K squareksc...@gmail.com wrote:

 Hi,

 I started seeing these errors in the logs continuously when I try to bring
 the High Level Consumer up. Please help.


 ZookeeperConsumerConnector [INFO] [XXX], waiting for the partition
 ownership to be deleted: 1

 ZookeeperConsumerConnector [INFO] [XXX], end rebalancing
 consumer XXX try #0

 ZookeeperConsumerConnector [INFO] [XXX], Rebalancing attempt failed.
 Clearing the cache before the next rebalancing operation is triggered


 Thanks,

 Kris



[jira] [Commented] (KAFKA-2268) New producer logs WARN if serializer supplied directly to constructor

2015-07-27 Thread Xuan Gong (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2268?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14643573#comment-14643573
 ] 

Xuan Gong commented on KAFKA-2268:
--

Looks like this is a duplicate of 
https://issues.apache.org/jira/browse/KAFKA-2289

 New producer logs WARN if serializer supplied directly to constructor
 -

 Key: KAFKA-2268
 URL: https://issues.apache.org/jira/browse/KAFKA-2268
 Project: Kafka
  Issue Type: Bug
  Components: producer 
Affects Versions: 0.8.2.1
Reporter: Morten Lied Johansen
Assignee: Jun Rao
Priority: Trivial

 When creating a new KafkaProducer by passing in a configuration-map, a key 
 serializer and a value serializer, two warnings are logged:
 {noformat}
 [WARN ] [2015-06-15T14:26:36,271] The configuration value.serializer = class 
 no.finntech.commons.kafka.ThriftSerializer was supplied but isn't a known 
 config. 
 [org.apache.kafka.common.config.AbstractConfig.logUnused(AbstractConfig.java:121)]
  [RMI TCP Connection(2)-127.0.0.1]
 [WARN ] [2015-06-15T14:26:36,271] The configuration key.serializer = class 
 org.apache.kafka.common.serialization.StringSerializer was supplied but isn't 
 a known config. 
 [org.apache.kafka.common.config.AbstractConfig.logUnused(AbstractConfig.java:121)]
  [RMI TCP Connection(2)-127.0.0.1]
 {noformat}
 This happens because this constructor adds the serializers to the config 
 (KafkaProducer.java:108), and then ignores that configuration since the 
 instances are already present (KafkaProducer.java:215). Finally, it checks 
 which parts of the configuration were used, and logs warnings for the 
 remaining keys (KafkaProducer.java:230).
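
For reference, a hedged sketch of the construction path that produces these
warnings; the serializers below are stand-ins for the reporter's
StringSerializer/ThriftSerializer pair:

{code}
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.serialization.StringSerializer;

Map<String, Object> configs = new HashMap<String, Object>();
configs.put("bootstrap.servers", "broker1:9092"); // placeholder
// Serializers are supplied directly, not via the config map...
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(
        configs, new StringSerializer(), new StringSerializer());
// ...but the constructor copies them into the config and never reads them back,
// so AbstractConfig.logUnused() reports key.serializer/value.serializer as unused.
{code}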



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-2130) Resource leakage in AppInfo.scala during initialization

2015-07-27 Thread Xuan Gong (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2130?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14643625#comment-14643625
 ] 

Xuan Gong commented on KAFKA-2130:
--

move 
{code}
stream.close();
{code}
to the finally block?
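
Roughly the pattern being suggested, shown in Java for illustration (AppInfo.scala
itself is Scala, and openVersionResource() below is just a placeholder for however
the stream is obtained):

{code}
// Illustrative pattern only: close the stream in a finally block so it is
// released even when reading the properties or registering the MBean throws.
InputStream stream = null;
try {
    stream = openVersionResource(); // placeholder
    // ... read version properties / register MBean ...
} finally {
    if (stream != null) {
        try {
            stream.close();
        } catch (IOException ignored) {
            // best-effort close
        }
    }
}
{code}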


 Resource leakage in AppInfo.scala during initialization
 ---

 Key: KAFKA-2130
 URL: https://issues.apache.org/jira/browse/KAFKA-2130
 Project: Kafka
  Issue Type: Bug
Reporter: Sebastien Zimmer
Priority: Trivial
  Labels: patch
 Attachments: patch.diff


 Minor InputStream leakage during the server initialization in AppInfo.scala. 
 Patch attached.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1690) new java producer needs ssl support as a client

2015-07-27 Thread Sourabh Chandak (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1690?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14643532#comment-14643532
 ] 

Sourabh Chandak commented on KAFKA-1690:


[~sriharsha] Will this patch unblock the entire SSL authentication issue?

 new java producer needs ssl support as a client
 ---

 Key: KAFKA-1690
 URL: https://issues.apache.org/jira/browse/KAFKA-1690
 Project: Kafka
  Issue Type: Sub-task
Reporter: Joe Stein
Assignee: Sriharsha Chintalapani
 Fix For: 0.8.3

 Attachments: KAFKA-1690.patch, KAFKA-1690.patch, 
 KAFKA-1690_2015-05-10_23:20:30.patch, KAFKA-1690_2015-05-10_23:31:42.patch, 
 KAFKA-1690_2015-05-11_16:09:36.patch, KAFKA-1690_2015-05-12_16:20:08.patch, 
 KAFKA-1690_2015-05-15_07:18:21.patch, KAFKA-1690_2015-05-20_14:54:35.patch, 
 KAFKA-1690_2015-05-21_10:37:08.patch, KAFKA-1690_2015-06-03_18:52:29.patch, 
 KAFKA-1690_2015-06-23_13:18:20.patch, KAFKA-1690_2015-07-20_06:10:42.patch, 
 KAFKA-1690_2015-07-20_11:59:57.patch, KAFKA-1690_2015-07-25_12:10:55.patch






--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 36858: Patch for KAFKA-2120

2015-07-27 Thread Mayuresh Gharat

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36858/
---

(Updated July 27, 2015, 10:31 p.m.)


Review request for kafka.


Bugs: KAFKA-2120
https://issues.apache.org/jira/browse/KAFKA-2120


Repository: kafka


Description (updated)
---

Solved compile error


Diffs (updated)
-

  clients/src/main/java/org/apache/kafka/clients/ClientRequest.java 
ed4c0d98596cc294757f35df8c8cbc8e36ff42de 
  clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java 
2c421f42ed3fc5d61cf9c87a7eaa7bb23e26f63b 
  clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java 
15d00d4e484bb5d51a9ae6857ed6e024a2cc1820 
  clients/src/main/java/org/apache/kafka/clients/KafkaClient.java 
7ab2503794ff3aab39df881bd9fbae6547827d3b 
  clients/src/main/java/org/apache/kafka/clients/NetworkClient.java 
48fe7961e2215372d8033ece4af739ea06c6457b 
  clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java 
70377ae2fa46deb381139d28590ce6d4115e1adc 
  clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
bea3d737c51be77d5b5293cdd944d33b905422ba 
  
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
 9517d9d0cd480d5ba1d12f1fde7963e60528d2f8 
  clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
03b8dd23df63a8d8a117f02eabcce4a2d48c44f7 
  clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java 
aa264202f2724907924985a5ecbe74afc4c6c04b 
  
clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java
 4cb1e50d6c4ed55241aeaef1d3af09def5274103 
  
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
 a152bd7697dca55609a9ec4cfe0a82c10595fbc3 
  
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
 06182db1c3a5da85648199b4c0c98b80ea7c6c0c 
  clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 
0baf16e55046a2f49f6431e01d52c323c95eddf0 
  clients/src/main/java/org/apache/kafka/common/network/Selector.java 
aaf60c98c2c0f4513a8d65ee0db67953a529d598 
  clients/src/test/java/org/apache/kafka/clients/MockClient.java 
d9c97e966c0e2fb605b67285f4275abb89f8813e 
  clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java 
43238ceaad0322e39802b615bb805b895336a009 
  
clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java
 2c693824fa53db1e38766b8c66a0ef42ef9d0f3a 
  
clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
 5b2e4ffaeab7127648db608c179703b27b577414 
  
clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
 8b1805d3d2bcb9fe2bacb37d870c3236aa9532c4 

Diff: https://reviews.apache.org/r/36858/diff/


Testing
---


Thanks,

Mayuresh Gharat



Re: Review Request 36858: Patch for KAFKA-2120

2015-07-27 Thread Jason Gustafson

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36858/#review93189
---


Looks pretty good overall. Found mostly trivial stuff.


clients/src/main/java/org/apache/kafka/clients/ClientRequest.java (line 26)
https://reviews.apache.org/r/36858/#comment147486

Should ClientResponse.requestLatencyMs be updated to use sendMs instead of 
createdMs? Or perhaps a new method like wireLatencyMs() can record responseMs - 
sendMs? Both seem like useful metrics.



clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java (line 48)
https://reviews.apache.org/r/36858/#comment147491

It seems like it might be more natural to set this in NetworkClient.send.



clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java (line 142)
https://reviews.apache.org/r/36858/#comment147492

Probably better practice to use !requests.isEmpty().



clients/src/main/java/org/apache/kafka/clients/KafkaClient.java (line 72)
https://reviews.apache.org/r/36858/#comment147493

Missing @param for now.



clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
(line 538)
https://reviews.apache.org/r/36858/#comment147497

You don't necessarily need to fix this in this patch, but this setting 
could conflict with FETCH_MAX_WAIT_MS_CONFIG.



clients/src/main/java/org/apache/kafka/common/network/Selector.java (line 86)
https://reviews.apache.org/r/36858/#comment147510

Would it make sense to try to consolidate clientDisconnects and failedSends 
into the same collection? Unfortunately I can't think of a good name. Maybe 
nextDisconnects?



clients/src/test/java/org/apache/kafka/clients/MockClient.java (line 94)
https://reviews.apache.org/r/36858/#comment147498

Do we need to set sendMs in this method for consistency?


- Jason Gustafson


On July 27, 2015, 10:32 p.m., Mayuresh Gharat wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/36858/
 ---
 
 (Updated July 27, 2015, 10:32 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-2120
 https://issues.apache.org/jira/browse/KAFKA-2120
 
 
 Repository: kafka
 
 
 Description
 ---
 
 KIP-19: Added RequestTimeout and MaxBlockTimeout as per the KIP
 
 
 Diffs
 -
 
   clients/src/main/java/org/apache/kafka/clients/ClientRequest.java 
 ed4c0d98596cc294757f35df8c8cbc8e36ff42de 
   clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java 
 2c421f42ed3fc5d61cf9c87a7eaa7bb23e26f63b 
   clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java 
 15d00d4e484bb5d51a9ae6857ed6e024a2cc1820 
   clients/src/main/java/org/apache/kafka/clients/KafkaClient.java 
 7ab2503794ff3aab39df881bd9fbae6547827d3b 
   clients/src/main/java/org/apache/kafka/clients/NetworkClient.java 
 48fe7961e2215372d8033ece4af739ea06c6457b 
   clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java 
 70377ae2fa46deb381139d28590ce6d4115e1adc 
   clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
 bea3d737c51be77d5b5293cdd944d33b905422ba 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
  9517d9d0cd480d5ba1d12f1fde7963e60528d2f8 
   clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
 03b8dd23df63a8d8a117f02eabcce4a2d48c44f7 
   clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java 
 aa264202f2724907924985a5ecbe74afc4c6c04b 
   
 clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java
  4cb1e50d6c4ed55241aeaef1d3af09def5274103 
   
 clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
  a152bd7697dca55609a9ec4cfe0a82c10595fbc3 
   
 clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
  06182db1c3a5da85648199b4c0c98b80ea7c6c0c 
   
 clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 
 0baf16e55046a2f49f6431e01d52c323c95eddf0 
   clients/src/main/java/org/apache/kafka/common/network/Selector.java 
 aaf60c98c2c0f4513a8d65ee0db67953a529d598 
   clients/src/test/java/org/apache/kafka/clients/MockClient.java 
 d9c97e966c0e2fb605b67285f4275abb89f8813e 
   clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java 
 43238ceaad0322e39802b615bb805b895336a009 
   
 clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java
  2c693824fa53db1e38766b8c66a0ef42ef9d0f3a 
   
 clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
  5b2e4ffaeab7127648db608c179703b27b577414 
   
 clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
  8b1805d3d2bcb9fe2bacb37d870c3236aa9532c4 
 
 

[jira] [Commented] (KAFKA-1690) new java producer needs ssl support as a client

2015-07-27 Thread Sriharsha Chintalapani (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1690?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14643597#comment-14643597
 ] 

Sriharsha Chintalapani commented on KAFKA-1690:
---

[~sourabh0612] Yes. It includes broker, producer (new API), and consumer (new 
API) changes.

 new java producer needs ssl support as a client
 ---

 Key: KAFKA-1690
 URL: https://issues.apache.org/jira/browse/KAFKA-1690
 Project: Kafka
  Issue Type: Sub-task
Reporter: Joe Stein
Assignee: Sriharsha Chintalapani
 Fix For: 0.8.3

 Attachments: KAFKA-1690.patch, KAFKA-1690.patch, 
 KAFKA-1690_2015-05-10_23:20:30.patch, KAFKA-1690_2015-05-10_23:31:42.patch, 
 KAFKA-1690_2015-05-11_16:09:36.patch, KAFKA-1690_2015-05-12_16:20:08.patch, 
 KAFKA-1690_2015-05-15_07:18:21.patch, KAFKA-1690_2015-05-20_14:54:35.patch, 
 KAFKA-1690_2015-05-21_10:37:08.patch, KAFKA-1690_2015-06-03_18:52:29.patch, 
 KAFKA-1690_2015-06-23_13:18:20.patch, KAFKA-1690_2015-07-20_06:10:42.patch, 
 KAFKA-1690_2015-07-20_11:59:57.patch, KAFKA-1690_2015-07-25_12:10:55.patch






--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [DISCUSS] KIP-27 - Conditional Publish

2015-07-27 Thread Jiangjie Qin
@Ewen, good point about batching. Yes, it would be tricky if we want to do
a per-key conditional produce. My understanding is that the prerequisites of
this KIP are:
1. Single producer for each partition.
2. Acks=-1, max.in.flight.request.per.connection=1, retries=SOME_BIG_NUMBER

The major problem it tries to solve is exactly-once produce, i.e. eliminating
duplicates on the producer side. In that case, a batch will be considered
atomic. The only reason a batch should get rejected is that it has already
been appended, so the producer should just move on.
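
As an illustration, those prerequisites map to roughly the following new-producer
configuration (the broker address is a placeholder and Integer.MAX_VALUE just
stands in for SOME_BIG_NUMBER):

    import java.util.Properties;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.common.serialization.ByteArraySerializer;

    public class SingleWriterProducer {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "broker1:9092");           // placeholder broker address
            props.put("acks", "-1");                                   // wait for all in-sync replicas
            props.put("max.in.flight.requests.per.connection", "1");   // one outstanding request at a time
            props.put("retries", Integer.toString(Integer.MAX_VALUE)); // stand-in for SOME_BIG_NUMBER
            KafkaProducer<byte[], byte[]> producer =
                new KafkaProducer<byte[], byte[]>(props, new ByteArraySerializer(), new ByteArraySerializer());
            // ... the single writer for the partition produces here ...
            producer.close();
        }
    }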

It looks to me like even a transient multiple-producer scenario will cause
issues, because the user needs to think about what to do if a request gets
rejected, and the answer varies across use cases.

Thanks,

Jiangjie (Becket) Qin

On Sun, Jul 26, 2015 at 11:54 AM, Ben Kirwin b...@kirw.in wrote:

 So I had another look at the 'Idempotent Producer' proposal this
 afternoon, and made a few notes on how I think they compare; if I've
 made any mistakes, I'd be delighted if someone with more context on
 the idempotent producer design would correct me.

 As a first intuition, you can think of the 'conditional publish'
 proposal as the special case of the 'idempotent producer' idea, where
 there's just a single producer per-partition. The key observation here
 is: if there's only one producer, you can conflate the 'sequence
 number' and the expected offset. The conditional publish proposal uses
 existing Kafka offset APIs for roughly the same things as the
 idempotent producer proposal uses sequence numbers for -- eg. instead
 of having a lease PID API that returns the current sequence number,
 we can use the existing 'offset API' to retrieve the upcoming offset.

 Both proposals attempt to deal with the situation where there are
 transiently multiple publishers for the same partition (and PID). The
 idempotent producer setup tracks a generation id for each pid, and
 discards any writes with a generation id smaller than the latest
 value. Conditional publish is 'first write wins' -- and instead of
 dropping duplicates on the server, it returns an error to the client.
 The duplicate-handling behaviour (dropping vs. erroring) has some
 interesting consequences:

 - If all producers are producing the same stream of messages, silently
 dropping duplicates on the server is more convenient. (Suppose we have
 a batch of messages 0-9, and the high-water mark on the server is 7.
 Idempotent producer, as I read it, would append 7-9 to the partition
 and return success; meanwhile, conditional publish would fail the
 entire batch.)

 - If producers might be writing different streams of messages, the
 proposed behaviour of the idempotent producer is probably worse --
 since it can silently interleave messages from two different
 producers. This can be a problem for some commit-log style use-cases,
 since it can transform a valid series of operations into an invalid
 one.

 - Given the error-on-duplicate behaviour, it's possible to implement
 deduplication on the client. (Sketch: if a publish returns an error
 for some partition, fetch the upcoming offset / sequence number for
 that partition, and discard all messages with a smaller offset on the
 client before republishing.)
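 
 To make the sketch concrete, here is a toy version of the dedup step; the
 offsets and the rejected publish are simulated, since the conditional-publish
 API itself does not exist yet:
 
     import java.util.ArrayList;
     import java.util.List;

     public class DedupSketch {
         public static void main(String[] args) {
             // The producer assigned expected offsets 7..9 to a batch of three messages.
             List<Long> expectedOffsets = new ArrayList<Long>();
             for (long o = 7; o <= 9; o++) expectedOffsets.add(o);

             // The publish is rejected; a fresh offset lookup says the upcoming offset
             // is 9, i.e. offsets 7 and 8 were already appended by an earlier attempt.
             long upcomingOffset = 9;

             // Drop everything below the upcoming offset and republish only the tail.
             List<Long> toRepublish = new ArrayList<Long>();
             for (long o : expectedOffsets)
                 if (o >= upcomingOffset)
                     toRepublish.add(o);

             System.out.println("republish expected offsets: " + toRepublish); // prints [9]
         }
     }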

 I think this makes the erroring behaviour more general, though
 deduplicating saves a roundtrip or two at conflict time.

 I'm less clear about the behaviour of the generation id, or what
 happens when (say) two producers with the same generation id are spun
 up at the same time. I'd be interested in hearing other folks'
 comments on this.

 Ewen: I'm not sure I understand the questions well enough to answer
 properly, but some quick notes:
 - I don't think it makes sense to assign an expected offset without
 already having assigned a partition. If the producer code is doing the
 partition assignment, it should probably do the offset assignment
 too... or we could just let application code handle both.
 - I'm not aware of any case where reassigning offsets to messages
 automatically after an offset mismatch makes sense: in the cases we've
 discussed, it seems like either it's safe to drop duplicates, or we
 want to handle the error at the application level.

 I'm going to try and come up with an idempotent-producer-type example
 that works with the draft patch in the next few days, so hopefully
 we'll have something more concrete to discuss. Otherwise -- if you
 have a clear idea of how eg. sequence number assignment would work in
 the idempotent-producer proposal, we could probably translate that
 over to get the equivalent for the conditional publish API.



 On Fri, Jul 24, 2015 at 2:16 AM, Ewen Cheslack-Postava
 e...@confluent.io wrote:
  @Becket - for compressed batches, I think this just works out given the
 KIP
  as described. Without the change you're referring to, it still only makes
  sense to batch messages with this KIP if all the expected offsets are
  sequential (else some messages are guaranteed to 

Re: [DISCUSS] Partitioning in Kafka

2015-07-27 Thread Gwen Shapira
I guess it depends on whether the original producer did any map
tasks or simply wrote raw data. We usually advocate writing raw data,
and since we need to write it anyway, the partitioner doesn't
introduce any extra hops.

It's definitely useful to look at use-cases, and I need to think a bit
more on whether huge-key-space-with-large-skew is the only one.
I think there are use-cases that are not pure-aggregate, where keeping the
key list in memory won't help and scaling to a large number of partitions
is still required (and therefore skew is a critical problem). However, I
may be making stuff up, so I need to double-check.

Gwen





On Mon, Jul 27, 2015 at 2:20 PM, Ewen Cheslack-Postava
e...@confluent.io wrote:
 Gwen - this is really like two steps of map reduce though, right? The first
 step does the partial shuffle to two partitions per key, second step does
 partial reduce + final full shuffle, final step does the final reduce.

 This strikes me as similar to partition assignment strategies in the
 consumer in that there will probably be a small handful of commonly used
 strategies that we can just maintain as part of Kafka. A few people will
 need more obscure strategies and they can maintain those implementations
 themselves. For reference, a quick grep of Spark shows 5 partitioners: Hash
 and RangePartitioner, which are in core, PythonPartitioner, GridPartitioner
 for partitioning matrices, and ShuffleRowRDD for their SQL implementation.
 So I don't think it would be a big deal to include it here, although I'm
 not really sure how often it's useful -- compared to normal partitioning or
 just doing two steps by starting with unpartitioned data, you need to be
 performing an aggregation, the key set needs to be large enough for memory
 usage to be a problem (i.e. you don't want each consumer to have to
 maintain a map with every key in it), and a sufficiently skewed
 distribution (i.e. not just 1 or 2 very hot keys). The key set constraint,
 in particular, is the one I'm not convinced by since in practice if you
 have a skewed distribution, you probably also won't actually see every key
 in every partition; each worker actually only needs to maintain a subset of
 the key set (and associated aggregate data) in memory.


 On Mon, Jul 27, 2015 at 12:56 PM, Gwen Shapira gshap...@cloudera.com
 wrote:

 If you are used to map-reduce patterns, this sounds like a perfectly
 natural way to process streams of data.

 Call the first consumer map-combine-log, the topic shuffle-log and
 the second consumer reduce-log :)
 I like that a lot. It works well for either 'embarrassingly parallel'
 cases, or 'so much data that more parallelism is worth the extra
 overhead' cases.

 I personally don't care if it's in core Kafka, KIP-28, or a GitHub
 project elsewhere, but I find it useful and non-esoteric.



 On Mon, Jul 27, 2015 at 12:51 PM, Jason Gustafson ja...@confluent.io
 wrote:
  For a little background, the difference between this partitioner and the
  default one is that it breaks the deterministic mapping from key to
  partition. Instead, messages for a given key can end up in either of two
  partitions. This means that the consumer generally won't see all messages
  for a given key. Instead the consumer would compute an aggregate for each
  key on the partitions it consumes and write them to a separate topic. For
  example, if you are writing log messages to a logs topic with the
  hostname as the key, you could use this partitioning strategy to compute
  message counts for each host in each partition and write them to a
  log-counts topic. Then a consumer of the log-counts topic would
 compute
  total aggregates based on the two intermediate aggregates. The benefit is
  that you are generally going to get better load balancing across
 partitions
  than if you used the default partitioner. (Please correct me if my
  understanding is incorrect, Gianmarco)
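  
  To illustrate, a partitioner along these lines might look like the sketch below,
  assuming the pluggable Partitioner interface added by KAFKA-2091. The two hash
  functions are arbitrary, and a load-aware version would route each message to
  whichever of the two candidates has received fewer messages so far:
  
  import java.util.Arrays;
  import java.util.Map;
  import java.util.concurrent.ThreadLocalRandom;
  
  import org.apache.kafka.clients.producer.Partitioner;
  import org.apache.kafka.common.Cluster;
  
  public class TwoChoicesPartitioner implements Partitioner {
  
      @Override
      public void configure(Map<String, ?> configs) { }
  
      @Override
      public int partition(String topic, Object key, byte[] keyBytes,
                           Object value, byte[] valueBytes, Cluster cluster) {
          int numPartitions = cluster.partitionsForTopic(topic).size();
          if (keyBytes == null)
              return ThreadLocalRandom.current().nextInt(numPartitions);
          // Two fixed candidate partitions per key, derived from two hashes of the key bytes.
          int h = Arrays.hashCode(keyBytes);
          int first = (h & 0x7fffffff) % numPartitions;
          int second = ((h * 31 + 17) & 0x7fffffff) % numPartitions;
          // Pick one of the two candidates; a load-aware version would route to the
          // candidate that has received fewer messages so far.
          return ThreadLocalRandom.current().nextBoolean() ? first : second;
      }
  
      @Override
      public void close() { }
  }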
 
  So I think the question is whether this is a useful primitive for Kafka
 to
  provide out of the box? I was a little concerned that this use case is a
  little esoteric for a core feature, but it may make more sense in the
  context of KIP-28 which would provide some higher-level processing
  capabilities (though it doesn't seem like the KStream abstraction would
  provide a direct way to leverage this partitioner without custom logic).
 
  Thanks,
  Jason
 
 
  On Wed, Jul 22, 2015 at 12:14 AM, Gianmarco De Francisci Morales 
  g...@apache.org wrote:
 
  Hello folks,
 
  I'd like to ask the community about its opinion on the partitioning
  functions in Kafka.
 
  With KAFKA-2091 https://issues.apache.org/jira/browse/KAFKA-2091
  integrated we are now able to have custom partitioners in the producer.
  The question now becomes *which* partitioners should ship with Kafka?
  This issue arose in the context of KAFKA-2092
  https://issues.apache.org/jira/browse/KAFKA-2092, which implements a
  specific load-balanced partitioning. This partitioner however assumes
 some

[jira] [Commented] (KAFKA-2120) Add a request timeout to NetworkClient

2015-07-27 Thread Mayuresh Gharat (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2120?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14643533#comment-14643533
 ] 

Mayuresh Gharat commented on KAFKA-2120:


Updated reviewboard https://reviews.apache.org/r/36858/diff/
 against branch origin/trunk

 Add a request timeout to NetworkClient
 --

 Key: KAFKA-2120
 URL: https://issues.apache.org/jira/browse/KAFKA-2120
 Project: Kafka
  Issue Type: New Feature
Reporter: Jiangjie Qin
Assignee: Mayuresh Gharat
 Attachments: KAFKA-2120.patch, KAFKA-2120_2015-07-27_15:31:19.patch


 Currently NetworkClient does not have a timeout setting for requests. So if 
 no response is received for a request due to reasons such as broker is down, 
 the request will never be completed.
 Request timeout will also be used as implicit timeout for some methods such 
 as KafkaProducer.flush() and kafkaProducer.close().
 KIP-19 is created for this public interface change.
 https://cwiki.apache.org/confluence/display/KAFKA/KIP-19+-+Add+a+request+timeout+to+NetworkClient
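 
 For illustration, the producer side might be configured roughly as below; the 
 config key names (request.timeout.ms, max.block.ms) are the ones discussed 
 under KIP-19 and could still change:
 {code}
 import java.util.Properties;
 
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 
 public class TimeoutConfigSketch {
     public static void main(String[] args) {
         Properties props = new Properties();
         props.put("bootstrap.servers", "broker1:9092"); // placeholder broker address
         props.put("request.timeout.ms", "30000");       // fail a request if no response arrives in time
         props.put("max.block.ms", "60000");             // bound how long send() may block on metadata/buffer space
         KafkaProducer<byte[], byte[]> producer =
             new KafkaProducer<byte[], byte[]>(props, new ByteArraySerializer(), new ByteArraySerializer());
         producer.close();
     }
 }
 {code}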



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-2120) Add a request timeout to NetworkClient

2015-07-27 Thread Mayuresh Gharat (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2120?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mayuresh Gharat updated KAFKA-2120:
---
Attachment: KAFKA-2120_2015-07-27_15:31:19.patch

 Add a request timeout to NetworkClient
 --

 Key: KAFKA-2120
 URL: https://issues.apache.org/jira/browse/KAFKA-2120
 Project: Kafka
  Issue Type: New Feature
Reporter: Jiangjie Qin
Assignee: Mayuresh Gharat
 Attachments: KAFKA-2120.patch, KAFKA-2120_2015-07-27_15:31:19.patch


 Currently NetworkClient does not have a timeout setting for requests. So if 
 no response is received for a request due to reasons such as broker is down, 
 the request will never be completed.
 Request timeout will also be used as implicit timeout for some methods such 
 as KafkaProducer.flush() and kafkaProducer.close().
 KIP-19 is created for this public interface change.
 https://cwiki.apache.org/confluence/display/KAFKA/KIP-19+-+Add+a+request+timeout+to+NetworkClient



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: error while high level consumer

2015-07-27 Thread Mayuresh Gharat
Try bouncing the host that appears in the stored data section.

Thanks,

Mayuresh

On Mon, Jul 27, 2015 at 3:41 PM, Jiangjie Qin j...@linkedin.com.invalid
wrote:

 This happens because the ZooKeeper path storing the previous owner info hasn't
 been deleted yet. If the rebalance completes after a retry, it
 should be fine.

 Jiangjie (Becket) Qin

 On Fri, Jul 24, 2015 at 6:54 PM, Kris K squareksc...@gmail.com wrote:

  Hi,
 
  I started seeing these errors in the logs continuously when I try to
 bring
  the High Level Consumer up. Please help.
 
 
  ZookeeperConsumerConnector [INFO] [XXX], waiting for the partition
  ownership to be deleted: 1
 
  ZookeeperConsumerConnector [INFO] [XXX], end rebalancing
  consumer XXX try #0
 
  ZookeeperConsumerConnector [INFO] [XXX], Rebalancing attempt failed.
  Clearing the cache before the next rebalancing operation is triggered
 
 
  Thanks,
 
  Kris
 




-- 
-Regards,
Mayuresh R. Gharat
(862) 250-7125


[jira] [Created] (KAFKA-2381) Possible ConcurrentModificationException while unsubscribing from a topic in new consumer

2015-07-27 Thread Ashish K Singh (JIRA)
Ashish K Singh created KAFKA-2381:
-

 Summary: Possible ConcurrentModificationException while 
unsubscribing from a topic in new consumer
 Key: KAFKA-2381
 URL: https://issues.apache.org/jira/browse/KAFKA-2381
 Project: Kafka
  Issue Type: Bug
  Components: consumer
Reporter: Ashish K Singh
Assignee: Ashish K Singh


Possible ConcurrentModificationException while unsubscribing from a topic in 
new consumer. Attempt is made to modify AssignedPartitions while looping over 
it.
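
A generic illustration of the failure mode and the usual fix (not the actual 
SubscriptionState code):
{code}
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;

public class UnsubscribeSketch {
    public static void main(String[] args) {
        Set<String> assignedPartitions = new HashSet<String>();
        assignedPartitions.add("topic-a-0");
        assignedPartitions.add("topic-a-1");
        assignedPartitions.add("topic-b-0");

        // Anti-pattern: removing from the set inside a for-each loop over the same
        // set throws ConcurrentModificationException on the next iteration step.
        // for (String tp : assignedPartitions)
        //     if (tp.startsWith("topic-a"))
        //         assignedPartitions.remove(tp);

        // Fix: remove through the iterator (or iterate over a copy of the set).
        for (Iterator<String> it = assignedPartitions.iterator(); it.hasNext(); ) {
            if (it.next().startsWith("topic-a"))
                it.remove();
        }
        System.out.println(assignedPartitions); // [topic-b-0]
    }
}
{code}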



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-2381) Possible ConcurrentModificationException while unsubscribing from a topic in new consumer

2015-07-27 Thread Ashish K Singh (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2381?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14643703#comment-14643703
 ] 

Ashish K Singh commented on KAFKA-2381:
---

[~gwenshap] could you take a look when you get a chance.

 Possible ConcurrentModificationException while unsubscribing from a topic in 
 new consumer
 -

 Key: KAFKA-2381
 URL: https://issues.apache.org/jira/browse/KAFKA-2381
 Project: Kafka
  Issue Type: Bug
  Components: consumer
Reporter: Ashish K Singh
Assignee: Ashish K Singh
 Attachments: KAFKA-2381.patch, KAFKA-2381_2015-07-27_17:56:00.patch


 Possible ConcurrentModificationException while unsubscribing from a topic in 
 new consumer. Attempt is made to modify AssignedPartitions while looping over 
 it.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-2381) Possible ConcurrentModificationException while unsubscribing from a topic in new consumer

2015-07-27 Thread Ashish K Singh (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2381?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ashish K Singh updated KAFKA-2381:
--
Attachment: KAFKA-2381_2015-07-27_17:56:00.patch

 Possible ConcurrentModificationException while unsubscribing from a topic in 
 new consumer
 -

 Key: KAFKA-2381
 URL: https://issues.apache.org/jira/browse/KAFKA-2381
 Project: Kafka
  Issue Type: Bug
  Components: consumer
Reporter: Ashish K Singh
Assignee: Ashish K Singh
 Attachments: KAFKA-2381.patch, KAFKA-2381_2015-07-27_17:56:00.patch


 Possible ConcurrentModificationException while unsubscribing from a topic in 
 new consumer. Attempt is made to modify AssignedPartitions while looping over 
 it.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-2381) Possible ConcurrentModificationException while unsubscribing from a topic in new consumer

2015-07-27 Thread Ashish K Singh (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2381?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14643699#comment-14643699
 ] 

Ashish K Singh commented on KAFKA-2381:
---

Updated reviewboard https://reviews.apache.org/r/36871/
 against branch trunk

 Possible ConcurrentModificationException while unsubscribing from a topic in 
 new consumer
 -

 Key: KAFKA-2381
 URL: https://issues.apache.org/jira/browse/KAFKA-2381
 Project: Kafka
  Issue Type: Bug
  Components: consumer
Reporter: Ashish K Singh
Assignee: Ashish K Singh
 Attachments: KAFKA-2381.patch, KAFKA-2381_2015-07-27_17:56:00.patch


 Possible ConcurrentModificationException while unsubscribing from a topic in 
 new consumer. Attempt is made to modify AssignedPartitions while looping over 
 it.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 36871: Patch for KAFKA-2381

2015-07-27 Thread Aditya Auradkar

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36871/#review93215
---



core/src/test/scala/integration/kafka/api/ConsumerTest.scala (line 233)
https://reviews.apache.org/r/36871/#comment147535

Consider closing this in a finally block. A failing test can cause incorrect 
teardown of the test.


- Aditya Auradkar


On July 28, 2015, 12:56 a.m., Ashish Singh wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/36871/
 ---
 
 (Updated July 28, 2015, 12:56 a.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-2381
 https://issues.apache.org/jira/browse/KAFKA-2381
 
 
 Repository: kafka
 
 
 Description
 ---
 
 KAFKA-2381: Possible ConcurrentModificationException while unsubscribing from 
 a topic in new consumer
 
 
 Diffs
 -
 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
  4d9a425201115a66b457b58d670992b279091f5a 
   core/src/test/scala/integration/kafka/api/ConsumerTest.scala 
 3eb5f95731a3f06f662b334ab2b3d0ad7fa9e1ca 
 
 Diff: https://reviews.apache.org/r/36871/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Ashish Singh
 




[jira] [Updated] (KAFKA-2381) Possible ConcurrentModificationException while unsubscribing from a topic in new consumer

2015-07-27 Thread Ashish K Singh (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2381?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ashish K Singh updated KAFKA-2381:
--
Status: Patch Available  (was: Open)

 Possible ConcurrentModificationException while unsubscribing from a topic in 
 new consumer
 -

 Key: KAFKA-2381
 URL: https://issues.apache.org/jira/browse/KAFKA-2381
 Project: Kafka
  Issue Type: Bug
  Components: consumer
Reporter: Ashish K Singh
Assignee: Ashish K Singh
 Attachments: KAFKA-2381.patch


 Possible ConcurrentModificationException while unsubscribing from a topic in 
 new consumer. Attempt is made to modify AssignedPartitions while looping over 
 it.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-2381) Possible ConcurrentModificationException while unsubscribing from a topic in new consumer

2015-07-27 Thread Ashish K Singh (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2381?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ashish K Singh updated KAFKA-2381:
--
Attachment: KAFKA-2381.patch

 Possible ConcurrentModificationException while unsubscribing from a topic in 
 new consumer
 -

 Key: KAFKA-2381
 URL: https://issues.apache.org/jira/browse/KAFKA-2381
 Project: Kafka
  Issue Type: Bug
  Components: consumer
Reporter: Ashish K Singh
Assignee: Ashish K Singh
 Attachments: KAFKA-2381.patch


 Possible ConcurrentModificationException while unsubscribing from a topic in 
 new consumer. Attempt is made to modify AssignedPartitions while looping over 
 it.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Review Request 36871: Patch for KAFKA-2381

2015-07-27 Thread Ashish Singh

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36871/
---

Review request for kafka.


Bugs: KAFKA-2381
https://issues.apache.org/jira/browse/KAFKA-2381


Repository: kafka


Description
---

KAFKA-2381: Possible ConcurrentModificationException while unsubscribing from a 
topic in new consumer


Diffs
-

  
clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
 4d9a425201115a66b457b58d670992b279091f5a 
  core/src/test/scala/integration/kafka/api/ConsumerTest.scala 
3eb5f95731a3f06f662b334ab2b3d0ad7fa9e1ca 

Diff: https://reviews.apache.org/r/36871/diff/


Testing
---


Thanks,

Ashish Singh



[jira] [Commented] (KAFKA-313) Add JSON/CSV output and looping options to ConsumerGroupCommand

2015-07-27 Thread Ashish K Singh (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-313?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14643716#comment-14643716
 ] 

Ashish K Singh commented on KAFKA-313:
--

[~gwenshap] I need help with getting this KIP to the done state :). It has been 
quite some time.

 Add JSON/CSV output and looping options to ConsumerGroupCommand
 ---

 Key: KAFKA-313
 URL: https://issues.apache.org/jira/browse/KAFKA-313
 Project: Kafka
  Issue Type: Improvement
Reporter: Dave DeMaagd
Assignee: Ashish K Singh
Priority: Minor
  Labels: newbie, patch
 Fix For: 0.8.3

 Attachments: KAFKA-313-2012032200.diff, KAFKA-313.1.patch, 
 KAFKA-313.patch, KAFKA-313_2015-02-23_18:11:32.patch, 
 KAFKA-313_2015-06-24_11:14:24.patch


 Adds:
 * '--loop N' - causes the program to loop forever, sleeping for up to N 
 seconds between loops (loop time minus collection time, unless that's less 
 than 0, at which point it will just run again immediately)
 * '--asjson' - display as a JSON string instead of the more human readable 
 output format.
 Neither of the above depends on the other (you can loop with the human-readable 
 output, or do a single-shot execution with JSON output). Existing 
 behavior/output is maintained if neither of the above is used. Diff attached.
 Impacted files:
 core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-2360) The kafka-consumer-perf-test.sh script help information print useless parameters.

2015-07-27 Thread Bo Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2360?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bo Wang updated KAFKA-2360:
---
Description: 
Run kafka-consumer-perf-test.sh --help to show help information,  but found 3 
parameters useless : 
--batch-size and --batch-size
That is producer of parameters.

bin]# ./kafka-consumer-perf-test.sh --help
Missing required argument [topic]
Option  Description
--  ---
--batch-size Integer: sizeNumber of messages to write in a   
  single batch. (default: 200) 
--compression-codec Integer:   If set, messages are sent compressed   
  supported codec: NoCompressionCodec (default: 0) 
  as 0, GZIPCompressionCodec as 1, 
  SnappyCompressionCodec as 2, 
  LZ4CompressionCodec as 3
--date-format date format The date format to use for formatting  
  the time field. See java.text.   
  SimpleDateFormat for options.
  (default: yyyy-MM-dd HH:mm:ss:SSS)   
--fetch-size Integer: sizeThe amount of data to fetch in a   
  single request. (default: 1048576)   

  was:
Run kafka-consumer-perf-test.sh --help to show help information,  but found two 
parameters useless : 
--batch-size and --batch-size
That is producer of parameters.

bin]# ./kafka-consumer-perf-test.sh --help
Missing required argument [topic]
Option  Description
--  ---
--batch-size Integer: sizeNumber of messages to write in a   
  single batch. (default: 200) 
--compression-codec Integer:   If set, messages are sent compressed   
  supported codec: NoCompressionCodec (default: 0) 
  as 0, GZIPCompressionCodec as 1, 
  SnappyCompressionCodec as 2, 
  LZ4CompressionCodec as 3
--date-format date format The date format to use for formatting  
  the time field. See java.text.   
  SimpleDateFormat for options.
  (default: yyyy-MM-dd HH:mm:ss:SSS)   
--fetch-size Integer: sizeThe amount of data to fetch in a   
  single request. (default: 1048576)   


 The kafka-consumer-perf-test.sh script help information print useless 
 parameters.
 -

 Key: KAFKA-2360
 URL: https://issues.apache.org/jira/browse/KAFKA-2360
 Project: Kafka
  Issue Type: Bug
  Components: tools
Affects Versions: 0.8.2.1
 Environment: Linux
Reporter: Bo Wang
Priority: Minor
   Original Estimate: 24h
  Remaining Estimate: 24h

 Run kafka-consumer-perf-test.sh --help to show help information,  but found 3 
 parameters useless : 
 --batch-size and --batch-size
 That is producer of parameters.
 bin]# ./kafka-consumer-perf-test.sh --help
 Missing required argument [topic]
 Option  Description   
  
 --  ---   
  
 --batch-size Integer: sizeNumber of messages to write in a  
  
   single batch. (default: 200)
  
 --compression-codec Integer:   If set, messages are sent compressed  
  
   supported codec: NoCompressionCodec (default: 0)
  
   as 0, GZIPCompressionCodec as 1,
  
   SnappyCompressionCodec as 2,
  
   LZ4CompressionCodec as 3   
  
 --date-format date format The date format to use for formatting 
  
   the time field. See java.text.  
  
   SimpleDateFormat for options.   
  
   (default: yyyy-MM-dd HH:mm:ss:SSS)  
  
 --fetch-size Integer: sizeThe amount of data to fetch in a 

[jira] [Commented] (KAFKA-1690) new java producer needs ssl support as a client

2015-07-27 Thread Sourabh Chandak (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1690?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14643658#comment-14643658
 ] 

Sourabh Chandak commented on KAFKA-1690:


Awesome!
When will this be integrated into the main branch?

 new java producer needs ssl support as a client
 ---

 Key: KAFKA-1690
 URL: https://issues.apache.org/jira/browse/KAFKA-1690
 Project: Kafka
  Issue Type: Sub-task
Reporter: Joe Stein
Assignee: Sriharsha Chintalapani
 Fix For: 0.8.3

 Attachments: KAFKA-1690.patch, KAFKA-1690.patch, 
 KAFKA-1690_2015-05-10_23:20:30.patch, KAFKA-1690_2015-05-10_23:31:42.patch, 
 KAFKA-1690_2015-05-11_16:09:36.patch, KAFKA-1690_2015-05-12_16:20:08.patch, 
 KAFKA-1690_2015-05-15_07:18:21.patch, KAFKA-1690_2015-05-20_14:54:35.patch, 
 KAFKA-1690_2015-05-21_10:37:08.patch, KAFKA-1690_2015-06-03_18:52:29.patch, 
 KAFKA-1690_2015-06-23_13:18:20.patch, KAFKA-1690_2015-07-20_06:10:42.patch, 
 KAFKA-1690_2015-07-20_11:59:57.patch, KAFKA-1690_2015-07-25_12:10:55.patch






--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 36871: Patch for KAFKA-2381

2015-07-27 Thread Ashish Singh

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36871/
---

(Updated July 28, 2015, 12:56 a.m.)


Review request for kafka.


Bugs: KAFKA-2381
https://issues.apache.org/jira/browse/KAFKA-2381


Repository: kafka


Description
---

KAFKA-2381: Possible ConcurrentModificationException while unsubscribing from a 
topic in new consumer


Diffs (updated)
-

  
clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
 4d9a425201115a66b457b58d670992b279091f5a 
  core/src/test/scala/integration/kafka/api/ConsumerTest.scala 
3eb5f95731a3f06f662b334ab2b3d0ad7fa9e1ca 

Diff: https://reviews.apache.org/r/36871/diff/


Testing
---


Thanks,

Ashish Singh



[jira] [Commented] (KAFKA-2381) Possible ConcurrentModificationException while unsubscribing from a topic in new consumer

2015-07-27 Thread Ashish K Singh (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2381?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14643693#comment-14643693
 ] 

Ashish K Singh commented on KAFKA-2381:
---

Created reviewboard https://reviews.apache.org/r/36871/
 against branch trunk

 Possible ConcurrentModificationException while unsubscribing from a topic in 
 new consumer
 -

 Key: KAFKA-2381
 URL: https://issues.apache.org/jira/browse/KAFKA-2381
 Project: Kafka
  Issue Type: Bug
  Components: consumer
Reporter: Ashish K Singh
Assignee: Ashish K Singh
 Attachments: KAFKA-2381.patch


 Possible ConcurrentModificationException while unsubscribing from a topic in 
 new consumer. Attempt is made to modify AssignedPartitions while looping over 
 it.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-2275) Add a ListTopics() API to the new consumer

2015-07-27 Thread Ashish K Singh (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2275?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14643710#comment-14643710
 ] 

Ashish K Singh commented on KAFKA-2275:
---

[~guozhang] I think this is in good shape now with two +1s. Could you help with 
taking a final look and committing it?

 Add a ListTopics() API to the new consumer
 --

 Key: KAFKA-2275
 URL: https://issues.apache.org/jira/browse/KAFKA-2275
 Project: Kafka
  Issue Type: Sub-task
  Components: consumer
Reporter: Guozhang Wang
Assignee: Ashish K Singh
Priority: Critical
 Fix For: 0.8.3

 Attachments: KAFKA-2275.patch, KAFKA-2275.patch, 
 KAFKA-2275_2015-07-17_21:39:27.patch, KAFKA-2275_2015-07-20_10:44:19.patch, 
 KAFKA-2275_2015-07-22_16:09:34.patch, KAFKA-2275_2015-07-23_09:34:41.patch


 With regex subscription like
 {code}
 consumer.subscribe(topic*)
 {code}
 the partition assignment is done automatically on the Kafka side. There 
 are some use cases where consumers want regex subscriptions but not 
 Kafka-side partition assignment, preferring their own specific partition 
 assignment instead. With ListTopics() they can periodically check for topic list 
 changes and specifically subscribe to the partitions of the new topics.
 For implementation, it involves sending a TopicMetadataRequest to a random 
 broker and parsing the response.
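 
 A rough sketch of that pattern, assuming the proposed ListTopics() surfaces as 
 a consumer method returning a map from topic name to partition metadata, and 
 that manual assignment is available:
 {code}
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.regex.Pattern;
 
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 
 public class RegexAssignLoop {
     // Called periodically; re-reads the topic list and reassigns matching partitions.
     static void refreshAssignment(KafkaConsumer<byte[], byte[]> consumer, Pattern pattern) {
         Map<String, List<PartitionInfo>> topics = consumer.listTopics();
         List<TopicPartition> assignment = new ArrayList<TopicPartition>();
         for (Map.Entry<String, List<PartitionInfo>> e : topics.entrySet()) {
             if (!pattern.matcher(e.getKey()).matches())
                 continue;
             for (PartitionInfo p : e.getValue())
                 assignment.add(new TopicPartition(p.topic(), p.partition()));
         }
         consumer.assign(assignment); // consumer-side (manual) partition assignment
     }
 }
 {code}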



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-2301) Deprecate ConsumerOffsetChecker

2015-07-27 Thread Ashish K Singh (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2301?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14643714#comment-14643714
 ] 

Ashish K Singh commented on KAFKA-2301:
---

[~junrao], [~gwenshap] can either of you help with getting this to done? Thanks!

 Deprecate ConsumerOffsetChecker
 ---

 Key: KAFKA-2301
 URL: https://issues.apache.org/jira/browse/KAFKA-2301
 Project: Kafka
  Issue Type: Sub-task
  Components: tools
Reporter: Ashish K Singh
Assignee: Ashish K Singh
 Fix For: 0.8.3

 Attachments: KAFKA-2301.patch, KAFKA-2301_2015-07-01_17:46:34.patch, 
 KAFKA-2301_2015-07-02_09:04:35.patch






--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 36871: Patch for KAFKA-2381

2015-07-27 Thread Jason Gustafson

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36871/#review93213
---


Ouch. Hard to believe this wasn't caught yet.


core/src/test/scala/integration/kafka/api/ConsumerTest.scala (line 220)
https://reviews.apache.org/r/36871/#comment147533

Could we catch this issue more directly with a unit test for 
SubscriptionState?


- Jason Gustafson


On July 28, 2015, 12:56 a.m., Ashish Singh wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/36871/
 ---
 
 (Updated July 28, 2015, 12:56 a.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-2381
 https://issues.apache.org/jira/browse/KAFKA-2381
 
 
 Repository: kafka
 
 
 Description
 ---
 
 KAFKA-2381: Possible ConcurrentModificationException while unsubscribing from 
 a topic in new consumer
 
 
 Diffs
 -
 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
  4d9a425201115a66b457b58d670992b279091f5a 
   core/src/test/scala/integration/kafka/api/ConsumerTest.scala 
 3eb5f95731a3f06f662b334ab2b3d0ad7fa9e1ca 
 
 Diff: https://reviews.apache.org/r/36871/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Ashish Singh
 




[jira] [Updated] (KAFKA-2360) The kafka-consumer-perf-test.sh script help information print useless parameters.

2015-07-27 Thread Bo Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2360?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bo Wang updated KAFKA-2360:
---
Description: 
Run kafka-consumer-perf-test.sh --help to show help information,  but found 3 
parameters useless : 
--batch-size and --batch-size --messages
That is producer of parameters.

bin]# ./kafka-consumer-perf-test.sh --help
Missing required argument [topic]
Option  Description
--  ---
--batch-size Integer: sizeNumber of messages to write in a   
  single batch. (default: 200) 
--compression-codec Integer:   If set, messages are sent compressed   
  supported codec: NoCompressionCodec (default: 0) 
  as 0, GZIPCompressionCodec as 1, 
  SnappyCompressionCodec as 2, 
  LZ4CompressionCodec as 3
--date-format date format The date format to use for formatting  
  the time field. See java.text.   
  SimpleDateFormat for options.
  (default: yyyy-MM-dd HH:mm:ss:SSS)   
--fetch-size Integer: sizeThe amount of data to fetch in a   
  single request. (default: 1048576)   
--messages Long: countThe number of messages to send or  
  consume (default:
  9223372036854775807)

  was:
Run kafka-consumer-perf-test.sh --help to show help information,  but found 3 
parameters useless : 
--batch-size and --batch-size
That is producer of parameters.

bin]# ./kafka-consumer-perf-test.sh --help
Missing required argument [topic]
Option  Description
--  ---
--batch-size Integer: sizeNumber of messages to write in a   
  single batch. (default: 200) 
--compression-codec Integer:   If set, messages are sent compressed   
  supported codec: NoCompressionCodec (default: 0) 
  as 0, GZIPCompressionCodec as 1, 
  SnappyCompressionCodec as 2, 
  LZ4CompressionCodec as 3
--date-format date format The date format to use for formatting  
  the time field. See java.text.   
  SimpleDateFormat for options.
  (default: yyyy-MM-dd HH:mm:ss:SSS)   
--fetch-size Integer: sizeThe amount of data to fetch in a   
  single request. (default: 1048576)   


 The kafka-consumer-perf-test.sh script help information print useless 
 parameters.
 -

 Key: KAFKA-2360
 URL: https://issues.apache.org/jira/browse/KAFKA-2360
 Project: Kafka
  Issue Type: Bug
  Components: tools
Affects Versions: 0.8.2.1
 Environment: Linux
Reporter: Bo Wang
Priority: Minor
   Original Estimate: 24h
  Remaining Estimate: 24h

 Run kafka-consumer-perf-test.sh --help to show help information,  but found 3 
 parameters useless : 
 --batch-size and --batch-size --messages
 That is producer of parameters.
 bin]# ./kafka-consumer-perf-test.sh --help
 Missing required argument [topic]
 Option  Description   
  
 --  ---   
  
 --batch-size Integer: sizeNumber of messages to write in a  
  
   single batch. (default: 200)
  
 --compression-codec Integer:   If set, messages are sent compressed  
  
   supported codec: NoCompressionCodec (default: 0)
  
   as 0, GZIPCompressionCodec as 1,
  
   SnappyCompressionCodec as 2,
  
   LZ4CompressionCodec as 3   
  
 --date-format date format The date format to use for formatting 
  
   the time field. See java.text.  
  
 

[jira] [Created] (KAFKA-2380) Publish Kafka snapshot Maven artifacts

2015-07-27 Thread Stevo Slavic (JIRA)
Stevo Slavic created KAFKA-2380:
---

 Summary: Publish Kafka snapshot Maven artifacts
 Key: KAFKA-2380
 URL: https://issues.apache.org/jira/browse/KAFKA-2380
 Project: Kafka
  Issue Type: Task
Affects Versions: 0.8.2.1
Reporter: Stevo Slavic
Priority: Minor


Please have Kafka snapshot Maven artifacts published regularly (e.g. either 
after every successful CI job run, or after successful nightly CI job run) to 
[Apache snapshots 
repository|http://repository.apache.org/content/groups/snapshots/org/apache/kafka/].

It will be very helpful for promoting early integration efforts, whether of 
patches/fixes for issues or of brand-new Kafka client/server versions.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-2381) Possible ConcurrentModificationException while unsubscribing from a topic in new consumer

2015-07-27 Thread Ashish K Singh (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2381?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14643843#comment-14643843
 ] 

Ashish K Singh commented on KAFKA-2381:
---

Updated reviewboard https://reviews.apache.org/r/36871/
 against branch trunk

 Possible ConcurrentModificationException while unsubscribing from a topic in 
 new consumer
 -

 Key: KAFKA-2381
 URL: https://issues.apache.org/jira/browse/KAFKA-2381
 Project: Kafka
  Issue Type: Bug
  Components: consumer
Reporter: Ashish K Singh
Assignee: Ashish K Singh
 Attachments: KAFKA-2381.patch, KAFKA-2381_2015-07-27_17:56:00.patch, 
 KAFKA-2381_2015-07-27_21:56:06.patch


 Possible ConcurrentModificationException while unsubscribing from a topic in 
 new consumer. Attempt is made to modify AssignedPartitions while looping over 
 it.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 36871: Patch for KAFKA-2381

2015-07-27 Thread Ashish Singh

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36871/
---

(Updated July 28, 2015, 4:56 a.m.)


Review request for kafka.


Bugs: KAFKA-2381
https://issues.apache.org/jira/browse/KAFKA-2381


Repository: kafka


Description (updated)
---

Add a more specific unit test


KAFKA-2381: Possible ConcurrentModificationException while unsubscribing from a 
topic in new consumer


Diffs (updated)
-

  
clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
 4d9a425201115a66b457b58d670992b279091f5a 
  
clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
 319751c374ccdc7e7d7d74bcd01bc279b1bdb26e 
  core/src/test/scala/integration/kafka/api/ConsumerTest.scala 
3eb5f95731a3f06f662b334ab2b3d0ad7fa9e1ca 

Diff: https://reviews.apache.org/r/36871/diff/


Testing
---


Thanks,

Ashish Singh


