[jira] [Commented] (KAFKA-2394) Use RollingFileAppender by default in log4j.properties

2016-06-19 Thread Ewen Cheslack-Postava (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2394?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15339041#comment-15339041
 ] 

Ewen Cheslack-Postava commented on KAFKA-2394:
--

[~cotedm] I think that makes sense. My concern was whether 0.11 was too far 
off. For semver, it is absolutely the right time to make the change. Since it's 
really hard to predict whether we will have Kafka 0.10.1, 0.10.2, etc., I have 
no idea when we should try to push changes like this :)

> Use RollingFileAppender by default in log4j.properties
> --
>
> Key: KAFKA-2394
> URL: https://issues.apache.org/jira/browse/KAFKA-2394
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: Dustin Cote
>Priority: Minor
>  Labels: newbie
> Fix For: 0.11.0.0
>
> Attachments: log4j.properties.patch
>
>
> The default log4j.properties bundled with Kafka uses ConsoleAppender and 
> DailyRollingFileAppender, which offer no protection to users from spammy 
> logging. In extreme cases (such as when issues like KAFKA-1461 are 
> encountered), the logs can exhaust the local disk space. This could be a 
> problem for Kafka adoption since new users are less likely to adjust the 
> logging properties themselves, and are more likely to have configuration 
> problems which result in log spam. 
> To fix this, we can use RollingFileAppender, which offers two settings for 
> controlling the maximum space that log files will use.
> maxBackupIndex: how many backup files to retain
> maxFileSize: the max size of each log file
> One question is whether this change is a compatibility concern. The backup 
> strategy and filenames used by RollingFileAppender are different from those 
> used by DailyRollingFileAppender, so any tools which depend on the old format 
> will break. If we think this is a serious problem, one solution would be to 
> provide two versions of log4j.properties and add a flag to enable the new 
> one. Another solution would be to include the RollingFileAppender 
> configuration in the default log4j.properties, but commented out.
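For concreteness, a minimal log4j.properties fragment using RollingFileAppender might look like the 
following. The appender name and the ${kafka.logs.dir} property follow Kafka's bundled file, but the 
size and backup values here are illustrative rather than taken from the attached patch:

{code}
# Illustrative only: a size-bounded rolling appender for the server log.
# Disk usage is capped at roughly MaxFileSize * (MaxBackupIndex + 1) = 100MB * 11.
log4j.appender.kafkaAppender=org.apache.log4j.RollingFileAppender
log4j.appender.kafkaAppender.File=${kafka.logs.dir}/server.log
log4j.appender.kafkaAppender.MaxFileSize=100MB
log4j.appender.kafkaAppender.MaxBackupIndex=10
log4j.appender.kafkaAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.kafkaAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
{code}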



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Issue Comment Deleted] (KAFKA-3827) log.message.format.version should default to inter.broker.protocol.version

2016-06-19 Thread Ismael Juma (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3827?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-3827:
---
Comment: was deleted

(was: [~junrao] Is that the behavior we really want? I don't think that turning 
"down" the inter broker protocol or message format is a good idea, but whereas 
you could potentially safely downgrade the protocol version (as long as all 
brokers were also turned down to that version), if you already had messages 
published in 0.10 format, it isn't safe to turn down the message format, is it?)

> log.message.format.version should default to inter.broker.protocol.version
> --
>
> Key: KAFKA-3827
> URL: https://issues.apache.org/jira/browse/KAFKA-3827
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 0.10.0.0
>Reporter: Jun Rao
>Assignee: Manasvi Gupta
>  Labels: newbie
>
> Currently, if one sets inter.broker.protocol.version to 0.9.0 and restarts 
> the broker, one will get the following exception since 
> log.message.format.version defaults to 0.10.0. It will be more intuitive if 
> log.message.format.version defaults to the value of 
> inter.broker.protocol.version.
> java.lang.IllegalArgumentException: requirement failed: 
> log.message.format.version 0.10.0-IV1 cannot be used when 
> inter.broker.protocol.version is set to 0.9.0.1
>   at scala.Predef$.require(Predef.scala:233)
>   at kafka.server.KafkaConfig.validateValues(KafkaConfig.scala:1023)
>   at kafka.server.KafkaConfig.(KafkaConfig.scala:994)
>   at kafka.server.KafkaConfig$.fromProps(KafkaConfig.scala:743)
>   at kafka.server.KafkaConfig$.fromProps(KafkaConfig.scala:740)
>   at 
> kafka.server.KafkaServerStartable$.fromProps(KafkaServerStartable.scala:28)
>   at kafka.Kafka$.main(Kafka.scala:58)
>   at kafka.Kafka.main(Kafka.scala)
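For context, a broker pinned to the 0.9.0 protocol today has to set both properties explicitly to 
avoid the exception above; under this proposal the second line would become the implicit default. A 
sketch of the relevant server.properties entries (values illustrative):

{code}
# Keep inter-broker communication on the 0.9.0 protocol, e.g. mid rolling upgrade.
inter.broker.protocol.version=0.9.0
# Currently this must also be lowered by hand; the proposal is to default it to the value above.
log.message.format.version=0.9.0
{code}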



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [DISCUSS] KIP-4 Create Topic Schema

2016-06-19 Thread Ismael Juma
For what it's worth, I also agree that this is better. It is awkward to
implement and is not done by any other request, which is why I didn't push
for it, but I'm happy that we are going in that direction. It's worth noting
that even with this change, looking at the logs may still be needed, since
the error code would be `InvalidRequest` and we don't have a mechanism to
pass a string along with the error (the alternative is to add a bunch of
error codes).

Ismael

On Mon, Jun 20, 2016 at 5:57 AM, Grant Henke  wrote:

> Apologies for the delay in response here.
>
> It will take a bit of tracking inside the request object to track this
> error and then handle it in KafkaApis, but it is possible. I am happy to
> take that preferred approach. I will update the wiki & patch to handle this
> scenario and re-initiate the vote tomorrow.
>
> Thanks,
> Grant
>
> On Sun, Jun 19, 2016 at 8:59 PM, Ewen Cheslack-Postava 
> wrote:
>
> > I'm on the same page as Jun & Dana wrt disconnecting. Closing a
> connection
> > should really be a last resort because we can no longer trust correct
> > behavior in this session. In this case, we detect a bad request, but
> > there's no reason to believe it will affect subsequent requests. There
> are
> > dependencies to be sure, and if the client doesn't check errors, they may
> > try to then write to topics that don't exist or something along those
> > lines, but those requests can also be failed without killing the
> underlying
> > TCP connection.
> >
> > -Ewen
> >
> > On Fri, Jun 17, 2016 at 1:46 PM, Jun Rao  wrote:
> >
> > > Grant,
> > >
> > > I think Dana has a valid point. Currently, we throw an
> > > InvalidRequestException and close the connection only when the broker
> > can't
> > > deserialize the bytes into a request. In this case, the deserialization
> > is
> > > fine. It just that there are some additional constraints that can't be
> > > specified at the protocol level. We can potentially just remember the
> > > topics that violated those constraints in the request and handle them
> > > accordingly with the right error code w/o disconnecting.
> > >
> > > Thanks,
> > >
> > > Jun
> > >
> > > On Fri, Jun 17, 2016 at 8:40 AM, Dana Powers 
> > > wrote:
> > >
> > > > I'm unconvinced (crazy, right?). Comments below:
> > > >
> > > > On Fri, Jun 17, 2016 at 7:27 AM, Grant Henke 
> > > wrote:
> > > > > Hi Dana,
> > > > >
> > > > > You mentioned one of the reasons I error and disconnect. Because I
> > > can't
> > > > > return an error for every request so the cardinality between
> request
> > > and
> > > > > response would be different. Beyond that though, I am handling this
> > > > > protocol rule/parsing error the same way all other messages do.
> > > >
> > > > But you can return an error for every topic, and isn't that the level
> > > > of error required here?
> > > >
> > > > > CreateTopic Response (Version: 0) => [topic_error_codes]
> > > > >   topic_error_codes => topic error_code
> > > > > topic => STRING
> > > > > error_code => INT16
> > > >
> > > > If I submit duplicate requests for a topic, it's an error isolated to
> > > > that topic. If I mess up the partition / replication / etc semantics
> > > > for a topic, that's an error isolated to that topic. Is there a
> > > > cardinality problem at this level?
> > > >
> > > >
> > > > >
> > > > > Parsing is handled in the RequestChannel and any exception that
> > occurs
> > > > > during this phase is caught, converted into an
> > InvalidRequestException
> > > > and
> > > > > results in a disconnect:
> > > > >
> > > >
> > >
> >
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/network/RequestChannel.scala#L92-L95
> > > > >
> > > > > Since this is an error that could only occur (and would always
> occur)
> > > due
> > > > > to incorrect client implementations, and not because of any cluster
> > > state
> > > > > or unusual situation, I felt this behavior was okay and made sense.
> > For
> > > > > client developers the broker logging should make it obvious what
> the
> > > > issue
> > > > > is. My patch also clearly documents the protocol rules in the
> > Protocol
> > > > > definition.
> > > >
> > > > Documentation is great and definitely a must. But requiring client
> > > > developers to dig through server logs is not ideal. Client developers
> > > > don't always have direct access to those logs. And even if they do,
> > > > the brokers may have other traffic, which makes it difficult to track
> > > > down the exact point in the logs where the error occurred.
> > > >
> > > > As discussed above, I don't think you need to or should model this as
> > > > a request-level parsing error. It may be easier for the current
> broker
> > > > implementation to do that and just crash the connection, but I don't
> > > > think it makes that much sense from a raw api perspective.
> > > >
> > > > > In 
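A minimal, hypothetical sketch of the per-topic error handling Jun and Dana describe above (collect a 
validation error per topic instead of disconnecting); every class, field, and error-code value here is 
invented for illustration and is not the actual KafkaApis code:

{code}
import java.util.*;

// Hypothetical sketch: remember an error code per topic rather than closing the connection.
public class CreateTopicsValidationSketch {

    static final short NONE = 0;
    static final short INVALID_REQUEST = 42; // placeholder error code

    // Simplified stand-in for one entry of a CreateTopics request.
    static class TopicEntry {
        final String topic;
        final int numPartitions;        // -1 if unset
        final short replicationFactor;  // -1 if unset
        final boolean hasReplicaAssignment;

        TopicEntry(String topic, int numPartitions, short replicationFactor, boolean hasReplicaAssignment) {
            this.topic = topic;
            this.numPartitions = numPartitions;
            this.replicationFactor = replicationFactor;
            this.hasReplicaAssignment = hasReplicaAssignment;
        }
    }

    // Valid topics keep NONE and can still be created; invalid ones just carry an error code.
    static Map<String, Short> validate(List<TopicEntry> entries) {
        Map<String, Short> errors = new LinkedHashMap<>();
        Set<String> seen = new HashSet<>();
        for (TopicEntry e : entries) {
            if (!seen.add(e.topic)) {
                errors.put(e.topic, INVALID_REQUEST); // duplicate topic in one request
            } else if (e.hasReplicaAssignment && (e.numPartitions != -1 || e.replicationFactor != -1)) {
                errors.put(e.topic, INVALID_REQUEST); // both assignment styles specified
            } else {
                errors.put(e.topic, NONE);
            }
        }
        return errors; // maps directly onto the [topic_error_codes] array of the response
    }

    public static void main(String[] args) {
        List<TopicEntry> request = Arrays.asList(
                new TopicEntry("a", 3, (short) 2, false),
                new TopicEntry("b", -1, (short) -1, true),
                new TopicEntry("a", 1, (short) 1, false)); // duplicate of "a"
        System.out.println(validate(request)); // prints {a=42, b=0}
    }
}
{code}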

[jira] [Updated] (KAFKA-3829) Warn that kafka-connect group.id must not conflict with connector names

2016-06-19 Thread Ewen Cheslack-Postava (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3829?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ewen Cheslack-Postava updated KAFKA-3829:
-
 Priority: Critical  (was: Minor)
Fix Version/s: 0.10.1.0
  Summary: Warn that kafka-connect group.id must not conflict with 
connector names  (was: Warn that kafka-connect group.id must not conflice with 
connector names)

Bumping the importance of this and marking for 0.10.1.0. It's not critical for 
an 0.10.0.1, but it's pretty bad if there is no indication that there is a 
conflict, especially when it is obvious to the connect worker that there *will* 
be a conflict since the worker knows both group names. Any sort of warning & 
bailing upon conflict would be better than silently ignoring and then failing 
the way we currently do.

> Warn that kafka-connect group.id must not conflict with connector names
> ---
>
> Key: KAFKA-3829
> URL: https://issues.apache.org/jira/browse/KAFKA-3829
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 0.9.0.1
>Reporter: Barry Kaplan
>Assignee: Ewen Cheslack-Postava
>Priority: Critical
>  Labels: documentation
> Fix For: 0.10.1.0
>
>
> If the group.id value happens to have the same value as a connector name, the 
> following error will be issued:
> {quote}
> Attempt to join group connect-elasticsearch-indexer failed due to: The group 
> member's supported protocols are incompatible with those of existing members.
> {quote}
> Maybe the documentation for Distributed Worker Configuration group.id could 
> be worded:
> {quote}
> A unique string that identifies the Connect cluster group this worker belongs 
> to. This value must be different than all connector configuration 'name' 
> properties.
> {quote}
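To make the constraint concrete, here is a sketch of the kind of collision being described, assuming 
(as the report suggests) that the worker's own group and a sink connector's consumer group can end up 
with the same name; all values below are made up for illustration:

{code}
# connect-distributed.properties (illustrative)
# The Connect worker's own coordination group:
group.id=connect-elasticsearch-indexer

# Connector config submitted via the REST API (illustrative):
#   { "name": "elasticsearch-indexer", ... }
# A sink connector named "elasticsearch-indexer" consumes with the group
# "connect-elasticsearch-indexer", colliding with the worker group above and
# producing the "incompatible protocols" join error quoted in the issue.
{code}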



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3827) log.message.format.version should default to inter.broker.protocol.version

2016-06-19 Thread Ewen Cheslack-Postava (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3827?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15339031#comment-15339031
 ] 

Ewen Cheslack-Postava commented on KAFKA-3827:
--

[~junrao] Is that the behavior we really want? I don't think that turning 
"down" the inter broker protocol or message format is a good idea, but whereas 
you could potentially safely downgrade the protocol version (as long as all 
brokers were also turned down to that version), if you already had messages 
published in 0.10 format, it isn't safe to turn down the message format, is it?

> log.message.format.version should default to inter.broker.protocol.version
> --
>
> Key: KAFKA-3827
> URL: https://issues.apache.org/jira/browse/KAFKA-3827
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 0.10.0.0
>Reporter: Jun Rao
>Assignee: Manasvi Gupta
>  Labels: newbie
>
> Currently, if one sets inter.broker.protocol.version to 0.9.0 and restarts 
> the broker, one will get the following exception since 
> log.message.format.version defaults to 0.10.0. It will be more intuitive if 
> log.message.format.version defaults to the value of 
> inter.broker.protocol.version.
> java.lang.IllegalArgumentException: requirement failed: 
> log.message.format.version 0.10.0-IV1 cannot be used when 
> inter.broker.protocol.version is set to 0.9.0.1
>   at scala.Predef$.require(Predef.scala:233)
>   at kafka.server.KafkaConfig.validateValues(KafkaConfig.scala:1023)
>   at kafka.server.KafkaConfig.(KafkaConfig.scala:994)
>   at kafka.server.KafkaConfig$.fromProps(KafkaConfig.scala:743)
>   at kafka.server.KafkaConfig$.fromProps(KafkaConfig.scala:740)
>   at 
> kafka.server.KafkaServerStartable$.fromProps(KafkaServerStartable.scala:28)
>   at kafka.Kafka$.main(Kafka.scala:58)
>   at kafka.Kafka.main(Kafka.scala)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3827) log.message.format.version should default to inter.broker.protocol.version

2016-06-19 Thread Ewen Cheslack-Postava (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3827?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15339030#comment-15339030
 ] 

Ewen Cheslack-Postava commented on KAFKA-3827:
--

[~junrao] Is that the behavior we really want? I don't think that turning 
"down" the inter broker protocol or message format is a good idea, but whereas 
you could potentially safely downgrade the protocol version (as long as all 
brokers were also turned down to that version), if you already had messages 
published in 0.10 format, it isn't safe to turn down the message format, is it?

> log.message.format.version should default to inter.broker.protocol.version
> --
>
> Key: KAFKA-3827
> URL: https://issues.apache.org/jira/browse/KAFKA-3827
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 0.10.0.0
>Reporter: Jun Rao
>Assignee: Manasvi Gupta
>  Labels: newbie
>
> Currently, if one sets inter.broker.protocol.version to 0.9.0 and restarts 
> the broker, one will get the following exception since 
> log.message.format.version defaults to 0.10.0. It will be more intuitive if 
> log.message.format.version defaults to the value of 
> inter.broker.protocol.version.
> java.lang.IllegalArgumentException: requirement failed: 
> log.message.format.version 0.10.0-IV1 cannot be used when 
> inter.broker.protocol.version is set to 0.9.0.1
>   at scala.Predef$.require(Predef.scala:233)
>   at kafka.server.KafkaConfig.validateValues(KafkaConfig.scala:1023)
>   at kafka.server.KafkaConfig.(KafkaConfig.scala:994)
>   at kafka.server.KafkaConfig$.fromProps(KafkaConfig.scala:743)
>   at kafka.server.KafkaConfig$.fromProps(KafkaConfig.scala:740)
>   at 
> kafka.server.KafkaServerStartable$.fromProps(KafkaServerStartable.scala:28)
>   at kafka.Kafka$.main(Kafka.scala:58)
>   at kafka.Kafka.main(Kafka.scala)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-3824) Docs indicate auto.commit breaks at least once delivery but that is incorrect

2016-06-19 Thread Ewen Cheslack-Postava (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3824?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ewen Cheslack-Postava updated KAFKA-3824:
-
  Assignee: Jason Gustafson  (was: kambiz shahri)
Remaining Estimate: 24h
 Original Estimate: 24h
  Reviewer: Ewen Cheslack-Postava
 Fix Version/s: 0.10.0.1
0.10.1.0
   Component/s: consumer

[~hachikuji] Sorry for the docs work, but reassigning this to you as you are 
definitely the best equipped to round out the documentation here. Should be 
trivial as we went through a lot of trouble to get correct auto commit locked 
down. If we manage to get a 0.10.0.1 out, we should absolutely include this 
since the better semantics in the new consumer are worth emphasizing and should 
be but a few sentences in the docs.

(The newbie label is interesting here. In theory I'd like to think this was a 
newbie JIRA. In practice, anything re: delivery semantics is *way* more 
complicated than initially predicted.)

> Docs indicate auto.commit breaks at least once delivery but that is incorrect
> -
>
> Key: KAFKA-3824
> URL: https://issues.apache.org/jira/browse/KAFKA-3824
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer
>Affects Versions: 0.10.0.0
>Reporter: Jay Kreps
>Assignee: Jason Gustafson
>  Labels: newbie
> Fix For: 0.10.1.0, 0.10.0.1
>
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> The javadocs for the new consumer indicate that auto commit breaks at least 
> once delivery. This is no longer correct as of 0.10. 
> http://kafka.apache.org/0100/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
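For reference, a minimal sketch of the behavior the corrected docs would describe: with the new 
consumer, auto-commit only ever commits offsets of records already returned by a previous poll(), and 
the commit is issued from inside later poll() calls, so as long as processing finishes before the next 
poll() a crash can cause re-delivery but not loss. Topic name, group id, and timeouts below are 
placeholders:

{code}
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

// Sketch only: at-least-once consumption with enable.auto.commit=true.
public class AutoCommitAtLeastOnce {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "example-group");
        props.put("enable.auto.commit", "true");        // commits happen inside poll()
        props.put("auto.commit.interval.ms", "5000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("my-topic"));
        try {
            while (true) {
                // Offsets committed here cover only records handed out by earlier polls,
                // which is why finishing processing before the next poll() preserves
                // at-least-once delivery.
                ConsumerRecords<String, String> records = consumer.poll(1000);
                for (ConsumerRecord<String, String> record : records) {
                    process(record); // must complete before the next poll()
                }
            }
        } finally {
            consumer.close();
        }
    }

    private static void process(ConsumerRecord<String, String> record) {
        System.out.printf("%s-%d@%d: %s%n", record.topic(), record.partition(), record.offset(), record.value());
    }
}
{code}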



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3487) Support per-connector/per-task classloaders in Connect

2016-06-19 Thread Ewen Cheslack-Postava (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15339018#comment-15339018
 ] 

Ewen Cheslack-Postava commented on KAFKA-3487:
--

[~rhauch] Absolutely, I did not mean to imply it was the be-all end-all, just 
that for some use cases it's extremely convenient. I wouldn't rely on Central 
being up for any sort of production use case :)

> Support per-connector/per-task classloaders in Connect
> --
>
> Key: KAFKA-3487
> URL: https://issues.apache.org/jira/browse/KAFKA-3487
> Project: Kafka
>  Issue Type: New Feature
>  Components: KafkaConnect
>Affects Versions: 0.10.0.0
>Reporter: Ewen Cheslack-Postava
>Assignee: Liquan Pei
>Priority: Critical
>  Labels: needs-kip
>
> Currently we just use the default ClassLoader in Connect. However, this 
> limits how we can compatibly load conflicting connector plugins. Ideally we 
> would use a separate class loader per connector/task that is instantiated to 
> avoid potential conflicts.
> Note that this also opens up options for other ways to provide jars to 
> instantiate connectors. For example, Spark uses this to dynamically publish 
> classes defined in the REPL and load them via URL: 
> https://ardoris.wordpress.com/2014/03/30/how-spark-does-class-loading/ But 
> much simpler examples (include URL in the connector class instead of just 
> class name) are also possible and could be a nice way to support more dynamic 
> sets of connectors, multiple versions of the same connector, etc.
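A bare-bones sketch of the idea (one URLClassLoader per connector over that connector's own jar 
directory); the paths, class names, and parent-first delegation below are illustrative, not the design 
being proposed:

{code}
import java.io.File;
import java.net.URL;
import java.net.URLClassLoader;

// Minimal sketch: give each connector its own classloader over its own jars so that
// two connectors bundling conflicting dependency versions do not clash.
public class ConnectorClassLoaderSketch {

    public static ClassLoader forConnector(File pluginDir) throws Exception {
        File[] jars = pluginDir.listFiles((dir, name) -> name.endsWith(".jar"));
        URL[] urls = new URL[jars == null ? 0 : jars.length];
        for (int i = 0; i < urls.length; i++) {
            urls[i] = jars[i].toURI().toURL();
        }
        // Plain URLClassLoader delegates to the parent first; a real design would likely
        // want a child-first variant for stronger isolation of connector dependencies.
        return new URLClassLoader(urls, ConnectorClassLoaderSketch.class.getClassLoader());
    }

    public static void main(String[] args) throws Exception {
        ClassLoader cl = forConnector(new File("/opt/connect/plugins/my-connector")); // illustrative path
        Class<?> connectorClass = Class.forName("com.example.MyConnector", true, cl); // illustrative class
        System.out.println("Loaded " + connectorClass.getName() + " via " + cl);
    }
}
{code}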



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [DISCUSS] KIP-4 Create Topic Schema

2016-06-19 Thread Ewen Cheslack-Postava
Thanks Grant. Sorry to make the handling on the broker more complicated :(
but it seems like the right way to handle this.

-Ewen

On Sun, Jun 19, 2016 at 8:57 PM, Grant Henke  wrote:

> Apologies for the delay in response here.
>
> It will take a bit of tracking inside the request object to track this
> error and then handle it in KafkaApis, but it is possible. I am happy to
> take that preferred approach. I will update the wiki & patch to handle this
> scenario and re-initiate the vote tomorrow.
>
> Thanks,
> Grant
>
> On Sun, Jun 19, 2016 at 8:59 PM, Ewen Cheslack-Postava 
> wrote:
>
> > I'm on the same page as Jun & Dana wrt disconnecting. Closing a
> connection
> > should really be a last resort because we can no longer trust correct
> > behavior in this session. In this case, we detect a bad request, but
> > there's no reason to believe it will affect subsequent requests. There
> are
> > dependencies to be sure, and if the client doesn't check errors, they may
> > try to then write to topics that don't exist or something along those
> > lines, but those requests can also be failed without killing the
> underlying
> > TCP connection.
> >
> > -Ewen
> >
> > On Fri, Jun 17, 2016 at 1:46 PM, Jun Rao  wrote:
> >
> > > Grant,
> > >
> > > I think Dana has a valid point. Currently, we throw an
> > > InvalidRequestException and close the connection only when the broker
> > can't
> > > deserialize the bytes into a request. In this case, the deserialization
> > is
> > > fine. It just that there are some additional constraints that can't be
> > > specified at the protocol level. We can potentially just remember the
> > > topics that violated those constraints in the request and handle them
> > > accordingly with the right error code w/o disconnecting.
> > >
> > > Thanks,
> > >
> > > Jun
> > >
> > > On Fri, Jun 17, 2016 at 8:40 AM, Dana Powers 
> > > wrote:
> > >
> > > > I'm unconvinced (crazy, right?). Comments below:
> > > >
> > > > On Fri, Jun 17, 2016 at 7:27 AM, Grant Henke 
> > > wrote:
> > > > > Hi Dana,
> > > > >
> > > > > You mentioned one of the reasons I error and disconnect. Because I
> > > can't
> > > > > return an error for every request so the cardinality between
> request
> > > and
> > > > > response would be different. Beyond that though, I am handling this
> > > > > protocol rule/parsing error the same way all other messages do.
> > > >
> > > > But you can return an error for every topic, and isn't that the level
> > > > of error required here?
> > > >
> > > > > CreateTopic Response (Version: 0) => [topic_error_codes]
> > > > >   topic_error_codes => topic error_code
> > > > > topic => STRING
> > > > > error_code => INT16
> > > >
> > > > If I submit duplicate requests for a topic, it's an error isolated to
> > > > that topic. If I mess up the partition / replication / etc semantics
> > > > for a topic, that's an error isolated to that topic. Is there a
> > > > cardinality problem at this level?
> > > >
> > > >
> > > > >
> > > > > Parsing is handled in the RequestChannel and any exception that
> > occurs
> > > > > during this phase is caught, converted into an
> > InvalidRequestException
> > > > and
> > > > > results in a disconnect:
> > > > >
> > > >
> > >
> >
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/network/RequestChannel.scala#L92-L95
> > > > >
> > > > > Since this is an error that could only occur (and would always
> occur)
> > > due
> > > > > to incorrect client implementations, and not because of any cluster
> > > state
> > > > > or unusual situation, I felt this behavior was okay and made sense.
> > For
> > > > > client developers the broker logging should make it obvious what
> the
> > > > issue
> > > > > is. My patch also clearly documents the protocol rules in the
> > Protocol
> > > > > definition.
> > > >
> > > > Documentation is great and definitely a must. But requiring client
> > > > developers to dig through server logs is not ideal. Client developers
> > > > don't always have direct access to those logs. And even if they do,
> > > > the brokers may have other traffic, which makes it difficult to track
> > > > down the exact point in the logs where the error occurred.
> > > >
> > > > As discussed above, I don't think you need to or should model this as
> > > > a request-level parsing error. It may be easier for the current
> broker
> > > > implementation to do that and just crash the connection, but I don't
> > > > think it makes that much sense from a raw api perspective.
> > > >
> > > > > In the future having a response header with an error code (and
> > > optimally
> > > > > error message) for every response would help solve this problem
> (for
> > > all
> > > > > message types).
> > > >
> > > > That will definitely help solve the more general invalid request
> error
> > > > problem. But I think given the current state of error handling /
> 

Re: [DISCUSS] KIP-63: Unify store and downstream caching in streams

2016-06-19 Thread Neha Narkhede
I'm in favor of a global config that is then evenly divided amongst the
threads of a Kafka Streams instance.

On Mon, Jun 13, 2016 at 6:23 PM, Guozhang Wang  wrote:

> Although this KIP is not mainly for memory management of Kafka Streams,
> since it touches on quite some part of it I think it is good to first think
> of what we would REALLY want as an end goal for memory usage in order to
> make sure that whatever we proposed in this KIP aligns with that long-term
> plan. So I wrote up this discussion page that summarized my current
> thoughts:
>
>
> https://cwiki.apache.org/confluence/display/KAFKA/Discussion%3A+Memory+Management+in+Kafka+Streams
>
> As for its implication on this KIP, my personal take is that:
>
> 1. we should use a global config in terms of bytes, which will then be
> evenly divided among the threads within the Kafka Streams instance, but
> within a thread that config can be used to control the total size of all
> caches instead of further dividing that among all caches.
>
> 2. instead of caching in terms of deserialized objects we may need to
> consider just caching in terms of serialized bytes; admittedly it will
> incur costs of doing serdes for caching, but without doing so I honestly
> have no concrete clue how we can measure the current memory usage
> accurately AND efficiently (after reading the links Ismael sent me I feel
> the accurate estimates for collection types / composite types like String
> will do some serialize with sun.misc.Unsafe anyways when it uses reflection
> to crawl the object graph) although we may need to do some benchmarking
> with https://github.com/jbellis/jamm, for example to validate this claim
> or
> someone tell me that there is actually a better way that I'm not aware of..
>
>
> Guozhang
>
>
> On Mon, Jun 13, 2016 at 3:17 PM, Matthias J. Sax 
> wrote:
>
> > I am just catching up on this thread.
> >
> > From my point of view, easy tuning for the user is the most important
> > thing, because Kafka Streams is a library. Thus, a global cache size
> > parameter should be the best.
> >
> > About dividing the memory vs a single global cache. I would argue that
> > in the first place dividing the memory would be good, as synchronization
> > might kill the performance. About the cache sizes, I was thinking about
> > starting with an even distribution and adjust the individual cache sizes
> > during runtime.
> >
> > The dynamic adjustment can also be added later on. We need to figure out
> > a good internal monitoring and "cost function" to determine which task
> > needs more memory and which less. Some metrics to do this might be
> > number-of-assigned-keys, size-of-key-value-pairs, update-frequency etc.
> >
> > I have to confess, that I have no idea right now, how to design the
> > "cost function" to compute the memory size for each task. But if we want
> > to add dynamic memory management later on, it might be a good idea to
> > keep it in mind and align this KIP already for future improvements.
> >
> > -Matthias
> >
> >
> > On 06/09/2016 05:24 AM, Henry Cai wrote:
> > > One more thing for this KIP:
> > >
> > > Currently RocksDBWindowStore serialize the key/value before it puts
> into
> > > the in-memory cache, I think we should delay this
> > > serialization/deserialization unless it needs flush to db.  For a
> simple
> > > countByKey for 100 records, this would trigger 100
> > > serialization/deserialization even if everything is in-memory.
> > >
> > > If we move this internal cache from RocksDBStore to a global place, I
> > hope
> > > we can reduces the time it needs to do the serialization.
> > >
> > >
> > > On Mon, Jun 6, 2016 at 11:07 AM, Ismael Juma 
> wrote:
> > >
> > >> On Mon, Jun 6, 2016 at 6:48 PM, Guozhang Wang 
> > wrote:
> > >>>
> > >>> About using Instrumentation.getObjectSize, yeah we were worried a lot
> > >> about
> > >>> its efficiency as well as accuracy when discussing internally, but
> not
> > a
> > >>> better solution was proposed. So if people have better ideas, please
> > >> throw
> > >>> them here, as it is also the purpose for us to call out such KIP
> > >> discussion
> > >>> threads.
> > >>>
> > >>
> > >> Note that this requires a Java agent to be configured. A few links:
> > >>
> > >>
> > >>
> >
> https://github.com/apache/spark/blob/b0ce0d13127431fa7cd4c11064762eb0b12e3436/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala
> > >>
> > >>
> >
> https://github.com/apache/cassandra/blob/3dcbe90e02440e6ee534f643c7603d50ca08482b/src/java/org/apache/cassandra/utils/ObjectSizes.java
> > >> https://github.com/jbellis/jamm
> > >> http://openjdk.java.net/projects/code-tools/jol/
> > >> https://github.com/DimitrisAndreou/memory-measurer
> > >>
> > >> OK, maybe that's more than what you wanted. :)
> > >>
> > >> Ismael
> > >>
> > >
> >
> >
>
>
> --
> -- Guozhang
>



-- 
Thanks,
Neha
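As a tiny illustration of the division Neha and Guozhang describe (a single bytes-valued budget split 
evenly across the stream threads of one instance, with each thread's share bounding the total of all 
its caches); the config name and numbers are hypothetical, not the KIP's final design:

{code}
// Illustrative only: evenly divide one global cache budget among stream threads.
public class CacheBudgetSketch {
    public static void main(String[] args) {
        long globalCacheBytes = 64L * 1024 * 1024; // hypothetical "total cache bytes" config
        int numStreamThreads = 4;                  // num.stream.threads

        long perThreadBytes = globalCacheBytes / numStreamThreads;
        // Within a thread this bounds the sum of all caches rather than being
        // subdivided further per cache.
        System.out.println("Per-thread cache budget: " + perThreadBytes + " bytes");
    }
}
{code}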


Re: [DISCUSS] KIP-64 -Allow underlying distributed filesystem to take over replication depending on configuration

2016-06-19 Thread Gwen Shapira
Hi Arvind,

Thank you for proposing this KIP.

I am not sure how much experience you have in modifying Kafka's core
module, so I don't know if you are aware of how deeply the storage and
replication layers are integrated within Kafka. There is no clean API
to rip out; this KIP would essentially require a rewrite of most of
Kafka. That is obviously something that carries huge risk (and a huge
amount of work).

For something that has so much risk and so much effort involved, I
feel that the justification in the KIP is lacking.

For instance, you say:
"Distributed data stores can be vastly improved by integrating with
Kafka. Some of these improvements are:
* They can participate easily in the whole Kafka ecosystem
* Data ingesting speeds can be improved"

Things that are not clear to me are:

1) Why should the Kafka community rewrite Kafka in order to improve
distributed data stores? Shouldn't the community for each data store
make the effort to improve their application? Where is the benefit to
Kafka users?

2) Can you detail in which ways are distributed data stores unable to
participate in Kafka ecosystem now? In which ways do they want to
participate?

3) Claiming that speeds can be improved is pretty easy :) Are you
talking about ingest to Kafka? or from Kafka to another store? What is
the current ingest rate? What is the current bottleneck? Where do you
expect the speed improvement to come from? Are you talking about
latency or throughput?

Once we all agree that there is indeed a problem, we can discuss your
proposed solution :)

Personally, I feel that Kafka is a distributed data store (with
log/queue semantics) and therefore cannot and should not delegate core
data store responsibilities to an external system. Kafka users came to
expect very strong reliability, consistency and durability guarantees
from Kafka and very clear replication semantics and we must be very
very careful not to compromise and put those at risk. Especially
without very clear benefits to Kafka users.

Thanks,

Gwen Shapira




On Sat, Jun 18, 2016 at 4:46 PM, Arvind Kandhare  wrote:
> Hi,
> Let's use this thread to discuss the above mentioned KIP.
>
> Here is the motivation for it:
> "Distributed data stores can be vastly improved by integrating with Kafka.
> Some of these improvements are:
>
>1. They can participate easily in the whole Kafka ecosystem
>2. Data ingesting speeds can be improved
>
> Distributed data stores come with their own replication. Kafka replication
> is a duplication of functionality for them. Kafka should defer replication
> to the underlying file system if the configuration mandates it.
>
> In the newly added configuration, a flush to the filesystem should be considered
> a signal that the message is replicated."
>
> Do let me know your views on this.
>
>
> Thanks and regards,
>
> Arvind


Re: [DISCUSS] KIP-4 Create Topic Schema

2016-06-19 Thread Grant Henke
Apologies for the delay in response here.

It will take a bit of tracking inside the request object to track this
error and then handle it in KafkaApis, but it is possible. I am happy to
take that preferred approach. I will update the wiki & patch to handle this
scenario and re-initiate the vote tomorrow.

Thanks,
Grant

On Sun, Jun 19, 2016 at 8:59 PM, Ewen Cheslack-Postava 
wrote:

> I'm on the same page as Jun & Dana wrt disconnecting. Closing a connection
> should really be a last resort because we can no longer trust correct
> behavior in this session. In this case, we detect a bad request, but
> there's no reason to believe it will affect subsequent requests. There are
> dependencies to be sure, and if the client doesn't check errors, they may
> try to then write to topics that don't exist or something along those
> lines, but those requests can also be failed without killing the underlying
> TCP connection.
>
> -Ewen
>
> On Fri, Jun 17, 2016 at 1:46 PM, Jun Rao  wrote:
>
> > Grant,
> >
> > I think Dana has a valid point. Currently, we throw an
> > InvalidRequestException and close the connection only when the broker
> can't
> > deserialize the bytes into a request. In this case, the deserialization
> is
> > fine. It just that there are some additional constraints that can't be
> > specified at the protocol level. We can potentially just remember the
> > topics that violated those constraints in the request and handle them
> > accordingly with the right error code w/o disconnecting.
> >
> > Thanks,
> >
> > Jun
> >
> > On Fri, Jun 17, 2016 at 8:40 AM, Dana Powers 
> > wrote:
> >
> > > I'm unconvinced (crazy, right?). Comments below:
> > >
> > > On Fri, Jun 17, 2016 at 7:27 AM, Grant Henke 
> > wrote:
> > > > Hi Dana,
> > > >
> > > > You mentioned one of the reasons I error and disconnect. Because I
> > can't
> > > > return an error for every request so the cardinality between request
> > and
> > > > response would be different. Beyond that though, I am handling this
> > > > protocol rule/parsing error the same way all other messages do.
> > >
> > > But you can return an error for every topic, and isn't that the level
> > > of error required here?
> > >
> > > > CreateTopic Response (Version: 0) => [topic_error_codes]
> > > >   topic_error_codes => topic error_code
> > > > topic => STRING
> > > > error_code => INT16
> > >
> > > If I submit duplicate requests for a topic, it's an error isolated to
> > > that topic. If I mess up the partition / replication / etc semantics
> > > for a topic, that's an error isolated to that topic. Is there a
> > > cardinality problem at this level?
> > >
> > >
> > > >
> > > > Parsing is handled in the RequestChannel and any exception that
> occurs
> > > > during this phase is caught, converted into an
> InvalidRequestException
> > > and
> > > > results in a disconnect:
> > > >
> > >
> >
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/network/RequestChannel.scala#L92-L95
> > > >
> > > > Since this is an error that could only occur (and would always occur)
> > due
> > > > to incorrect client implementations, and not because of any cluster
> > state
> > > > or unusual situation, I felt this behavior was okay and made sense.
> For
> > > > client developers the broker logging should make it obvious what the
> > > issue
> > > > is. My patch also clearly documents the protocol rules in the
> Protocol
> > > > definition.
> > >
> > > Documentation is great and definitely a must. But requiring client
> > > developers to dig through server logs is not ideal. Client developers
> > > don't always have direct access to those logs. And even if they do,
> > > the brokers may have other traffic, which makes it difficult to track
> > > down the exact point in the logs where the error occurred.
> > >
> > > As discussed above, I don't think you need to or should model this as
> > > a request-level parsing error. It may be easier for the current broker
> > > implementation to do that and just crash the connection, but I don't
> > > think it makes that much sense from a raw api perspective.
> > >
> > > > In the future having a response header with an error code (and
> > optimally
> > > > error message) for every response would help solve this problem (for
> > all
> > > > message types).
> > >
> > > That will definitely help solve the more general invalid request error
> > > problem. But I think given the current state of error handling /
> > > feedback from brokers on request-level errors, you should treat
> > > connection crash as a last resort. I think there is a good opportunity
> > > to avoid it in this case, and I think the api would be better if done
> > > that way.
> > >
> > > -Dana
> > >
> > > > On Fri, Jun 17, 2016 at 12:04 AM, Dana Powers  >
> > > wrote:
> > > >
> > > >> Why disconnect the client on a InvalidRequestException? The 2 errors
> > > >> 

[jira] [Updated] (KAFKA-3868) New producer metric record-size-avg does not provide average record size as advertised

2016-06-19 Thread Jun Rao (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3868?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao updated KAFKA-3868:
---
Assignee: (was: Jun Rao)

> New producer metric record-size-avg does not provide average record size as 
> advertised
> --
>
> Key: KAFKA-3868
> URL: https://issues.apache.org/jira/browse/KAFKA-3868
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 0.10.0.0
>Reporter: Ewen Cheslack-Postava
>
> The metrics for record size are setup as such:
> {code}
> this.maxRecordSizeSensor = metrics.sensor("record-size-max");
> m = metrics.metricName("record-size-max", metricGrpName, "The 
> maximum record size");
> this.maxRecordSizeSensor.add(m, new Max());
> m = metrics.metricName("record-size-avg", metricGrpName, "The 
> average record size");
> this.maxRecordSizeSensor.add(m, new Avg());
> {code}
> and then the values are recorded:
> {code}
> this.maxRecordSizeSensor.record(batch.maxRecordSize, now);
> {code}
> So the value provided by record-size-avg is the average maximum record size, 
> with the average computed per batch, not the average size of the records being 
> sent, as suggested by "The average record size". We have all the necessary 
> info needed to compute the metric, but it needs to be done separately from 
> recording the batch.maxRecordSize values.
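A hedged sketch of the separate tracking the description calls for: keep the existing max sensor, but 
back record-size-avg with a sensor that records every record's size as it is appended. The wiring 
mirrors the snippet above, but the sensor name and the per-record hook are only illustrative:

{code}
// Illustrative sketch, not the actual patch: average over records, not over per-batch maxima.
this.recordSizeSensor = metrics.sensor("record-size");
m = metrics.metricName("record-size-avg", metricGrpName, "The average record size");
this.recordSizeSensor.add(m, new Avg());

// ... recorded once per appended record rather than once per batch:
this.recordSizeSensor.record(appendedRecordSize, now); // hypothetical per-record value
{code}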



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3487) Support per-connector/per-task classloaders in Connect

2016-06-19 Thread Randall Hauch (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15338920#comment-15338920
 ] 

Randall Hauch commented on KAFKA-3487:
--

[~ewencp] wrote:

{quote}
There's lots of other stuff you can do in this area too, e.g. URLClassLoaders 
that can just pull the jars from the network, or even more awesome would be a 
version that works from Central/Maven repos and can just auto-resolve all 
transitive dependencies in some variant of a URLClassLoader and allow all the 
connectors to be loaded completely dynamically (no more restarts to install new 
connectors!)
{quote}

Autoresolving classloaders work fine in many situations, but not all. Some 
environments do not have access to a Maven class loader, and it's not always 
repeatable/reliable. Yet not having to restart to install new connectors is 
still doable even with file-based JARs.

> Support per-connector/per-task classloaders in Connect
> --
>
> Key: KAFKA-3487
> URL: https://issues.apache.org/jira/browse/KAFKA-3487
> Project: Kafka
>  Issue Type: New Feature
>  Components: KafkaConnect
>Affects Versions: 0.10.0.0
>Reporter: Ewen Cheslack-Postava
>Assignee: Liquan Pei
>Priority: Critical
>  Labels: needs-kip
>
> Currently we just use the default ClassLoader in Connect. However, this 
> limits how we can compatibly load conflicting connector plugins. Ideally we 
> would use a separate class loader per connector/task that is instantiated to 
> avoid potential conflicts.
> Note that this also opens up options for other ways to provide jars to 
> instantiate connectors. For example, Spark uses this to dynamically publish 
> classes defined in the REPL and load them via URL: 
> https://ardoris.wordpress.com/2014/03/30/how-spark-does-class-loading/ But 
> much simpler examples (include URL in the connector class instead of just 
> class name) are also possible and could be a nice way to support more dynamic 
> sets of connectors, multiple versions of the same connector, etc.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-3663) Proposal for a kafka broker command - kafka-brokers.sh

2016-06-19 Thread Jayesh Thakrar (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3663?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jayesh Thakrar updated KAFKA-3663:
--
Description: 
This is a proposal for an admin tool - say, kafka-brokers.sh to provide broker 
related useful information. See 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-59%3A+Proposal+for+a+kafka+broker+command
 for details.

The kafka-brokers.sh command mimics the kafka-topic.sh command, but provides 
details by broker rather than by topic.


  was:
This is a proposal for an admin tool - say, kafka-brokers.sh to provide broker 
related useful information. Note that I could not see an option to create a 
child page at 
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals.

So here are the details for the proposals.

*Motivation*
Some of the key success factors for Kafka are its performant architecture and 
operational simplicity. This is further complemented with a set of command-line 
tools and utilities for managing topics as well as testing/stress-testing. 
However, Kafka currently lacks commands/tools to get a cluster and broker 
overview, although it should be mentioned that Kafka does expose cluster 
information via API and broker metrics via JMX.

*Proposed Change*
This KIP is for a command, say kafka-brokers.sh that provides useful cluster 
and broker information. 

The command will essentially provide the following pieces of information:

* Cluster Overview Information
** Controller Broker Id (and version/epoch information)
** Broker Count
** Total Topic Count
** Total Partition Count

* Broker Information
** Broker Id
** Rack Id
** Hostname
** Endpoints (protocol, port)
** JMX port
** Topic count
** All partition count
** Leader partition count
** Under-replicated partition count
** Topic partitions (Name, *partition-) - 
An asterisk would indicate that the broker is the leader for that partition.
A hyphen/negative sign would indicates that the partition is not in-sync.


As you can see, the above information provides a view of the cluster and 
brokers that complements kafka-topics.sh.

This command can be further evolved to do more things like:
- Drain one or more brokers for decommissioning. This feature would allow 
distributing all partitions of a list of brokers to other brokers in the 
cluster (it needs to be cognizant of rack configuration). 
- Onboard one or more new brokers. This feature would do the reverse of 
removing brokers and allow controlled distribution of partitions to the new 
brokers.
- Obtain JMX metrics from a broker (see KIP for its usage).
- See KAFKA-3649 for other features



> Proposal for a kafka broker command - kafka-brokers.sh
> --
>
> Key: KAFKA-3663
> URL: https://issues.apache.org/jira/browse/KAFKA-3663
> Project: Kafka
>  Issue Type: Improvement
>  Components: admin
>Reporter: Jayesh Thakrar
>
> This is a proposal for an admin tool - say, kafka-brokers.sh to provide 
> broker related useful information. See 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-59%3A+Proposal+for+a+kafka+broker+command
>  for details.
> The kafka-brokers.sh command mimics the kafka-topic.sh command, but provides 
> details by broker rather than by topic.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [VOTE] KIP-62: Allow consumer to send heartbeats from a background thread

2016-06-19 Thread Ewen Cheslack-Postava
+1 on the KIP.

On Sat, Jun 18, 2016 at 11:52 AM, Ismael Juma  wrote:

> If we do that, shouldn't `max.poll.records` remain with the current default
> of `Integer.MAX_VALUE`?
>
> On Sat, Jun 18, 2016 at 5:18 PM, Jay Kreps  wrote:
>
> > +1
> >
> > One small but important thing I think we should consider changing: I
> think
> > we should consider setting the default for max.poll.interval to infinite.
> > Previously our definition of alive was "polls within session timeout".
> Now
> > our definition of alive is "pings from b/g thread w/in session timeout".
> > The reality is that anything we set here as a default is going to be too
> > low for some people and those people will be confused. Previously that
> was
> > okay because the definition of liveness required you to understand and
> > think about this setting, so getting an error meant you needed to think
> > harder. But now this is really an optional setting for people who care to
> > enforce some bound, so introducing a possible failure/instability mode by
> > guess that bound seems wrong. Instead I think users that know and care
> > about this should set a thoughtful max.poll.interval, but we should not
> try
> > to guess for those who don't.
> >
> > This seems minor but I think we've found that defaults matter more than
> > configs so it's worth being thoughtful.
>

This is very, very not minor. This is a really important default and I
strongly disagree that setting it to infinite is a good idea. Liveness
isn't something you can ignore. People like to ignore it because it makes
writing code easier, but that just means they write broken code. We can't
avoid giving them the rope to hang themselves (they can always override the
setting to a very large value), but we shouldn't encourage them to do it.

Looking at various connectors (beyond just Kafka itself, as I was looking
at other frameworks), there was quite a bit of code structured roughly as:

while(true) {
  try {
conn = open(url);
records = consumer.poll();
for (ConsumerRecord record : records) {
  sendRecord(conn, record);
}
  } catch (ConnectionException e) {
//ignore and retry
  }
}

In other words, code which will never make progress under a configuration
or deployment error. Handling errors by ignoring them isn't rare, which
means you'll probably get no real indication of liveness unless you make it
explicit (i.e. you *must* call something periodically). Of course, users
can always continue to screw themselves over by also ignoring errors *we*
produce and still keeping their app alive, but at that point we've done all
we can and at least we'll have attempted to shift work to another instance.

I think setting a *larger* default timeout could make sense -- there's no
reasonable way to set a default since the message size, number of messages,
and message processing time will all vary widely by application. But really
the important distinction is (reasonable) finite timeout vs infinite. We
shouldn't default to something that never lets you figure out that
something is wrong. (If we could come up with a finite maximum value, I
would absolutely want to add that as a strict maximum, but there's no such
value.) If someone exceeds what we choose as a default maximum, it is
*really* in their interest to understand why they need such a large timeout
and whether there are better solutions to their problem.

I'm sure it comes across as condescending, but we actually do know better
than many application developers. There are implications of just dropping
these timeouts that don't end well for the app. Ignoring those error cases
and liveness issues works fine 99% of the time, but it's important if you
really care about resiliency of your app and when things break they'll ask
why we set a default value that leads to such bad results. I staunchly
believe that it's better to explain why there is complexity and how to deal
with it than to try to hide it when it can't really be hidden.

-Ewen

P.S. I would invoke checked exceptions as another case where framework devs
try to encourage app developers to avoid hanging themselves yet app devs
are given enough rope and end up hanging themselves, but the Kafka codebase
eschews checked exceptions, so



> >
> > -Jay
> >
> > On Thu, Jun 16, 2016 at 11:44 AM, Jason Gustafson 
> > wrote:
> >
> > > Hi All,
> > >
> > > I'd like to open the vote for KIP-62. This proposal attempts to address
> > one
> > > of the recurring usability problems that users of the new consumer have
> > > faced with as little impact as possible. You can read the full details
> > > here:
> > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-62%3A+Allow+consumer+to+send+heartbeats+from+a+background+thread
> > > .
> > >
> > > After some discussion on this list, I think we were in agreement that
> > this
> > > change addresses a major part of the problem and we've left the door
> open
> > > for 
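As a companion to the liveness discussion above, the consumer settings it revolves around might look 
like this in practice; session.timeout.ms and max.poll.records already exist, max.poll.interval.ms is 
the new bound proposed by KIP-62, and all values are illustrative:

{code}
# session.timeout.ms would only bound the background heartbeat liveness check:
session.timeout.ms=10000
# Cap how many records a single poll() hands back, so per-iteration work stays bounded:
max.poll.records=100
# The separate, finite bound on time between poll() calls discussed above:
max.poll.interval.ms=300000
{code}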

Re: [DISCUSS] KIP-4 Create Topic Schema

2016-06-19 Thread Ewen Cheslack-Postava
I'm on the same page as Jun & Dana wrt disconnecting. Closing a connection
should really be a last resort because we can no longer trust correct
behavior in this session. In this case, we detect a bad request, but
there's no reason to believe it will affect subsequent requests. There are
dependencies to be sure, and if the client doesn't check errors, they may
try to then write to topics that don't exist or something along those
lines, but those requests can also be failed without killing the underlying
TCP connection.

-Ewen

On Fri, Jun 17, 2016 at 1:46 PM, Jun Rao  wrote:

> Grant,
>
> I think Dana has a valid point. Currently, we throw an
> InvalidRequestException and close the connection only when the broker can't
> deserialize the bytes into a request. In this case, the deserialization is
> fine. It just that there are some additional constraints that can't be
> specified at the protocol level. We can potentially just remember the
> topics that violated those constraints in the request and handle them
> accordingly with the right error code w/o disconnecting.
>
> Thanks,
>
> Jun
>
> On Fri, Jun 17, 2016 at 8:40 AM, Dana Powers 
> wrote:
>
> > I'm unconvinced (crazy, right?). Comments below:
> >
> > On Fri, Jun 17, 2016 at 7:27 AM, Grant Henke 
> wrote:
> > > Hi Dana,
> > >
> > > You mentioned one of the reasons I error and disconnect. Because I
> can't
> > > return an error for every request so the cardinality between request
> and
> > > response would be different. Beyond that though, I am handling this
> > > protocol rule/parsing error the same way all other messages do.
> >
> > But you can return an error for every topic, and isn't that the level
> > of error required here?
> >
> > > CreateTopic Response (Version: 0) => [topic_error_codes]
> > >   topic_error_codes => topic error_code
> > > topic => STRING
> > > error_code => INT16
> >
> > If I submit duplicate requests for a topic, it's an error isolated to
> > that topic. If I mess up the partition / replication / etc semantics
> > for a topic, that's an error isolated to that topic. Is there a
> > cardinality problem at this level?
> >
> >
> > >
> > > Parsing is handled in the RequestChannel and any exception that occurs
> > > during this phase is caught, converted into an InvalidRequestException
> > and
> > > results in a disconnect:
> > >
> >
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/network/RequestChannel.scala#L92-L95
> > >
> > > Since this is an error that could only occur (and would always occur)
> due
> > > to incorrect client implementations, and not because of any cluster
> state
> > > or unusual situation, I felt this behavior was okay and made sense. For
> > > client developers the broker logging should make it obvious what the
> > issue
> > > is. My patch also clearly documents the protocol rules in the Protocol
> > > definition.
> >
> > Documentation is great and definitely a must. But requiring client
> > developers to dig through server logs is not ideal. Client developers
> > don't always have direct access to those logs. And even if they do,
> > the brokers may have other traffic, which makes it difficult to track
> > down the exact point in the logs where the error occurred.
> >
> > As discussed above, I don't think you need to or should model this as
> > a request-level parsing error. It may be easier for the current broker
> > implementation to do that and just crash the connection, but I don't
> > think it makes that much sense from a raw api perspective.
> >
> > > In the future having a response header with an error code (and
> optimally
> > > error message) for every response would help solve this problem (for
> all
> > > message types).
> >
> > That will definitely help solve the more general invalid request error
> > problem. But I think given the current state of error handling /
> > feedback from brokers on request-level errors, you should treat
> > connection crash as a last resort. I think there is a good opportunity
> > to avoid it in this case, and I think the api would be better if done
> > that way.
> >
> > -Dana
> >
> > > On Fri, Jun 17, 2016 at 12:04 AM, Dana Powers 
> > wrote:
> > >
> > >> Why disconnect the client on a InvalidRequestException? The 2 errors
> > >> you are catching are both topic-level: (1) multiple requests for the
> > >> same topic, and (2) ReplicaAssignment and num_partitions /
> > >> replication_factor both set. Wouldn't it be better to just error the
> > >> offending create_topic_request, not the entire connection? The
> > >> CreateTopicsResponse returns a map of topics to error codes. You could
> > >> just return the topic that caused the error and an
> > >> InvalidRequestException error code.
> > >>
> > >> -Dana
> > >>
> > >> On Thu, Jun 16, 2016 at 8:37 AM, Grant Henke 
> > wrote:
> > >> > I have updated the wiki and pull request based on the 

Re: [VOTE] KIP-4 Create Topics Schema

2016-06-19 Thread Ewen Cheslack-Postava
Don't necessarily want to add noise here, but I'm -1 based on the
disconnect part. See discussion in other thread. (I'm +1 otherwise, and
happy to have my vote applied assuming we clean up that one issue.)

-Ewen

On Thu, Jun 16, 2016 at 6:05 PM, Harsha  wrote:

> +1 (binding)
> Thanks,
> Harsha
>
> On Thu, Jun 16, 2016, at 04:15 PM, Guozhang Wang wrote:
> > +1.
> >
> > On Thu, Jun 16, 2016 at 3:47 PM, Ismael Juma  wrote:
> >
> > > +1 (binding)
> > >
> > > On Thu, Jun 16, 2016 at 11:50 PM, Grant Henke 
> wrote:
> > >
> > > > I would like to initiate the voting process for the "KIP-4 Create
> Topics
> > > > Schema changes". This is not a vote for all of KIP-4, but
> specifically
> > > for
> > > > the create topics changes. I have included the exact changes below
> for
> > > > clarity:
> > > > >
> > > > > Create Topics Request (KAFKA-2945
> > > > > )
> > > > >
> > > > > CreateTopics Request (Version: 0) => [create_topic_requests]
> timeout
> > > > >   create_topic_requests => topic num_partitions replication_factor
> > > > [replica_assignment] [configs]
> > > > > topic => STRING
> > > > > num_partitions => INT32
> > > > > replication_factor => INT16
> > > > > replica_assignment => partition_id [replicas]
> > > > >   partition_id => INT32
> > > > >   replicas => INT32
> > > > > configs => config_key config_value
> > > > >   config_key => STRING
> > > > >   config_value => STRING
> > > > >   timeout => INT32
> > > > >
> > > > > CreateTopicsRequest is a batch request to initiate topic creation
> with
> > > > > either predefined or automatic replica assignment and optionally
> topic
> > > > > configuration.
> > > > >
> > > > > Request semantics:
> > > > >
> > > > >1. Must be sent to the controller broker
> > > > >2. If there are multiple instructions for the same topic in one
> > > > >request an InvalidRequestException will be logged on the broker
> and
> > > > the
> > > > >client will be disconnected.
> > > > >   - This is because the list of topics is modeled server side
> as a
> > > > >   map with TopicName as the key
> > > > >3. The principal must be authorized to the "Create" Operation
> on the
> > > > >"Cluster" resource to create topics.
> > > > >   - Unauthorized requests will receive a
> > > > ClusterAuthorizationException
> > > > >4.
> > > > >
> > > > >Only one from ReplicaAssignment or (num_partitions +
> > > > replication_factor
> > > > >), can be defined in one instruction.
> > > > >- If both parameters are specified an InvalidRequestException
> will
> > > be
> > > > >   logged on the broker and the client will be disconnected.
> > > > >   - In the case ReplicaAssignment is defined number of
> partitions
> > > and
> > > > >   replicas will be calculated from the supplied
> replica_assignment.
> > > > >   - In the case of defined (num_partitions +
> replication_factor)
> > > > >   replica assignment will be automatically generated by the
> server.
> > > > >   - One or the other must be defined. The existing broker side
> auto
> > > > >   create defaults will not be used
> > > > >   (default.replication.factor, num.partitions). The client
> > > > implementation can
> > > > >   have defaults for these options when generating the messages.
> > > > >   - The first replica in [replicas] is assumed to be the
> preferred
> > > > >   leader. This matches current behavior elsewhere.
> > > > >5. Setting a timeout > 0 will allow the request to block until
> the
> > > > >topic metadata is "complete" on the controller node.
> > > > >   - Complete means the local topic metadata cache has been
> completely
> > > > >   populated and all partitions have leaders
> > > > >  - The topic metadata is updated when the controller sends
> out
> > > > >  update metadata requests to the brokers
> > > > >   - If a timeout error occurs, the topic could still be created
> > > > >   successfully at a later time. It's up to the client to query
> for
> > > > the state
> > > > >   at that point.
> > > > >6. Setting a timeout <= 0 will validate arguments, trigger
> the
> > > > >topic creation, and return immediately.
> > > > >   - This is essentially the fully asynchronous mode we have in
> the
> > > > >   Zookeeper tools today.
> > > > >   - The error code in the response will either contain an
> argument
> > > > >   validation exception or a timeout exception. If you receive a
> > > > timeout
> > > > >   exception, because you asked for 0 timeout, you can assume
> the
> > > > message was
> > > > >   valid and the topic creation was triggered.
> > > > >7. The request is not transactional.
> > > > >   1. If an error occurs on one topic, the others could still be
> > > > >   created.
> > > > >   2. Errors are 
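
To make the request layout above concrete, here is a small illustrative sketch of how
a client might assemble the batch (hypothetical class and field names, not the
eventual client API); modeling the batch as a map keyed by topic also shows why a
duplicate topic in one request is rejected per rule 2:

```
// Hypothetical client-side sketch of the CreateTopics request body described above.
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class CreateTopicRequestEntry {
    final String topic;
    final int numPartitions;          // left unset (e.g. -1) when replicaAssignment is given
    final short replicationFactor;    // left unset (e.g. -1) when replicaAssignment is given
    final Map<Integer, List<Integer>> replicaAssignment;  // partition id -> replica broker ids
    final Map<String, String> configs;

    CreateTopicRequestEntry(String topic, int numPartitions, short replicationFactor,
                            Map<Integer, List<Integer>> replicaAssignment,
                            Map<String, String> configs) {
        this.topic = topic;
        this.numPartitions = numPartitions;
        this.replicationFactor = replicationFactor;
        this.replicaAssignment = replicaAssignment;
        this.configs = configs;
    }
}

final class CreateTopicsRequestSketch {
    final Map<String, CreateTopicRequestEntry> entries = new LinkedHashMap<>();
    final int timeoutMs;

    CreateTopicsRequestSketch(int timeoutMs) { this.timeoutMs = timeoutMs; }

    void add(CreateTopicRequestEntry entry) {
        // Mirrors the server-side map keyed by topic name (rule 2 above).
        if (entries.put(entry.topic, entry) != null)
            throw new IllegalArgumentException("Duplicate create request for topic " + entry.topic);
    }
}
```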

Jenkins build is back to normal : kafka-0.10.0-jdk7 #130

2016-06-19 Thread Apache Jenkins Server
See 



Build failed in Jenkins: kafka-trunk-jdk8 #709

2016-06-19 Thread Apache Jenkins Server
See 

Changes:

[me] KAFKA-3850: WorkerSinkTask commit prior to rebalance should be retried

[me] MINOR: Mention `log.message.format.version=0.10.0` in rolling upgrade

--
[...truncated 8122 lines...]

org.apache.kafka.common.metrics.JmxReporterTest > testJmxRegistration PASSED

org.apache.kafka.common.metrics.stats.HistogramTest > testHistogram STARTED

org.apache.kafka.common.metrics.stats.HistogramTest > testHistogram PASSED

org.apache.kafka.common.metrics.stats.HistogramTest > testConstantBinScheme 
STARTED

org.apache.kafka.common.metrics.stats.HistogramTest > testConstantBinScheme 
PASSED

org.apache.kafka.common.metrics.stats.HistogramTest > testLinearBinScheme 
STARTED

org.apache.kafka.common.metrics.stats.HistogramTest > testLinearBinScheme PASSED

org.apache.kafka.common.utils.CrcTest > testUpdateInt STARTED

org.apache.kafka.common.utils.CrcTest > testUpdateInt PASSED

org.apache.kafka.common.utils.CrcTest > testUpdate STARTED

org.apache.kafka.common.utils.CrcTest > testUpdate PASSED

org.apache.kafka.common.utils.UtilsTest > testAbs STARTED

org.apache.kafka.common.utils.UtilsTest > testAbs PASSED

org.apache.kafka.common.utils.UtilsTest > testMin STARTED

org.apache.kafka.common.utils.UtilsTest > testMin PASSED

org.apache.kafka.common.utils.UtilsTest > testJoin STARTED

org.apache.kafka.common.utils.UtilsTest > testJoin PASSED

org.apache.kafka.common.utils.UtilsTest > testReadBytes STARTED

org.apache.kafka.common.utils.UtilsTest > testReadBytes PASSED

org.apache.kafka.common.utils.UtilsTest > testGetHost STARTED

org.apache.kafka.common.utils.UtilsTest > testGetHost PASSED

org.apache.kafka.common.utils.UtilsTest > testGetPort STARTED

org.apache.kafka.common.utils.UtilsTest > testGetPort PASSED

org.apache.kafka.common.utils.UtilsTest > testCloseAll STARTED

org.apache.kafka.common.utils.UtilsTest > testCloseAll PASSED

org.apache.kafka.common.utils.UtilsTest > testFormatAddress STARTED

org.apache.kafka.common.utils.UtilsTest > testFormatAddress PASSED

org.apache.kafka.common.utils.AbstractIteratorTest > testEmptyIterator STARTED

org.apache.kafka.common.utils.AbstractIteratorTest > testEmptyIterator PASSED

org.apache.kafka.common.utils.AbstractIteratorTest > testIterator STARTED

org.apache.kafka.common.utils.AbstractIteratorTest > testIterator PASSED

org.apache.kafka.common.ClusterTest > testBootstrap STARTED

org.apache.kafka.common.ClusterTest > testBootstrap PASSED

org.apache.kafka.common.cache.LRUCacheTest > testEviction STARTED

org.apache.kafka.common.cache.LRUCacheTest > testEviction PASSED

org.apache.kafka.common.cache.LRUCacheTest > testPutGet STARTED

org.apache.kafka.common.cache.LRUCacheTest > testPutGet PASSED

org.apache.kafka.common.cache.LRUCacheTest > testRemove STARTED

org.apache.kafka.common.cache.LRUCacheTest > testRemove PASSED

org.apache.kafka.common.network.SslTransportLayerTest > 
testClientAuthenticationRequestedValidProvided STARTED

org.apache.kafka.common.network.SslTransportLayerTest > 
testClientAuthenticationRequestedValidProvided PASSED

org.apache.kafka.common.network.SslTransportLayerTest > testUnsupportedCiphers 
STARTED

org.apache.kafka.common.network.SslTransportLayerTest > testUnsupportedCiphers 
PASSED

org.apache.kafka.common.network.SslTransportLayerTest > 
testUnsupportedTLSVersion STARTED

org.apache.kafka.common.network.SslTransportLayerTest > 
testUnsupportedTLSVersion PASSED

org.apache.kafka.common.network.SslTransportLayerTest > 
testClientAuthenticationRequiredNotProvided STARTED

org.apache.kafka.common.network.SslTransportLayerTest > 
testClientAuthenticationRequiredNotProvided PASSED

org.apache.kafka.common.network.SslTransportLayerTest > 
testClientAuthenticationRequestedNotProvided STARTED

org.apache.kafka.common.network.SslTransportLayerTest > 
testClientAuthenticationRequestedNotProvided PASSED

org.apache.kafka.common.network.SslTransportLayerTest > 
testInvalidKeystorePassword STARTED

org.apache.kafka.common.network.SslTransportLayerTest > 
testInvalidKeystorePassword PASSED

org.apache.kafka.common.network.SslTransportLayerTest > 
testClientAuthenticationDisabledNotProvided STARTED

org.apache.kafka.common.network.SslTransportLayerTest > 
testClientAuthenticationDisabledNotProvided PASSED

org.apache.kafka.common.network.SslTransportLayerTest > 
testInvalidEndpointIdentification STARTED

org.apache.kafka.common.network.SslTransportLayerTest > 
testInvalidEndpointIdentification PASSED

org.apache.kafka.common.network.SslTransportLayerTest > 
testEndpointIdentificationDisabled STARTED

org.apache.kafka.common.network.SslTransportLayerTest > 
testEndpointIdentificationDisabled PASSED

org.apache.kafka.common.network.SslTransportLayerTest > 
testClientAuthenticationRequiredUntrustedProvided STARTED

org.apache.kafka.common.network.SslTransportLayerTest > 
testClientAuthenticationRequiredUntrustedProvided PASSED


[jira] [Commented] (KAFKA-3487) Support per-connector/per-task classloaders in Connect

2016-06-19 Thread Ewen Cheslack-Postava (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15338861#comment-15338861
 ] 

Ewen Cheslack-Postava commented on KAFKA-3487:
--

[~rhauch] Definitely agree that we want to come up with a forward-looking 
solution. We still need to support JDK7 (although I think some discussion has 
been kicked off around moving to a baseline of JDK8), so we definitely need to 
make sure we come up with a compatible solution for JDK8 and JDK9, but if we 
can set things up to be cleaner in the future, that'd be ideal. The basics 
(classloader per directory) should definitely be easy to make work. There's 
lots of other stuff you can do in this area too, e.g. URLClassLoaders that can 
just pull the jars from the network, or even more awesome would be a version 
that works from Central/Maven repos and can just auto-resolve all transitive 
dependencies in some variant of a URLClassLoader and allow all the connectors 
to be loaded completely dynamically (no more restarts to install new 
connectors!).
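
To sketch just the "classloader per directory" basics (purely illustrative; the class
name and layout here are invented, not Connect code):

{code}
import java.io.File;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.ArrayList;
import java.util.List;

public class PluginDirClassLoaderFactory {
    // Build one isolated URLClassLoader from every jar found in a plugin directory.
    public static URLClassLoader forDirectory(File pluginDir, ClassLoader parent) throws Exception {
        List<URL> jars = new ArrayList<>();
        File[] files = pluginDir.listFiles();
        if (files != null) {
            for (File f : files) {
                if (f.isFile() && f.getName().endsWith(".jar"))
                    jars.add(f.toURI().toURL());
            }
        }
        // Default delegation is parent-first; real isolation of conflicting dependencies
        // would additionally need a child-first loadClass override on top of this.
        return new URLClassLoader(jars.toArray(new URL[0]), parent);
    }
}
{code}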

> Support per-connector/per-task classloaders in Connect
> --
>
> Key: KAFKA-3487
> URL: https://issues.apache.org/jira/browse/KAFKA-3487
> Project: Kafka
>  Issue Type: New Feature
>  Components: KafkaConnect
>Affects Versions: 0.10.0.0
>Reporter: Ewen Cheslack-Postava
>Assignee: Liquan Pei
>Priority: Critical
>  Labels: needs-kip
>
> Currently we just use the default ClassLoader in Connect. However, this 
> limits how we can compatibly load conflicting connector plugins. Ideally we 
> would use a separate class loader per connector/task that is instantiated to 
> avoid potential conflicts.
> Note that this also opens up options for other ways to provide jars to 
> instantiate connectors. For example, Spark uses this to dynamically publish 
> classes defined in the REPL and load them via URL: 
> https://ardoris.wordpress.com/2014/03/30/how-spark-does-class-loading/ But 
> much simpler examples (include URL in the connector class instead of just 
> class name) are also possible and could be a nice way to better support dynamic 
> sets of connectors, multiple versions of the same connector, etc.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-3868) New producer metric record-size-avg does not provide average record size as advertised

2016-06-19 Thread Ewen Cheslack-Postava (JIRA)
Ewen Cheslack-Postava created KAFKA-3868:


 Summary: New producer metric record-size-avg does not provide 
average record size as advertised
 Key: KAFKA-3868
 URL: https://issues.apache.org/jira/browse/KAFKA-3868
 Project: Kafka
  Issue Type: Bug
  Components: producer 
Affects Versions: 0.10.0.0
Reporter: Ewen Cheslack-Postava
Assignee: Jun Rao


The metrics for record size are set up as follows:

{code}
this.maxRecordSizeSensor = metrics.sensor("record-size-max");
m = metrics.metricName("record-size-max", metricGrpName, "The 
maximum record size");
this.maxRecordSizeSensor.add(m, new Max());
m = metrics.metricName("record-size-avg", metricGrpName, "The 
average record size");
this.maxRecordSizeSensor.add(m, new Avg());
{code}

and then the values are recorded:

{code}
this.maxRecordSizeSensor.record(batch.maxRecordSize, now);
{code}

So the value provided by record-size-avg is the average maximum record size 
with the average computed per batch, not the average size of the records being 
sent as is suggested by "The average record size". We have all the necessary 
info needed to compute the metric, but it needs to be done separately from 
recording the batch.maxRecordSize values.
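
A minimal sketch of one way to fix this (not necessarily the eventual patch), reusing
the metrics, metricGrpName, batch and now variables from the snippets above:

{code}
// Sketch only: keep record-size-max per batch, but feed record-size-avg with the
// size of each appended record instead of the per-batch maximum.
Sensor maxRecordSizeSensor = metrics.sensor("record-size-max");
maxRecordSizeSensor.add(metrics.metricName("record-size-max", metricGrpName,
        "The maximum record size"), new Max());

Sensor avgRecordSizeSensor = metrics.sensor("record-size-avg");
avgRecordSizeSensor.add(metrics.metricName("record-size-avg", metricGrpName,
        "The average record size"), new Avg());

// Per batch, as today:
maxRecordSizeSensor.record(batch.maxRecordSize, now);
// Per record, at append time (recordSize would need to be plumbed through):
avgRecordSizeSensor.record(recordSize, now);
{code}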



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Jenkins build is back to normal : kafka-trunk-jdk7 #1377

2016-06-19 Thread Apache Jenkins Server
See 



[GitHub] kafka pull request #1518: KAFKA-3632; remove fetcher metrics on shutdown and...

2016-06-19 Thread hachikuji
Github user hachikuji closed the pull request at:

https://github.com/apache/kafka/pull/1518


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


Re: Operator order

2016-06-19 Thread Guozhang Wang
Hello Jeyhun,

Another way to do this "dynamic routing" is to specify your topology using
the lower-level processor API:

http://docs.confluent.io/3.0.0/streams/developer-guide.html#processor-api

More specifically, you can for example specify both A and D as parents of E
when adding processor E, and then in the processor A you can use the "
forward(K key, V value, String childName)" to pass the record to a specific
child (either B or E) by its processor name.
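
For example, a rough sketch with the low-level API (processor and topic names are
invented for illustration; only the forward(key, value, childName) overload mentioned
above is taken from the actual API):

```
// Route records from A either to B (normal path) or directly to E.
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.TopologyBuilder;

public class RoutingExample {
    public static TopologyBuilder build() {
        TopologyBuilder builder = new TopologyBuilder();
        builder.addSource("SOURCE", "input-topic");
        builder.addProcessor("A", RouterA::new, "SOURCE");
        builder.addProcessor("B", PassThrough::new, "A");
        builder.addProcessor("C", PassThrough::new, "B");
        builder.addProcessor("D", PassThrough::new, "C");
        builder.addProcessor("E", PassThrough::new, "A", "D");  // E has both A and D as parents
        builder.addSink("SINK", "output-topic", "E");
        return builder;
    }

    static class RouterA implements Processor<String, String> {
        private ProcessorContext context;
        @Override public void init(ProcessorContext context) { this.context = context; }
        @Override public void process(String key, String value) {
            if (value.startsWith("skip"))            // hypothetical routing condition
                context.forward(key, value, "E");    // send directly to child E
            else
                context.forward(key, value, "B");    // normal path through child B
        }
        @Override public void punctuate(long timestamp) {}
        @Override public void close() {}
    }

    static class PassThrough implements Processor<String, String> {
        private ProcessorContext context;
        @Override public void init(ProcessorContext context) { this.context = context; }
        @Override public void process(String key, String value) { context.forward(key, value); }
        @Override public void punctuate(long timestamp) {}
        @Override public void close() {}
    }
}
```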


As for TelegraphCQ and its underlying query processor (i.e. the Eddy model
http://db.cs.berkeley.edu/papers/sigmod00-eddy.pdf), my understanding is
that it is conceptually any-to-any routable and the query processor will
try to schedule at a per-record granularity depending on the query
selectivity, etc. But this is not fully controllable by the users. Is that
correct?


Guozhang


On Sun, Jun 19, 2016 at 7:16 AM, Matthias J. Sax 
wrote:

> Thanks for clarification. Still don't have a better answer than before.
>
> How much overhead my suggestion gives is hard to predict. However, the
> filter operators will run in the same thread (it's more or less just
> another chained method call), thus, it should not be too large.
> Furthermore, it should never the required to write tagged record to
> Kafka -- thus, it would only be some main memory overhead. But you would
> need to test and measure.
>
> -Matthias
>
> On 06/18/2016 08:13 PM, Jeyhun Karimov wrote:
> > Hi Matthias,
> >
> > Thank you for your answer. In my use-case, depending on statistics of
> every
> > operator, some tuples can be escaped for specific operators, so that we
> can
> > get approximate but faster result. I think this is somehow similar to
> >  TelegraphCQ in dynamism of operators.
> > In my case, the goal is getting rid of transmission and processing
> overhead
> > of some tuples for some operators (in runtime) to get approximate
> results.
> > However, it seems the possible solution can bring extra overhead to the
> system
> > in some cases.
> >
> > Jeyhun
> >
> > On Sat, Jun 18, 2016 at 7:36 PM Matthias J. Sax 
> > wrote:
> >
> >> Hi Jeyhun,
> >>
> >> there is no support by the library itself. But you could build a custom
> >> solution by building the DAG with all required edges (ie, additional
> >> edges from A->E, and B->sink etc.). For this, each output message from A
> >> would be duplicated and sent to B and E. Therefore, A should "tag" each
> >> message with the designated receiver (B or E) and you add an additional
> >> filter step in both edges (ie, a filter between A->F1->B and A->F2->E),
> >> that drop messages if the "tag" does not match the downstream operator.
> >>
> >> Does this make sense? Of course, depending on your use case, you might
> >> get a huge number of edges (plus filters) and your DAG might be quite
> >> complex. Don't see any other solution though.
> >>
> >> Hope this helps.
> >>
> >> One question though: how would changing the DAG at runtime help
> >> you? Do you mean you would dynamically change the edge between A->B and
> >> A->sink ? I guess, this would be a very special pattern and I doubt that
> >> any library or system can offer this.
> >>
> >> -Matthias
> >>
> >> On 06/18/2016 05:33 PM, Jeyhun Karimov wrote:
> >>> Hi community,
> >>>
> >>> Is there a way in Kafka Streams to change the order of operators in
> >>> runtime? For example, I have operators
> >>>
> >>> Source->A->B->C->D->E->Sink
> >>>
> >>> and I want to forward some tuples from A to E, from B to Sink and etc.
> As
> >>> far as I know, the stream execution graph is computed in compile time
> and
> >>> does not change in runtime. Can there be an indirect solution for this
> >>> specific case?
> >>>
> >>> Jeyhun
> >>>
> >>
> >> --
> > -Cheers
> >
> > Jeyhun
> >
>
>


-- 
-- Guozhang


[jira] [Assigned] (KAFKA-3825) Allow users to specify different types of state stores in Streams DSL

2016-06-19 Thread Jeyhun Karimov (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3825?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jeyhun Karimov reassigned KAFKA-3825:
-

Assignee: Jeyhun Karimov

> Allow users to specify different types of state stores in Streams DSL
> -
>
> Key: KAFKA-3825
> URL: https://issues.apache.org/jira/browse/KAFKA-3825
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Jeyhun Karimov
>  Labels: api
>
> Today the high-level Streams DSL uses hard-coded types of state stores (i.e. 
> persistent RocksDB) for their stateful operations. But for advanced users 
> they should be able to specify different types of state stores (in-memory, 
> persistent, customized) also in the DSL, instead of resorting to the 
> lower-level APIs.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] kafka pull request #1514: MINOR: Mention `log.message.format.version=0.10.0`...

2016-06-19 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/1514


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Updated] (KAFKA-3850) WorkerSinkTask should retry commits if woken up during rebalance or shutdown

2016-06-19 Thread Ewen Cheslack-Postava (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3850?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ewen Cheslack-Postava updated KAFKA-3850:
-
   Resolution: Fixed
Fix Version/s: 0.10.1.0
   Status: Resolved  (was: Patch Available)

Issue resolved by pull request 1511
[https://github.com/apache/kafka/pull/1511]

> WorkerSinkTask should retry commits if woken up during rebalance or shutdown
> 
>
> Key: KAFKA-3850
> URL: https://issues.apache.org/jira/browse/KAFKA-3850
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 0.10.0.0
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
> Fix For: 0.10.1.0, 0.10.0.1
>
>
> We use consumer.wakeup() to interrupt long polls when we need to pause/resume 
> partitions and when we shutdown sink tasks. The resulting {{WakeupException}} 
> could be raised from the synchronous commit which we use in between 
> rebalances and on shutdown. Since we don't currently catch this exception, we 
> can fail to commit offsets, which typically results in duplicates. To fix 
> this problem, we should catch the exception, retry the commit, and then 
> rethrow it.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3850) WorkerSinkTask should retry commits if woken up during rebalance or shutdown

2016-06-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15338822#comment-15338822
 ] 

ASF GitHub Bot commented on KAFKA-3850:
---

Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/1511


> WorkerSinkTask should retry commits if woken up during rebalance or shutdown
> 
>
> Key: KAFKA-3850
> URL: https://issues.apache.org/jira/browse/KAFKA-3850
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 0.10.0.0
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
> Fix For: 0.10.0.1
>
>
> We use consumer.wakeup() to interrupt long polls when we need to pause/resume 
> partitions and when we shutdown sink tasks. The resulting {{WakeupException}} 
> could be raised from the synchronous commit which we use in between 
> rebalances and on shutdown. Since we don't currently catch this exception, we 
> can fail to commit offsets, which typically results in duplicates. To fix 
> this problem, we should catch the exception, retry the commit, and then 
> rethrow it.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] kafka pull request #1511: KAFKA-3850: WorkerSinkTask commit prior to rebalan...

2016-06-19 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/1511


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (KAFKA-3861) Shrunk ISR before leader crash makes the partition unavailable

2016-06-19 Thread Maysam Yabandeh (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3861?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15338804#comment-15338804
 ] 

Maysam Yabandeh commented on KAFKA-3861:


Thanks [~wushujames]. I left a comment on KAFKA-3410.

> Shrunk ISR before leader crash makes the partition unavailable
> --
>
> Key: KAFKA-3861
> URL: https://issues.apache.org/jira/browse/KAFKA-3861
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.0.0
>Reporter: Maysam Yabandeh
>
> We observed a case that the leader experienced a crash and lost its in-memory 
> data and latest HW offsets. Normally Kafka should be safe and be able to make 
> progress with a single node failure. However a few seconds before the crash 
> the leader shrunk its ISR to itself, which is safe since min-in-sync-replicas 
> is 2 and replication factor is 3 thus the troubled leader cannot accept new 
> produce messages. After the crash however the controller could not name any 
> of the followers as the new leader since, as far as the controller 
> knows they are not in ISR and could potentially be behind the last leader. 
> Note that unclean-leader-election is disabled in this cluster since the 
> cluster requires a very high degree of durability and cannot tolerate data 
> loss.
> The impact could get worse if the admin brings up the crashed broker in an 
> attempt to make such partitions available again; this would take down even 
> more brokers as the followers panic when they find their offset larger than 
> HW offset in the leader:
> {code}
> if (leaderEndOffset < replica.logEndOffset.messageOffset) {
>   // Prior to truncating the follower's log, ensure that doing so is not 
> disallowed by the configuration for unclean leader election.
>   // This situation could only happen if the unclean election 
> configuration for a topic changes while a replica is down. Otherwise,
>   // we should never encounter this situation since a non-ISR leader 
> cannot be elected if disallowed by the broker configuration.
>   if (!LogConfig.fromProps(brokerConfig.originals, 
> AdminUtils.fetchEntityConfig(replicaMgr.zkUtils,
> ConfigType.Topic, 
> topicAndPartition.topic)).uncleanLeaderElectionEnable) {
> // Log a fatal error and shutdown the broker to ensure that data loss 
> does not unexpectedly occur.
> fatal("Halting because log truncation is not allowed for topic 
> %s,".format(topicAndPartition.topic) +
>   " Current leader %d's latest offset %d is less than replica %d's 
> latest offset %d"
>   .format(sourceBroker.id, leaderEndOffset, brokerConfig.brokerId, 
> replica.logEndOffset.messageOffset))
> Runtime.getRuntime.halt(1)
>   }
> {code}
> One hackish solution would be that the admin investigates the logs, determines 
> that unclean-leader-election in this particular case would be safe and 
> temporarily enables it (while the crashed node is down) until new leaders are 
> selected for affected partitions, waits for the topics' LEO to advance far enough, 
> and then brings up the crashed node again. This manual process is however 
> slow and error-prone and the cluster will suffer partial unavailability in 
> the meanwhile.
> We are thinking of having the controller make an exception for this case: if 
> ISR size is less than min-in-sync-replicas and the new leader would be -1, 
> then the controller does an RPC to all the replicas to inquire about their latest 
> offset, and if all the replicas respond it chooses the one with the largest 
> offset as the leader as well as the new ISR. Note that the controller cannot 
> do that if any of the non-leader replicas does not respond, since there might be 
> a case where the responding replicas were not part of the last ISR and 
> hence are potentially behind the others (and the controller could not know that 
> since it does not keep track of previous ISR).
> Pros would be that Kafka will be safely available when such cases occur and 
> would not require any admin intervention. The cons however is that the 
> controller talking to brokers inside the leader election function would break 
> the existing pattern in the source code as currently the leader is elected 
> locally without requiring any additional RPC.
> Thoughts?
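
To illustrate just the proposed selection rule (hypothetical types, not controller code):

{code}
// Pick a leader by largest reported log end offset, but only if every assigned
// replica answered the proposed offset-inquiry RPC; otherwise stay unavailable.
static Integer electByLargestOffset(List<Integer> assignedReplicas,
                                    Map<Integer, Long> reportedLogEndOffsets) {
    if (assignedReplicas.isEmpty() || !reportedLogEndOffsets.keySet().containsAll(assignedReplicas))
        return null;  // some replica is silent: cannot decide safely
    Integer best = assignedReplicas.get(0);
    for (Integer replica : assignedReplicas) {
        if (reportedLogEndOffsets.get(replica) > reportedLogEndOffsets.get(best))
            best = replica;
    }
    return best;  // new leader; the responding set would also become the new ISR
}
{code}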



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Build failed in Jenkins: kafka-trunk-jdk7 #1376

2016-06-19 Thread Apache Jenkins Server
See 

Changes:

[me] KAFKA-3810: replication of internal topics should not be limited by

--
[...truncated 12746 lines...]

org.apache.kafka.connect.runtime.distributed.WorkerCoordinatorTest > 
testLeaderPerformAssignment1 PASSED

org.apache.kafka.connect.runtime.distributed.WorkerCoordinatorTest > 
testLeaderPerformAssignment2 STARTED

org.apache.kafka.connect.runtime.distributed.WorkerCoordinatorTest > 
testLeaderPerformAssignment2 PASSED

org.apache.kafka.connect.runtime.distributed.WorkerCoordinatorTest > 
testJoinLeaderCannotAssign STARTED

org.apache.kafka.connect.runtime.distributed.WorkerCoordinatorTest > 
testJoinLeaderCannotAssign PASSED

org.apache.kafka.connect.runtime.distributed.WorkerCoordinatorTest > 
testRejoinGroup STARTED

org.apache.kafka.connect.runtime.distributed.WorkerCoordinatorTest > 
testRejoinGroup PASSED

org.apache.kafka.connect.runtime.distributed.WorkerCoordinatorTest > 
testNormalJoinGroupLeader STARTED

org.apache.kafka.connect.runtime.distributed.WorkerCoordinatorTest > 
testNormalJoinGroupLeader PASSED

org.apache.kafka.connect.runtime.WorkerTaskTest > stopBeforeStarting STARTED

org.apache.kafka.connect.runtime.WorkerTaskTest > stopBeforeStarting PASSED

org.apache.kafka.connect.runtime.WorkerTaskTest > standardStartup STARTED

org.apache.kafka.connect.runtime.WorkerTaskTest > standardStartup PASSED

org.apache.kafka.connect.runtime.WorkerTaskTest > cancelBeforeStopping STARTED

org.apache.kafka.connect.runtime.WorkerTaskTest > cancelBeforeStopping PASSED

org.apache.kafka.connect.runtime.WorkerSinkTaskTest > testStartPaused STARTED

org.apache.kafka.connect.runtime.WorkerSinkTaskTest > testStartPaused PASSED

org.apache.kafka.connect.runtime.WorkerSinkTaskTest > testPause STARTED

org.apache.kafka.connect.runtime.WorkerSinkTaskTest > testPause PASSED

org.apache.kafka.connect.runtime.WorkerSinkTaskTest > testPollRedelivery STARTED

org.apache.kafka.connect.runtime.WorkerSinkTaskTest > testPollRedelivery PASSED

org.apache.kafka.connect.runtime.WorkerSinkTaskTest > 
testErrorInRebalancePartitionRevocation STARTED

org.apache.kafka.connect.runtime.WorkerSinkTaskTest > 
testErrorInRebalancePartitionRevocation PASSED

org.apache.kafka.connect.runtime.WorkerSinkTaskTest > 
testErrorInRebalancePartitionAssignment STARTED

org.apache.kafka.connect.runtime.WorkerSinkTaskTest > 
testErrorInRebalancePartitionAssignment PASSED

org.apache.kafka.connect.runtime.WorkerSinkTaskThreadedTest > 
testPollsInBackground STARTED

org.apache.kafka.connect.runtime.WorkerSinkTaskThreadedTest > 
testPollsInBackground PASSED

org.apache.kafka.connect.runtime.WorkerSinkTaskThreadedTest > testCommit STARTED

org.apache.kafka.connect.runtime.WorkerSinkTaskThreadedTest > testCommit PASSED

org.apache.kafka.connect.runtime.WorkerSinkTaskThreadedTest > 
testCommitTaskFlushFailure STARTED

org.apache.kafka.connect.runtime.WorkerSinkTaskThreadedTest > 
testCommitTaskFlushFailure PASSED

org.apache.kafka.connect.runtime.WorkerSinkTaskThreadedTest > 
testCommitTaskSuccessAndFlushFailure STARTED

org.apache.kafka.connect.runtime.WorkerSinkTaskThreadedTest > 
testCommitTaskSuccessAndFlushFailure PASSED

org.apache.kafka.connect.runtime.WorkerSinkTaskThreadedTest > 
testCommitConsumerFailure STARTED

org.apache.kafka.connect.runtime.WorkerSinkTaskThreadedTest > 
testCommitConsumerFailure PASSED

org.apache.kafka.connect.runtime.WorkerSinkTaskThreadedTest > testCommitTimeout 
STARTED

org.apache.kafka.connect.runtime.WorkerSinkTaskThreadedTest > testCommitTimeout 
PASSED

org.apache.kafka.connect.runtime.WorkerSinkTaskThreadedTest > 
testAssignmentPauseResume STARTED

org.apache.kafka.connect.runtime.WorkerSinkTaskThreadedTest > 
testAssignmentPauseResume PASSED

org.apache.kafka.connect.runtime.WorkerSinkTaskThreadedTest > testRewind STARTED

org.apache.kafka.connect.runtime.WorkerSinkTaskThreadedTest > testRewind PASSED

org.apache.kafka.connect.runtime.WorkerSourceTaskTest > testStartPaused STARTED

org.apache.kafka.connect.runtime.WorkerSourceTaskTest > testStartPaused PASSED

org.apache.kafka.connect.runtime.WorkerSourceTaskTest > testPause STARTED

org.apache.kafka.connect.runtime.WorkerSourceTaskTest > testPause PASSED

org.apache.kafka.connect.runtime.WorkerSourceTaskTest > testPollsInBackground 
STARTED

org.apache.kafka.connect.runtime.WorkerSourceTaskTest > testPollsInBackground 
PASSED

org.apache.kafka.connect.runtime.WorkerSourceTaskTest > testCommit STARTED

org.apache.kafka.connect.runtime.WorkerSourceTaskTest > testCommit PASSED

org.apache.kafka.connect.runtime.WorkerSourceTaskTest > testFailureInPoll 
STARTED

org.apache.kafka.connect.runtime.WorkerSourceTaskTest > testFailureInPoll PASSED

org.apache.kafka.connect.runtime.WorkerSourceTaskTest > testCommitFailure 
STARTED

org.apache.kafka.connect.runtime.WorkerSourceTaskTest > testCommitFailure PASSED


Build failed in Jenkins: kafka-trunk-jdk8 #708

2016-06-19 Thread Apache Jenkins Server
See 

Changes:

[me] KAFKA-3810: replication of internal topics should not be limited by

--
[...truncated 10574 lines...]

org.apache.kafka.clients.consumer.internals.SubscriptionStateTest > 
unsubscription PASSED

org.apache.kafka.clients.consumer.internals.SubscriptionStateTest > 
patternSubscription STARTED

org.apache.kafka.clients.consumer.internals.SubscriptionStateTest > 
patternSubscription PASSED

org.apache.kafka.clients.consumer.internals.SubscriptionStateTest > 
cantSubscribeTopicAndPattern STARTED

org.apache.kafka.clients.consumer.internals.SubscriptionStateTest > 
cantSubscribeTopicAndPattern PASSED

org.apache.kafka.clients.consumer.internals.SubscriptionStateTest > 
invalidPositionUpdate STARTED

org.apache.kafka.clients.consumer.internals.SubscriptionStateTest > 
invalidPositionUpdate PASSED

org.apache.kafka.clients.consumer.internals.SubscriptionStateTest > 
unsubscribeUserSubscribe STARTED

org.apache.kafka.clients.consumer.internals.SubscriptionStateTest > 
unsubscribeUserSubscribe PASSED

org.apache.kafka.clients.consumer.internals.SubscriptionStateTest > 
cantChangePositionForNonAssignedPartition STARTED

org.apache.kafka.clients.consumer.internals.SubscriptionStateTest > 
cantChangePositionForNonAssignedPartition PASSED

org.apache.kafka.clients.consumer.internals.SubscriptionStateTest > 
commitOffsetMetadata STARTED

org.apache.kafka.clients.consumer.internals.SubscriptionStateTest > 
commitOffsetMetadata PASSED

org.apache.kafka.clients.consumer.internals.SubscriptionStateTest > 
cantSubscribePatternAndTopic STARTED

org.apache.kafka.clients.consumer.internals.SubscriptionStateTest > 
cantSubscribePatternAndTopic PASSED

org.apache.kafka.clients.consumer.internals.SubscriptionStateTest > 
cantSubscribePatternAndPartition STARTED

org.apache.kafka.clients.consumer.internals.SubscriptionStateTest > 
cantSubscribePatternAndPartition PASSED

org.apache.kafka.clients.consumer.KafkaConsumerTest > testConstructorClose 
STARTED

org.apache.kafka.clients.consumer.KafkaConsumerTest > testConstructorClose 
PASSED

org.apache.kafka.clients.consumer.KafkaConsumerTest > testPause STARTED

org.apache.kafka.clients.consumer.KafkaConsumerTest > testPause PASSED

org.apache.kafka.clients.consumer.KafkaConsumerTest > 
testInvalidSocketSendBufferSize STARTED

org.apache.kafka.clients.consumer.KafkaConsumerTest > 
testInvalidSocketSendBufferSize PASSED

org.apache.kafka.clients.consumer.KafkaConsumerTest > 
testOsDefaultSocketBufferSizes STARTED

org.apache.kafka.clients.consumer.KafkaConsumerTest > 
testOsDefaultSocketBufferSizes PASSED

org.apache.kafka.clients.consumer.KafkaConsumerTest > testSeekNegative STARTED

org.apache.kafka.clients.consumer.KafkaConsumerTest > testSeekNegative PASSED

org.apache.kafka.clients.consumer.KafkaConsumerTest > 
testInvalidSocketReceiveBufferSize STARTED

org.apache.kafka.clients.consumer.KafkaConsumerTest > 
testInvalidSocketReceiveBufferSize PASSED

org.apache.kafka.clients.consumer.KafkaConsumerTest > 
testAutoCommitSentBeforePositionUpdate STARTED

org.apache.kafka.clients.consumer.KafkaConsumerTest > 
testAutoCommitSentBeforePositionUpdate PASSED

org.apache.kafka.clients.consumer.KafkaConsumerTest > verifyHeartbeatSent 
STARTED

org.apache.kafka.clients.consumer.KafkaConsumerTest > verifyHeartbeatSent PASSED

org.apache.kafka.clients.consumer.KafkaConsumerTest > 
verifyHeartbeatSentWhenFetchedDataReady STARTED

org.apache.kafka.clients.consumer.KafkaConsumerTest > 
verifyHeartbeatSentWhenFetchedDataReady PASSED

org.apache.kafka.clients.consumer.KafkaConsumerTest > testSubscription STARTED

org.apache.kafka.clients.consumer.KafkaConsumerTest > testSubscription PASSED

org.apache.kafka.clients.consumer.KafkaConsumerTest > 
testInterceptorConstructorClose STARTED

org.apache.kafka.clients.consumer.KafkaConsumerTest > 
testInterceptorConstructorClose PASSED

org.apache.kafka.clients.consumer.KafkaConsumerTest > 
testWakeupWithFetchDataAvailable STARTED

org.apache.kafka.clients.consumer.KafkaConsumerTest > 
testWakeupWithFetchDataAvailable PASSED

org.apache.kafka.clients.consumer.ConsumerConfigTest > 
testDeserializerToPropertyConfig STARTED

org.apache.kafka.clients.consumer.ConsumerConfigTest > 
testDeserializerToPropertyConfig PASSED

org.apache.kafka.clients.consumer.ConsumerConfigTest > 
testDeserializerToMapConfig STARTED

org.apache.kafka.clients.consumer.ConsumerConfigTest > 
testDeserializerToMapConfig PASSED

org.apache.kafka.clients.consumer.RoundRobinAssignorTest > 
testOneConsumerNoTopic STARTED

org.apache.kafka.clients.consumer.RoundRobinAssignorTest > 
testOneConsumerNoTopic PASSED

org.apache.kafka.clients.consumer.RoundRobinAssignorTest > 
testTwoConsumersTwoTopicsSixPartitions STARTED

org.apache.kafka.clients.consumer.RoundRobinAssignorTest > 
testTwoConsumersTwoTopicsSixPartitions PASSED


[jira] [Resolved] (KAFKA-3810) replication of internal topics should not be limited by replica.fetch.max.bytes

2016-06-19 Thread Ewen Cheslack-Postava (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3810?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ewen Cheslack-Postava resolved KAFKA-3810.
--
   Resolution: Fixed
Fix Version/s: 0.10.1.0

Issue resolved by pull request 1484
[https://github.com/apache/kafka/pull/1484]

> replication of internal topics should not be limited by 
> replica.fetch.max.bytes
> ---
>
> Key: KAFKA-3810
> URL: https://issues.apache.org/jira/browse/KAFKA-3810
> Project: Kafka
>  Issue Type: Bug
>Reporter: Onur Karaman
>Assignee: Onur Karaman
> Fix For: 0.10.1.0
>
>
> From the kafka-dev mailing list discussion:
> [\[DISCUSS\] scalability limits in the 
> coordinator|http://mail-archives.apache.org/mod_mbox/kafka-dev/201605.mbox/%3ccamquqbzddtadhcgl6h4smtgo83uqt4s72gc03b3vfghnme3...@mail.gmail.com%3E]
> There's a scalability limit on the new consumer / coordinator regarding the 
> amount of group metadata we can fit into one message. This restricts a 
> combination of consumer group size, topic subscription sizes, topic 
> assignment sizes, and any remaining member metadata.
> Under more strenuous use cases like mirroring clusters with thousands of 
> topics, this limitation can be reached even after applying gzip to the 
> __consumer_offsets topic.
> Various options were proposed in the discussion:
> # Config change: reduce the number of consumers in the group. This isn't 
> always a realistic answer in more strenuous use cases like MirrorMaker 
> clusters or for auditing.
> # Config change: split the group into smaller groups which together will get 
> full coverage of the topics. This gives each group member a smaller 
> subscription.(ex: g1 has topics starting with a-m while g2 has topics 
> starting with n-z). This would be operationally painful to manage.
> # Config change: split the topics among members of the group. Again this 
> gives each group member a smaller subscription. This would also be 
> operationally painful to manage.
> # Config change: bump up KafkaConfig.messageMaxBytes (a topic-level config) 
> and KafkaConfig.replicaFetchMaxBytes (a broker-level config). Applying 
> messageMaxBytes to just the __consumer_offsets topic seems relatively 
> harmless, but bumping up the broker-level replicaFetchMaxBytes would probably 
> need more attention.
> # Config change: try different compression codecs. Based on 2 minutes of 
> googling, it seems like lz4 and snappy are faster than gzip but have worse 
> compression, so this probably won't help.
> # Implementation change: support sending the regex over the wire instead of 
> the fully expanded topic subscriptions. I think people said in the past that 
> different languages have subtle differences in regex, so this doesn't play 
> nicely with cross-language groups.
> # Implementation change: maybe we can reverse the mapping? Instead of mapping 
> from member to subscriptions, we can map a subscription to a list of members.
> # Implementation change: maybe we can try to break apart the subscription and 
> assignments from the same SyncGroupRequest into multiple records? They can 
> still go to the same message set and get appended together. This way the 
> limit become the segment size, which shouldn't be a problem. This can be 
> tricky to get right because we're currently keying these messages on the 
> group, so I think records from the same rebalance might accidentally compact 
> one another, but my understanding of compaction isn't that great.
> # Implementation change: try to apply some tricks on the assignment 
> serialization to make it smaller.
> # Config and Implementation change: bump up the __consumer_offsets topic 
> messageMaxBytes and (from [~junrao]) fix how we deal with the case when a 
> message is larger than the fetch size. Today, if the fetch size is smaller 
> than the fetch size, the consumer will get stuck. Instead, we can simply 
> return the full message if it's larger than the fetch size w/o requiring the 
> consumer to manually adjust the fetch size.
> # Config and Implementation change: same as above but only apply the special 
> fetch logic when fetching from internal topics



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3810) replication of internal topics should not be limited by replica.fetch.max.bytes

2016-06-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3810?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15338679#comment-15338679
 ] 

ASF GitHub Bot commented on KAFKA-3810:
---

Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/1484


> replication of internal topics should not be limited by 
> replica.fetch.max.bytes
> ---
>
> Key: KAFKA-3810
> URL: https://issues.apache.org/jira/browse/KAFKA-3810
> Project: Kafka
>  Issue Type: Bug
>Reporter: Onur Karaman
>Assignee: Onur Karaman
> Fix For: 0.10.1.0
>
>
> From the kafka-dev mailing list discussion:
> [\[DISCUSS\] scalability limits in the 
> coordinator|http://mail-archives.apache.org/mod_mbox/kafka-dev/201605.mbox/%3ccamquqbzddtadhcgl6h4smtgo83uqt4s72gc03b3vfghnme3...@mail.gmail.com%3E]
> There's a scalability limit on the new consumer / coordinator regarding the 
> amount of group metadata we can fit into one message. This restricts a 
> combination of consumer group size, topic subscription sizes, topic 
> assignment sizes, and any remaining member metadata.
> Under more strenuous use cases like mirroring clusters with thousands of 
> topics, this limitation can be reached even after applying gzip to the 
> __consumer_offsets topic.
> Various options were proposed in the discussion:
> # Config change: reduce the number of consumers in the group. This isn't 
> always a realistic answer in more strenuous use cases like MirrorMaker 
> clusters or for auditing.
> # Config change: split the group into smaller groups which together will get 
> full coverage of the topics. This gives each group member a smaller 
> subscription.(ex: g1 has topics starting with a-m while g2 has topics 
> starting with n-z). This would be operationally painful to manage.
> # Config change: split the topics among members of the group. Again this 
> gives each group member a smaller subscription. This would also be 
> operationally painful to manage.
> # Config change: bump up KafkaConfig.messageMaxBytes (a topic-level config) 
> and KafkaConfig.replicaFetchMaxBytes (a broker-level config). Applying 
> messageMaxBytes to just the __consumer_offsets topic seems relatively 
> harmless, but bumping up the broker-level replicaFetchMaxBytes would probably 
> need more attention.
> # Config change: try different compression codecs. Based on 2 minutes of 
> googling, it seems like lz4 and snappy are faster than gzip but have worse 
> compression, so this probably won't help.
> # Implementation change: support sending the regex over the wire instead of 
> the fully expanded topic subscriptions. I think people said in the past that 
> different languages have subtle differences in regex, so this doesn't play 
> nicely with cross-language groups.
> # Implementation change: maybe we can reverse the mapping? Instead of mapping 
> from member to subscriptions, we can map a subscription to a list of members.
> # Implementation change: maybe we can try to break apart the subscription and 
> assignments from the same SyncGroupRequest into multiple records? They can 
> still go to the same message set and get appended together. This way the 
> limit becomes the segment size, which shouldn't be a problem. This can be 
> tricky to get right because we're currently keying these messages on the 
> group, so I think records from the same rebalance might accidentally compact 
> one another, but my understanding of compaction isn't that great.
> # Implementation change: try to apply some tricks on the assignment 
> serialization to make it smaller.
> # Config and Implementation change: bump up the __consumer_offsets topic 
> messageMaxBytes and (from [~junrao]) fix how we deal with the case when a 
> message is larger than the fetch size. Today, if the fetch size is smaller 
> than the message size, the consumer will get stuck. Instead, we can simply 
> return the full message if it's larger than the fetch size w/o requiring the 
> consumer to manually adjust the fetch size.
> # Config and Implementation change: same as above but only apply the special 
> fetch logic when fetching from internal topics



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [DISCUSS] KIP-53 Add custom policies for reconnect attempts to NetworkdClient

2016-06-19 Thread Dana Powers
I took a stab at implementing a simplified exponential + randomized
backoff policy here: https://github.com/apache/kafka/pull/1523

Rather than changing public interfaces / using plugins, it just adds a
new client configuration "reconnect.backoff.max" that can be used to
enable increasing backoff when node failures repeat. If this
configuration is not set higher than reconnect.backoff.ms then the
current constant backoff policy is retained. The default is to
continue w/ current 50ms constant backoff.
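
For illustration, the policy boils down to something like this (sketch only, not the
PR's actual code; names are made up):

```
import java.util.Random;

// Constant backoff unless reconnect.backoff.max is raised above reconnect.backoff.ms;
// otherwise back off exponentially per consecutive failure and add uniform jitter so
// clients don't reconnect to a recovering broker in lockstep.
final class ReconnectBackoffSketch {
    static long nextBackoffMs(long backoffMs, long backoffMaxMs,
                              int consecutiveFailures, Random random) {
        if (backoffMaxMs <= backoffMs)
            return backoffMs;                              // today's constant policy
        double exp = backoffMs * Math.pow(2, consecutiveFailures);
        long capped = (long) Math.min(backoffMaxMs, exp);
        return (long) (random.nextDouble() * capped);      // jitter in [0, capped)
    }
}
```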

Thoughts? Would a change like this require a KIP?

-Dana


On Mon, May 2, 2016 at 10:29 AM, Guozhang Wang  wrote:
> For the specific problem of connection storms, randomizing with a normal
> distribution around the specified mean "reconnect.backoff.ms" has proven to work
> pretty well. The most recent usage of it in my mind is RAFT, and it turns
> out to be pretty effective in eliminating leader-election storms.
>
>
> Guozhang
>
> On Fri, Apr 29, 2016 at 8:57 PM, Ewen Cheslack-Postava 
> wrote:
>
>> I'll agree w/ Jay and point out that the implementations of
>> ReconnectionPolicy provided built-in with that driver are Constant,
>> Exponential, and Counting. Constant and exponential can be combined with
>> the right set of policy config parameters. I'm curious if there's a real
>> need for something else, or if you're just looking for something
>> exponential instead of non-constant? I think a fixed exponential backoff
>> policy that defaults parameters to the current constant backoff policy
>> would probably satisfy our needs.
>>
>> On Mon, Apr 4, 2016 at 1:25 PM, Jay Kreps  wrote:
>>
>> > If I understand correctly, the problem we are fixing is a connection storm
>> > where, when a new broker comes online, it is overwhelmed with connections.
>> >
>> > In general we try hard to avoid plugins where possible. Maybe instead of
>> > adding another plugin interface we could just directly solve this problem
>> > by doing some randomization in the backoff to space out the
>> reconnections?
>> > This seems like it would be good for anyone with a large client
>> > environment?
>> >
>> > -Jay
>> >
>> > On Mon, Apr 4, 2016 at 12:54 PM, Florian Hussonnois <
>> fhussonn...@gmail.com
>> > >
>> > wrote:
>> >
>> > > Hi Kafka Team,
>> > >
>> > > I have made a new Kafka Improvement Proposal.
>> > >
>> > >
>> >
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-53+-+Add+custom+policies+for+reconnect+attempts+to+NetworkdClient
>> > >
>> > > This is my first proposal so I don't know if I have given enough
>> > > information.
>> > > Also I have already proposed an implementation :
>> > > https://github.com/apache/kafka/pull/1179
>> > >
>> > > Thanks
>> > >
>> > > --
>> > > Florian HUSSONNOIS
>> > >
>> >
>>
>>
>>
>> --
>> Thanks,
>> Ewen
>>
>
>
>
> --
> -- Guozhang


[GitHub] kafka pull request #1484: KAFKA-3810: replication of internal topics should ...

2016-06-19 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/1484


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] kafka pull request #1523: Support exponential backoff policy via reconnect.b...

2016-06-19 Thread dpkp
GitHub user dpkp opened a pull request:

https://github.com/apache/kafka/pull/1523

Support exponential backoff policy via reconnect.backoff.max

This PR is an alternative, simplified approach to backoff policies compared 
with KIP-53 and KAFKA-3496.

Summary:

* add `reconnect.backoff.max` common client configuration parameter
* if `reconnect.backoff.max` > `reconnect.backoff.ms`, apply an exponential 
backoff policy
* select actual backoff per node via uniform random distribution to smooth 
cluster reconnects

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/dpkp/kafka exp_backoff

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/1523.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1523


commit 04bf44a44dbd74c60577bb653089b7c455491926
Author: Dana Powers 
Date:   2016-06-19T17:03:37Z

Support exponential backoff policy via reconnect.backoff.max




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Updated] (KAFKA-3864) Kafka Connect Struct.get returning defaultValue from Struct not the field schema

2016-06-19 Thread Ewen Cheslack-Postava (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3864?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ewen Cheslack-Postava updated KAFKA-3864:
-
Fix Version/s: 0.10.0.1
   0.10.1.0

> Kafka Connect Struct.get returning defaultValue from Struct not the field 
> schema
> 
>
> Key: KAFKA-3864
> URL: https://issues.apache.org/jira/browse/KAFKA-3864
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 0.9.0.1
>Reporter: Andrew Stevenson
>Assignee: Ewen Cheslack-Postava
>Priority: Blocker
> Fix For: 0.10.1.0, 0.10.0.1
>
>
>  `public Object get(Field field) {
> Object val = values[field.index()];
> if (val == null && schema.defaultValue() != null) {
> val = schema.defaultValue();
> }
> return val;
> }`
> Should be field.schema().defaultValue()
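
i.e. something along these lines (sketch of the suggested fix, not the committed patch):

{code}
public Object get(Field field) {
    Object val = values[field.index()];
    // Consult the field's own schema for the default, not the enclosing Struct's schema.
    if (val == null && field.schema().defaultValue() != null) {
        val = field.schema().defaultValue();
    }
    return val;
}
{code}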



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-3864) Kafka Connect Struct.get returning defaultValue from Struct not the field schema

2016-06-19 Thread Ewen Cheslack-Postava (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3864?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ewen Cheslack-Postava updated KAFKA-3864:
-
Priority: Blocker  (was: Major)

> Kafka Connect Struct.get returning defaultValue from Struct not the field 
> schema
> 
>
> Key: KAFKA-3864
> URL: https://issues.apache.org/jira/browse/KAFKA-3864
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 0.9.0.1
>Reporter: Andrew Stevenson
>Assignee: Ewen Cheslack-Postava
>Priority: Blocker
> Fix For: 0.10.1.0, 0.10.0.1
>
>
>  `public Object get(Field field) {
> Object val = values[field.index()];
> if (val == null && schema.defaultValue() != null) {
> val = schema.defaultValue();
> }
> return val;
> }`
> Should be field.schema().defaultValue()



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Consumer autocommit interference via unintentional/internal poll() calls

2016-06-19 Thread Dana Powers
I searched through jira and the mailing list for prior discussion of
this and could not find any. Forgive me if I missed it, and if so
please send a link!

It was raised in the kafka-python issue list by an astute reader that
the KafkaConsumer autocommit semantics can be accidentally broken by
consumer methods that themselves call poll(), triggering background
tasks like AutoCommitTask inadvertently.

Normally, the autocommit semantics say that message offsets will not
be committed (acked) until after the consumer has processed them. A common
pattern in pseudocode would be:

```
while True:
batch = consumer.poll();
for message in batch:
process(message);
# failure here should block acks for all messages since last poll()
```

This is a good at-least-once-delivery model.

The problem raised is that if, during message processing, the
user were to call a consumer method that does network requests via
poll(), then it is possible that the AutoCommitTask could be called
prematurely and messages returned in the last batch could be
committed/acked before processing completes. Such methods appear to
include: consumer.listTopics, consumer.position,
consumer.partitionsFor. The problem then is that if there is a failure
after one of these methods but before message processing completes,
those messages will have been auto-committed and will not be
reprocessed.
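
Concretely, the worry is a pattern like the following (Java client method names;
whether a given client version actually fires the autocommit from inside these calls
is exactly the question being raised):

```
// consumer is a configured KafkaConsumer<String, String> with autocommit enabled;
// process() is the application's handler.
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(1000);
    for (ConsumerRecord<String, String> record : records) {
        // Metadata calls like this may poll the network internally...
        List<PartitionInfo> partitions = consumer.partitionsFor("side-topic");
        // ...so a crash here after an inadvertent autocommit means the rest of
        // this batch is never redelivered.
        process(record, partitions);
    }
}
```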

Has this issue been discussed before? Any thoughts on how to address?

-Dana


Re: Reduce function Null checks

2016-06-19 Thread Matthias J. Sax
Hi Jeyhun,

thanks for reporting! It got already fixed in trunk version.

https://issues.apache.org/jira/browse/KAFKA-3589
https://github.com/apache/kafka/pull/1246/files


-Matthias

On 06/19/2016 02:46 PM, Jeyhun Karimov wrote:
> Hi community,
> 
> When using the reduce(Reducer, Reducer, KeyValueMapper, String) function in
> KTable, a NullPointerException is thrown. In the specified function, the below
> call is made:
> 
> return reduce(adder, subtractor, selector, null, null, name);
> 
> and afterwards, in reduce(Reducer,Reducer,
> KeyValueMapper,Serde,Serde,String) function, the exception is thrown while
> creating ChangedSerializer. I think null checks should be added to avoid
> the exception.
> 
> Jeyhun
> 



signature.asc
Description: OpenPGP digital signature


Re: Operator order

2016-06-19 Thread Matthias J. Sax
Thanks for clarification. Still don't have a better answer than before.

How much overhead my suggestion gives is hard to predict. However, the
filter operators will run in the same thread (it's more or less just
another chained method call), thus, it should not be too large.
Furthermore, it should never be required to write tagged records to
Kafka -- thus, it would only be some main memory overhead. But you would
need to test and measure.
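
A bare-bones sketch of that tag-and-filter idea in the DSL (topic names, processA and
tagOf are placeholders for the application's own logic):

```
// Each record leaving A carries a tag naming its intended receiver; a filter on each
// outgoing edge drops records whose tag does not match that edge's operator.
KStreamBuilder builder = new KStreamBuilder();
KStream<String, String> source = builder.stream("input-topic");

KStream<String, String> fromA = source.mapValues(v -> processA(v));  // A also sets the tag

KStream<String, String> toB = fromA.filter((k, v) -> "B".equals(tagOf(v)));  // A -> F1 -> B
KStream<String, String> toE = fromA.filter((k, v) -> "E".equals(tagOf(v)));  // A -> F2 -> E
// ...build B -> C -> D on toB as usual, and feed toE into E alongside D's output.
```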

-Matthias

On 06/18/2016 08:13 PM, Jeyhun Karimov wrote:
> Hi Matthias,
> 
> Thank you for your answer. In my use-case, depending on statistics of every
> operator, some tuples can be escaped for specific operators, so that we can
> get approximate but faster result. I think this is somehow similar to
>  TelegraphCQ in dynamism of operators.
> In my case, the goal is getting rid of transmission and processing overhead
> of some tuples for some operators (in runtime) to get approximate results.
> However, it seems the possible solution can bring extra overhead to the system
> in some cases.
> 
> Jeyhun
> 
> On Sat, Jun 18, 2016 at 7:36 PM Matthias J. Sax 
> wrote:
> 
>> Hi Jeyhun,
>>
>> there is no support by the library itself. But you could build a custom
>> solution by building the DAG with all required edges (ie, additional
>> edges from A->E, and B->sink etc.). For this, each output message from A
>> would be duplicated and sent to B and E. Therefore, A should "tag" each
>> message with the designated receiver (B or E) and you add an additional
>> filter step in both edges (ie, a filter between A->F1->B and A->F2->E),
>> that drop messages if the "tag" does not match the downstream operator.
>>
>> Does this make sense? Of course, depending on your use case, you might
>> get a huge number of edges (plus filters) and your DAG might be quite
>> complex. Don't see any other solution though.
>>
>> Hope this helps.
>>
>> One question though: how would changing the DAG at runtime help
>> you? Do you mean you would dynamically change the edge between A->B and
>> A->sink ? I guess, this would be a very special pattern and I doubt that
>> any library or system can offer this.
>>
>> -Matthias
>>
>> On 06/18/2016 05:33 PM, Jeyhun Karimov wrote:
>>> Hi community,
>>>
>>> Is there a way in Kafka Streams to change the order of operators in
>>> runtime? For example, I have operators
>>>
>>> Source->A->B->C->D->E->Sink
>>>
>>> and I want to forward some tuples from A to E, from B to Sink and etc. As
>>> far as I know, the stream execution graph is computed in compile time and
>>> does not change in runtime. Can there be an indirect solution for this
>>> specific case?
>>>
>>> Jeyhun
>>>
>>
>> --
> -Cheers
> 
> Jeyhun
> 



signature.asc
Description: OpenPGP digital signature


Reduce function Null checks

2016-06-19 Thread Jeyhun Karimov
Hi community,

When using the reduce(Reducer, Reducer, KeyValueMapper, String) function in
KTable, a NullPointerException is thrown. In the specified function, the below
call is made:

return reduce(adder, subtractor, selector, null, null, name);

and afterwards, in reduce(Reducer,Reducer,
KeyValueMapper,Serde,Serde,String) function, the exception is thrown while
creating ChangedSerializer. I think null checks should be added to avoid
the exception.

Jeyhun
-- 
-Cheers

Jeyhun


Jenkins build is back to normal : kafka-trunk-jdk8 #707

2016-06-19 Thread Apache Jenkins Server
See 



[GitHub] kafka pull request #1500: MINOR: Fix javadoc typos in ConsumerRebalanceListe...

2016-06-19 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/1500


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] kafka pull request #1500: MINOR: Fix javadoc typos in ConsumerRebalanceListe...

2016-06-19 Thread vahidhashemian
Github user vahidhashemian closed the pull request at:

https://github.com/apache/kafka/pull/1500


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] kafka pull request #1500: MINOR: Fix javadoc typos in ConsumerRebalanceListe...

2016-06-19 Thread vahidhashemian
GitHub user vahidhashemian reopened a pull request:

https://github.com/apache/kafka/pull/1500

MINOR: Fix javadoc typos in ConsumerRebalanceListener



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/vahidhashemian/kafka 
typo07/fix_javadoc_typos_consumerrebalancelistener

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/1500.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1500


commit 7e4bfffe54ae8b9ffdd3549e2973067c86ff6712
Author: Vahid Hashemian 
Date:   2016-06-13T20:02:46Z

MINOR: Fix Javadoc Typos in ConsumerRebalanceListener




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---