[jira] [Created] (KAFKA-3958) To set the maximum no of consumers which can consume from the particular topic.

2016-07-12 Thread Sharad Gupta (JIRA)
Sharad Gupta created KAFKA-3958:
---

 Summary: To set the maximum no of consumers which can consume from 
the particular topic.
 Key: KAFKA-3958
 URL: https://issues.apache.org/jira/browse/KAFKA-3958
 Project: Kafka
  Issue Type: Wish
Affects Versions: 0.11.0.0
Reporter: Sharad Gupta
Priority: Minor


In a production environment consumers may be running on several application 
servers (APs), with each AP's consumers in a different consumer group. The user 
may want messages produced to a certain topic to be consumed by only a limited 
number of these APs rather than all of them: consumers would be served on a 
first-come, first-served basis until the configured maximum count is reached, 
after which requests from any additional AP would return no messages.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3101) Optimize Aggregation Outputs

2016-07-12 Thread Bill Bejeck (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3101?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15374085#comment-15374085
 ] 

Bill Bejeck commented on KAFKA-3101:


[~enothereska] [~guozhang]

With regards to the performance comparison, here is my plan:

* Create a simple streams process with a KTableAggregate, using Objects (records) first and then bytes.
* Track the memory usage via the jamm library referenced above.
* Track message throughput for both types (records vs. bytes).
* Profile how much CPU time is spent in the serialization/deserialization process.

Is this reasonable?  Any additional thoughts or comments?
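For reference, a minimal sketch of how jamm's MemoryMeter could be wired into 
such a comparison (assuming jamm 0.3.x on the classpath and attached as a 
-javaagent; the class name and sample values below are placeholders, not the 
actual benchmark code):

{code}
import java.nio.charset.StandardCharsets;

import org.github.jamm.MemoryMeter;

public class AggregateSizeProbe {

    public static void main(String[] args) {
        // jamm must be attached as an agent, e.g.
        //   java -javaagent:/path/to/jamm.jar AggregateSizeProbe
        MemoryMeter meter = new MemoryMeter();

        // Placeholder for an aggregation result held by the state store; the
        // real comparison would measure the record (Object) form of the
        // KTable aggregate against its serialized byte[] form.
        String recordForm = "some-aggregated-value";
        byte[] bytesForm = recordForm.getBytes(StandardCharsets.UTF_8);

        System.out.println("deep size, record form: " + meter.measureDeep(recordForm));
        System.out.println("deep size, bytes form:  " + meter.measureDeep(bytesForm));
    }
}
{code}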

> Optimize Aggregation Outputs
> 
>
> Key: KAFKA-3101
> URL: https://issues.apache.org/jira/browse/KAFKA-3101
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>  Labels: architecture
> Fix For: 0.10.1.0
>
>
> Today we emit one output record for each incoming message for Table / 
> Windowed Stream Aggregations. For example, say we have a sequence of 
> aggregate outputs computed from the input stream (assuming there is no agg 
> value for this key before):
> V1, V2, V3, V4, V5
> Then the aggregator will output the following sequence of Change<newValue, oldValue>:
> <V1, null>, <V2, V1>, <V3, V2>, <V4, V3>, <V5, V4>
> which could cost a lot of CPU overhead computing the intermediate results. 
> Instead if we can let the underlying state store "remember" the last 
> emitted old value, we can reduce the number of emits based on some configs. 
> More specifically, we can add one more field in the KV store engine storing 
> the last emitted old value, which only gets updated when we emit to the 
> downstream processor. For example:
> At Beginning: 
> Store: key => empty (no agg values yet)
> V1 computed: 
> Update Both in Store: key => (V1, V1), Emit <V1, null>
> V2 computed: 
> Update NewValue in Store: key => (V2, V1), No Emit
> V3 computed: 
> Update NewValue in Store: key => (V3, V1), No Emit
> V4 computed: 
> Update Both in Store: key => (V4, V4), Emit <V4, V1>
> V5 computed: 
> Update NewValue in Store: key => (V5, V4), No Emit
> One more thing to consider is that we need a "closing" time control on the 
> not-yet-emitted keys; when some time has elapsed (or the window is to be 
> closed), we need to check for any key whether their current materialized pairs 
> have not been emitted (for example <V5, V4> in the above example). 
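
For illustration, a rough standalone sketch (not the actual Streams code) of 
the bookkeeping described above, where each key stores both its latest 
aggregate and the old value as of the last downstream emit:

{code}
import java.util.HashMap;
import java.util.Map;

// Sketch of the proposed bookkeeping: keep (latestValue, lastEmittedOldValue)
// per key and only emit when the caller's policy (config, elapsed time,
// window close, ...) says so.
public class EmitSuppressingStore<K, V> {

    private static class Entry<V> {
        V latest;          // most recent aggregate for the key
        V lastEmittedOld;  // old value as of the last downstream emit
        Entry(V latest, V lastEmittedOld) { this.latest = latest; this.lastEmittedOld = lastEmittedOld; }
    }

    public static class Change<V> {
        public final V newValue;
        public final V oldValue;
        public Change(V newValue, V oldValue) { this.newValue = newValue; this.oldValue = oldValue; }
    }

    private final Map<K, Entry<V>> store = new HashMap<>();

    /** Record a newly computed aggregate; returns the Change to forward downstream, or null to suppress. */
    public Change<V> update(K key, V newValue, boolean shouldEmit) {
        Entry<V> entry = store.get(key);
        if (entry == null) {
            store.put(key, new Entry<>(newValue, newValue));  // "Update Both in Store"
            return new Change<>(newValue, null);              // first value: emit <V1, null>
        }
        if (shouldEmit) {
            Change<V> change = new Change<>(newValue, entry.lastEmittedOld);
            entry.latest = newValue;
            entry.lastEmittedOld = newValue;                  // "Update Both in Store", emit <Vn, lastEmittedOld>
            return change;
        }
        entry.latest = newValue;                              // "Update NewValue in Store", no emit
        return null;
    }
}
{code}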



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-3945) kafka-console-producer.sh does not accept request-required-acks=all

2016-07-12 Thread Vahid Hashemian (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3945?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vahid Hashemian updated KAFKA-3945:
---
Status: Patch Available  (was: Open)

> kafka-console-producer.sh does not accept request-required-acks=all
> ---
>
> Key: KAFKA-3945
> URL: https://issues.apache.org/jira/browse/KAFKA-3945
> Project: Kafka
>  Issue Type: Bug
>Reporter: James Cheng
>Assignee: Vahid Hashemian
>
> The Java producer accepts 0, 1, -1, and all as valid values for acks. The 
> kafka-console-producer.sh however does not accept "all" as a valid value.
> {code}
> $ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test 
> --request-required-acks all
> Cannot parse argument 'all' of option request-required-acks
> {code}
> The code seems to expect it to be an integer:
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/tools/ConsoleProducer.scala#L178-L182
> If I pass in an Integer, though, the error message for it says that "all" is 
> a valid value.
> {code}
> $ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test 
> --request-required-acks 2
> org.apache.kafka.common.config.ConfigException: Invalid value 2 for 
> configuration acks: String must be one of: all, -1, 0, 1
>   at 
> org.apache.kafka.common.config.ConfigDef$ValidString.ensureValid(ConfigDef.java:827)
>   at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:427)
>   at 
> org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:55)
>   at 
> org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:62)
>   at 
> org.apache.kafka.clients.producer.ProducerConfig.<init>(ProducerConfig.java:338)
>   at 
> org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:188)
>   at kafka.producer.NewShinyProducer.<init>(BaseProducer.scala:40)
>   at kafka.tools.ConsoleProducer$.main(ConsoleProducer.scala:45)
>   at kafka.tools.ConsoleProducer.main(ConsoleProducer.scala)
> {code}
> Either the kafka-console-producer.sh needs to be updated to accept "all", or 
> the documentation for kafka-console-producer should be updated to say that 
> "all" is not a valid value. Either way, the two should be in sync with each 
> other.
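
For context, a small standalone sketch of how the Java producer side 
constrains acks to string values via ConfigDef.ValidString, which is why "all" 
has to be passed through rather than forced to an integer (the class name and 
doc string below are illustrative, not the console producer's actual code):

{code}
import java.util.Collections;
import java.util.Map;

import org.apache.kafka.common.config.ConfigDef;

public class AcksValidationDemo {

    public static void main(String[] args) {
        // Constrain 'acks' to the same string values the producer allows.
        ConfigDef def = new ConfigDef()
            .define("acks", ConfigDef.Type.STRING, "1",
                    ConfigDef.ValidString.in("all", "-1", "0", "1"),
                    ConfigDef.Importance.HIGH, "Number of acks required.");

        // "all" parses fine...
        Map<String, Object> ok = def.parse(Collections.singletonMap("acks", "all"));
        System.out.println("acks=" + ok.get("acks"));

        // ...while "2" throws the ConfigException quoted in this issue.
        def.parse(Collections.singletonMap("acks", "2"));
    }
}
{code}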



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3945) kafka-console-producer.sh does not accept request-required-acks=all

2016-07-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3945?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15373912#comment-15373912
 ] 

ASF GitHub Bot commented on KAFKA-3945:
---

GitHub user vahidhashemian opened a pull request:

https://github.com/apache/kafka/pull/1618

KAFKA-3945: Fix validation of 'acks' config in console producer

The `acks` config that is provided to console producer with 
`request-required-acks` comes with `all`, `-1`, `0`, `1` as valid options 
(`all` and `-1` being interchangeable).
This PR fixes input validation so that `all` is also accepted as an input value.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/vahidhashemian/kafka KAFKA-3945

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/1618.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1618


commit fba6a2b1e272a43196b7d7125e202b9da3f29949
Author: Vahid Hashemian 
Date:   2016-07-12T23:03:04Z

KAFKA-3945: Fix validation of 'acks' config in console producer

The 'acks' config that is provided to console producer with 
'request-required-acks' comes with 'all', -1, 0, 1 as valid options (with 'all' 
and -1 being interchangeable).
This PR fixes input validation so that 'all' is also accepted as an input value.




> kafka-console-producer.sh does not accept request-required-acks=all
> ---
>
> Key: KAFKA-3945
> URL: https://issues.apache.org/jira/browse/KAFKA-3945
> Project: Kafka
>  Issue Type: Bug
>Reporter: James Cheng
>Assignee: Vahid Hashemian
>
> The Java producer accepts 0, 1, -1, and all as valid values for acks. The 
> kafka-console-producer.sh however does not accept "all" as a valid value.
> {code}
> $ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test 
> --request-required-acks all
> Cannot parse argument 'all' of option request-required-acks
> {code}
> The code seems to expect it to be an integer:
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/tools/ConsoleProducer.scala#L178-L182
> If I pass in an Integer, though, the error message for it says that "all" is 
> a valid value.
> {code}
> $ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test 
> --request-required-acks 2
> org.apache.kafka.common.config.ConfigException: Invalid value 2 for 
> configuration acks: String must be one of: all, -1, 0, 1
>   at 
> org.apache.kafka.common.config.ConfigDef$ValidString.ensureValid(ConfigDef.java:827)
>   at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:427)
>   at 
> org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:55)
>   at 
> org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:62)
>   at 
> org.apache.kafka.clients.producer.ProducerConfig.<init>(ProducerConfig.java:338)
>   at 
> org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:188)
>   at kafka.producer.NewShinyProducer.<init>(BaseProducer.scala:40)
>   at kafka.tools.ConsoleProducer$.main(ConsoleProducer.scala:45)
>   at kafka.tools.ConsoleProducer.main(ConsoleProducer.scala)
> {code}
> Either the kafka-console-producer.sh needs to be updated to accept "all", or 
> the documentation for kafka-console-producer should be updated to say that 
> "all" is not a valid value. Either way, the two should be in sync with each 
> other.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] kafka pull request #1618: KAFKA-3945: Fix validation of 'acks' config in con...

2016-07-12 Thread vahidhashemian
GitHub user vahidhashemian opened a pull request:

https://github.com/apache/kafka/pull/1618

KAFKA-3945: Fix validation of 'acks' config in console producer

The `acks` config that is provided to console producer with 
`request-required-acks` comes with `all`, `-1`, `0`, `1` as valid options 
(`all` and `-1` being interchangeable).
This PR fixes input validation so that `all` is also accepted as an input value.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/vahidhashemian/kafka KAFKA-3945

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/1618.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1618


commit fba6a2b1e272a43196b7d7125e202b9da3f29949
Author: Vahid Hashemian 
Date:   2016-07-12T23:03:04Z

KAFKA-3945: Fix validation of 'acks' config in console producer

The 'acks' config that is provided to console producer with 
'request-required-acks' comes with 'all', -1, 0, 1 as valid options (with 'all' 
and -1 being interchangeable).
This PR fixes input validation so that 'all' is also accepted as an input value.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (KAFKA-1120) Controller could miss a broker state change

2016-07-12 Thread Maysam Yabandeh (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1120?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15373872#comment-15373872
 ] 

Maysam Yabandeh commented on KAFKA-1120:


[~junrao] This is currently a major pain point for us. If you are comfortable 
with the solution above and are just being blocked due to lack of manpower I 
could pick it up from here and work on a patch.

> Controller could miss a broker state change 
> 
>
> Key: KAFKA-1120
> URL: https://issues.apache.org/jira/browse/KAFKA-1120
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.8.1
>Reporter: Jun Rao
>
> When the controller is in the middle of processing a task (e.g., preferred 
> leader election, broker change), it holds a controller lock. During this 
> time, a broker could have de-registered and re-registered itself in ZK. After 
> the controller finishes processing the current task, it will start processing 
> the logic in the broker change listener. However, it will see no broker 
> change and therefore won't do anything to the restarted broker. This broker 
> will be in a weird state since the controller doesn't inform it to become the 
> leader of any partition. Yet, the cached metadata in other brokers could 
> still list that broker as the leader for some partitions. Client requests 
> routed to that broker will then get a TopicOrPartitionNotExistException. This 
> broker will continue to be in this bad state until it's restarted again.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Assigned] (KAFKA-3945) kafka-console-producer.sh does not accept request-required-acks=all

2016-07-12 Thread Vahid Hashemian (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3945?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vahid Hashemian reassigned KAFKA-3945:
--

Assignee: Vahid Hashemian

> kafka-console-producer.sh does not accept request-required-acks=all
> ---
>
> Key: KAFKA-3945
> URL: https://issues.apache.org/jira/browse/KAFKA-3945
> Project: Kafka
>  Issue Type: Bug
>Reporter: James Cheng
>Assignee: Vahid Hashemian
>
> The Java producer accepts 0, 1, -1, and all as valid values for acks. The 
> kafka-console-producer.sh however does not accept "all" as a valid value.
> {code}
> $ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test 
> --request-required-acks all
> Cannot parse argument 'all' of option request-required-acks
> {code}
> The code seems to expect it to be an integer:
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/tools/ConsoleProducer.scala#L178-L182
> If I pass in an Integer, though, the error message for it says that "all" is 
> a valid value.
> {code}
> $ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test 
> --request-required-acks 2
> org.apache.kafka.common.config.ConfigException: Invalid value 2 for 
> configuration acks: String must be one of: all, -1, 0, 1
>   at 
> org.apache.kafka.common.config.ConfigDef$ValidString.ensureValid(ConfigDef.java:827)
>   at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:427)
>   at 
> org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:55)
>   at 
> org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:62)
>   at 
> org.apache.kafka.clients.producer.ProducerConfig.<init>(ProducerConfig.java:338)
>   at 
> org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:188)
>   at kafka.producer.NewShinyProducer.<init>(BaseProducer.scala:40)
>   at kafka.tools.ConsoleProducer$.main(ConsoleProducer.scala:45)
>   at kafka.tools.ConsoleProducer.main(ConsoleProducer.scala)
> {code}
> Either the kafka-console-producer.sh needs to be updated to accept "all", or 
> the documentation for kafka-console-producer should be updated to say that 
> "all" is not a valid value. Either way, the two should be in sync with each 
> other.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Comment Edited] (KAFKA-3957) consumer timeout not being respected when kafka broker is not available

2016-07-12 Thread Vahid Hashemian (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3957?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15373823#comment-15373823
 ] 

Vahid Hashemian edited comment on KAFKA-3957 at 7/12/16 10:30 PM:
--

This is already filed under an earlier JIRA 
([KAFKA-1894|https://issues.apache.org/jira/browse/KAFKA-1894]). I marked it as 
a duplicate, but please advise if you disagree.


was (Author: vahid):
This is already filed under an earlier JIRA 
([KAFKA-1894|https://issues.apache.org/jira/browse/KAFKA-1894]).

> consumer timeout not being respected when kafka broker is not available
> ---
>
> Key: KAFKA-3957
> URL: https://issues.apache.org/jira/browse/KAFKA-3957
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 0.9.0.1
>Reporter: Vincent Fumo
>Priority: Minor
>
> KafkaConsumer v0.9::
> I have a consumer set up with session.timeout.ms set to 30s. I make a call 
> like
> consumer.poll(1)
> but if the kafka broker is down, that call will hang indefinitely.
> Digging into the code it seems that the timeout isn't respected:
> KafkaConsumer calls out to pollOnce() as seen below::
>private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollOnce(long timeout) {
>// TODO: Sub-requests should take into account the poll timeout 
> (KAFKA-1894)
>coordinator.ensureCoordinatorKnown();
>// ensure we have partitions assigned if we expect to
>if (subscriptions.partitionsAutoAssigned())
>coordinator.ensurePartitionAssignment();
>// fetch positions if we have partitions we're subscribed to that we
>// don't know the offset for
>if (!subscriptions.hasAllFetchPositions())
>updateFetchPositions(this.subscriptions.missingFetchPositions());
>// init any new fetches (won't resend pending fetches)
>Cluster cluster = this.metadata.fetch();
>Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords();
>// if data is available already, e.g. from a previous network client 
> poll() call to commit,
>// then just return it immediately
>if (!records.isEmpty()) {
>return records;
>}
>fetcher.initFetches(cluster);
>client.poll(timeout);
>return fetcher.fetchedRecords();
>}
> and we see that we stick on the call to coordinator.ensureCoordinatorKnown();
> AbstractCoordinator ::
>public void ensureCoordinatorKnown() {
>while (coordinatorUnknown()) {
>RequestFuture<Void> future = sendGroupMetadataRequest();
>client.poll(future);
>if (future.failed()) {
>if (future.isRetriable())
>client.awaitMetadataUpdate();
>else
>throw future.exception();
>}
>}
>}
> in this case the Future fails (since the broker is down) and then a call to 
> client.awaitMetadataUpdate() is made which in the case of the 
> ConsumerNetworkClient will block forever :
>public void awaitMetadataUpdate() {
>int version = this.metadata.requestUpdate();
>do {
>poll(Long.MAX_VALUE);
>} while (this.metadata.version() == version);
>}
> I feel that this is a bug. When you set a timeout on a call to a blocking 
> method, that timeout should be respected and an exception should be thrown.
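
As a possible client-side stopgap (not a fix for the underlying issue, which 
KAFKA-1894 tracks), the blocking poll can be wrapped in a future so the caller 
at least gets control back after a deadline. Note this only bounds the 
caller's wait: the consumer thread may stay blocked inside poll(), and 
KafkaConsumer is not thread-safe, so the consumer must not be touched again 
until that poll() has actually returned. A minimal sketch:

{code}
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class BoundedPoll {

    // Runs poll() on a helper thread so the caller regains control after
    // deadlineMs even if the client blocks internally waiting for the
    // coordinator. Throws TimeoutException when the broker is unreachable.
    public static <K, V> ConsumerRecords<K, V> pollWithDeadline(
            final KafkaConsumer<K, V> consumer, final long pollMs, final long deadlineMs)
            throws InterruptedException, ExecutionException, TimeoutException {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Future<ConsumerRecords<K, V>> future = executor.submit(() -> consumer.poll(pollMs));
            return future.get(deadlineMs, TimeUnit.MILLISECONDS);
        } finally {
            executor.shutdownNow();
        }
    }
}
{code}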



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3957) consumer timeout not being respected when kafka broker is not available

2016-07-12 Thread Vahid Hashemian (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3957?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15373823#comment-15373823
 ] 

Vahid Hashemian commented on KAFKA-3957:


This is already filed under an earlier JIRA 
([KAFKA-1894|https://issues.apache.org/jira/browse/KAFKA-1894]).

> consumer timeout not being respected when kafka broker is not available
> ---
>
> Key: KAFKA-3957
> URL: https://issues.apache.org/jira/browse/KAFKA-3957
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 0.9.0.1
>Reporter: Vincent Fumo
>Priority: Minor
>
> KafkaConsumer v0.9::
> I have a consumer set up with session.timeout.ms set to 30s. I make a call 
> like
> consumer.poll(1)
> but if the kafka broker is down, that call will hang indefinitely.
> Digging into the code it seems that the timeout isn't respected:
> KafkaConsumer calls out to pollOnce() as seen below::
>private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollOnce(long timeout) {
>// TODO: Sub-requests should take into account the poll timeout 
> (KAFKA-1894)
>coordinator.ensureCoordinatorKnown();
>// ensure we have partitions assigned if we expect to
>if (subscriptions.partitionsAutoAssigned())
>coordinator.ensurePartitionAssignment();
>// fetch positions if we have partitions we're subscribed to that we
>// don't know the offset for
>if (!subscriptions.hasAllFetchPositions())
>updateFetchPositions(this.subscriptions.missingFetchPositions());
>// init any new fetches (won't resend pending fetches)
>Cluster cluster = this.metadata.fetch();
>Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords();
>// if data is available already, e.g. from a previous network client 
> poll() call to commit,
>// then just return it immediately
>if (!records.isEmpty()) {
>return records;
>}
>fetcher.initFetches(cluster);
>client.poll(timeout);
>return fetcher.fetchedRecords();
>}
> and we see that we stick on the call to coordinator.ensureCoordinatorKnown();
> AbstractCoordinator ::
>public void ensureCoordinatorKnown() {
>while (coordinatorUnknown()) {
>RequestFuture<Void> future = sendGroupMetadataRequest();
>client.poll(future);
>if (future.failed()) {
>if (future.isRetriable())
>client.awaitMetadataUpdate();
>else
>throw future.exception();
>}
>}
>}
> in this case the Future fails (since the broker is down) and then a call to 
> client.awaitMetadataUpdate() is made which in the case of the 
> ConsumerNetworkClient will block forever :
>public void awaitMetadataUpdate() {
>int version = this.metadata.requestUpdate();
>do {
>poll(Long.MAX_VALUE);
>} while (this.metadata.version() == version);
>}
> I feel that this is a bug. When you set a timeout on a call to a blocking 
> method, that timeout should be respected and an exception should be thrown.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Jenkins build is back to normal : kafka-trunk-jdk8 #750

2016-07-12 Thread Apache Jenkins Server
See 



Re: [VOTE] KIP-67: Queryable state for Kafka Streams

2016-07-12 Thread Damian Guy
Ismael - that is fine with me.
Thanks

On Tue, 12 Jul 2016 at 14:11 Ismael Juma  wrote:

> Hi Damian,
>
> How about StreamsMetadata instead? The general naming pattern seems to
> avoid the `Kafka` prefix for everything outside of `KafkaStreams` itself.
>
> Ismael
>
> On Tue, Jul 12, 2016 at 7:14 PM, Damian Guy  wrote:
>
> > Hi,
> >
> > I agree with point 1. application.server is a better name for the config
> > (we'll change this). However, on point 2 I think we should stick mostly
> > with what we already have. I've tried both ways of doing this when
> working
> > on the JIRA and building examples and I find the current approach more
> > intuitive and easier to use than the Map based approach.
> > However, there is probably a naming issue. We should rename
> > KafkaStreamsInstance to KafkaStreamsMetadata. This Class is very simple,
> > but provides all the information a developer needs to be able to find the
> > instance(s) of a Streams application that a particular store is running
> on,
> > i.e.,
> >
> > public class KafkaStreamsMetadata {
> >     private final HostInfo hostInfo;
> >     private final Set<String> stateStoreNames;
> >     private final Set<TopicPartition> topicPartitions;
> >
> >
> > So using the API to route to a new host is fairly simple, particularly in
> > the case when you want to find the host for a particular key, i.e.,
> >
> > final KafkaStreams kafkaStreams = createKafkaStreams();
> > final KafkaStreamsMetadata streamsMetadata =
> >     kafkaStreams.instanceWithKey("word-count", "hello",
> >         Serdes.String().serializer());
> > http.get("http://" + streamsMetadata.host() + ":" +
> >     streamsMetadata.port() + "/get/word-count/hello");
> >
> >
> > And if you want to do a scatter gather approach:
> >
> > final KafkaStreams kafkaStreams = createKafkaStreams();
> > final Collection<KafkaStreamsMetadata> kafkaStreamsMetadatas =
> >     kafkaStreams.allInstancesWithStore("word-count");
> > for (KafkaStreamsMetadata streamsMetadata : kafkaStreamsMetadatas) {
> >     http.get("http://" + streamsMetadata.host() + ":" +
> >         streamsMetadata.port() + "/get/word-count/hello");
> >     ...
> > }
> >
> >
> > And if you iterated over all instances:
> >
> > final KafkaStreams kafkaStreams = createKafkaStreams();
> > final Collection<KafkaStreamsMetadata> kafkaStreamsMetadatas =
> >     kafkaStreams.allInstances();
> > for (KafkaStreamsMetadata streamsMetadata : kafkaStreamsMetadatas) {
> >     if (streamsMetadata.stateStoreNames().contains("word-count")) {
> >         http.get("http://" + streamsMetadata.host() + ":" +
> >             streamsMetadata.port() + "/get/word-count/hello");
> >         ...
> >     }
> > }
> >
> >
> > If we were to change this to use Map<HostInfo, Set<TaskMetadata>>, for the
> > most part users would need to iterate over the entry or key set. Examples:
> >
> > Finding an instance by key is a little odd:
> >
> > final KafkaStreams kafkaStreams = createKafkaStreams();
> > final Map<HostInfo, Set<TaskMetadata>> streamsMetadata =
> >     kafkaStreams.instanceWithKey("word-count", "hello",
> >         Serdes.String().serializer());
> > // this is a bit odd as i only expect one:
> > for (HostInfo hostInfo : streamsMetadata.keySet()) {
> >     http.get("http://" + hostInfo.host() + ":" +
> >         hostInfo.port() + "/get/word-count/hello");
> > }
> >
> >
> > The scatter/gather by store is fairly similar to the previous example:
> >
> > final KafkaStreams kafkaStreams = createKafkaStreams();
> > final Map<HostInfo, Set<TaskMetadata>> streamsMetadata =
> >     kafkaStreams.allInstancesWithStore("word-count");
> > for (HostInfo hostInfo : streamsMetadata.keySet()) {
> >     http.get("http://" + hostInfo.host() + ":" + hostInfo.port() +
> >         "/get/word-count/hello");
> >     ...
> > }
> >
> > And iterating over all instances:
> >
> > final Map<HostInfo, Set<TaskMetadata>> streamsMetadata =
> >     kafkaStreams.allInstances();
> > for (Map.Entry<HostInfo, Set<TaskMetadata>> entry :
> >         streamsMetadata.entrySet()) {
> >     for (TaskMetadata taskMetadata : entry.getValue()) {
> >         if (taskMetadata.stateStoreNames().contains("word-count")) {
> >             http.get("http://" + entry.getKey().host() + ":" +
> >                 entry.getKey().port() + "/get/word-count/hello");
> >             ...
> >         }
> >     }
> > }
> >
> >
> > IMO - having a class we return is the better approach as it nicely wraps
> > the related things, i.e, host:port, store names, topic partitions into an
> > Object that is easy to use. Further we could add some behaviour to this
> > class if we felt it necessary, i.e, hasStore(storeName) etc.
> >
> > Anyway, i'm interested in your thoughts.
> >
> > Thanks,
> > Damian
> >
> > On Mon, 11 Jul 2016 at 13:47 Guozhang Wang  wrote:
> >
> > > 1. Re StreamsConfig.USER_ENDPOINT_CONFIG:
> > >
> > > I agree with Neha that Kafka Streams can provide the bare minimum APIs
> > just
> > > for host/port, and user's implemented layer can provide URL / proxy
> > address
> > > they want to build on top of it.
> > >
> > >
> > > 2. Re Improving KafkaStreamsInstance interface:
> > >
> > > Users 

Re: [VOTE] KIP-67: Queryable state for Kafka Streams

2016-07-12 Thread Ismael Juma
Hi Damian,

How about StreamsMetadata instead? The general naming pattern seems to
avoid the `Kafka` prefix for everything outside of `KafkaStreams` itself.

Ismael

On Tue, Jul 12, 2016 at 7:14 PM, Damian Guy  wrote:

> Hi,
>
> I agree with point 1. application.server is a better name for the config
> (we'll change this). However, on point 2 I think we should stick mostly
> with what we already have. I've tried both ways of doing this when working
> on the JIRA and building examples and I find the current approach more
> intuitive and easier to use than the Map based approach.
> However, there is probably a naming issue. We should rename
> KafkaStreamsInstance to KafkaStreamsMetadata. This Class is very simple,
> but provides all the information a developer needs to be able to find the
> instance(s) of a Streams application that a particular store is running on,
> i.e.,
>
> public class KafkaStreamsMetadata {
>     private final HostInfo hostInfo;
>     private final Set<String> stateStoreNames;
>     private final Set<TopicPartition> topicPartitions;
>
>
> So using the API to route to a new host is fairly simple, particularly in
> the case when you want to find the host for a particular key, i.e.,
>
> final KafkaStreams kafkaStreams = createKafkaStreams();
> final KafkaStreamsMetadata streamsMetadata =
>     kafkaStreams.instanceWithKey("word-count", "hello",
>         Serdes.String().serializer());
> http.get("http://" + streamsMetadata.host() + ":" +
>     streamsMetadata.port() + "/get/word-count/hello");
>
>
> And if you want to do a scatter gather approach:
>
> final KafkaStreams kafkaStreams = createKafkaStreams();
> final Collection<KafkaStreamsMetadata> kafkaStreamsMetadatas =
>     kafkaStreams.allInstancesWithStore("word-count");
> for (KafkaStreamsMetadata streamsMetadata : kafkaStreamsMetadatas) {
>     http.get("http://" + streamsMetadata.host() + ":" +
>         streamsMetadata.port() + "/get/word-count/hello");
>     ...
> }
>
>
> And if you iterated over all instances:
>
> final KafkaStreams kafkaStreams = createKafkaStreams();
> final Collection<KafkaStreamsMetadata> kafkaStreamsMetadatas =
>     kafkaStreams.allInstances();
> for (KafkaStreamsMetadata streamsMetadata : kafkaStreamsMetadatas) {
>     if (streamsMetadata.stateStoreNames().contains("word-count")) {
>         http.get("http://" + streamsMetadata.host() + ":" +
>             streamsMetadata.port() + "/get/word-count/hello");
>         ...
>     }
> }
>
>
> If we were to change this to use Map<HostInfo, Set<TaskMetadata>>, for the
> most part users would need to iterate over the entry or key set. Examples:
>
> Finding an instance by key is a little odd:
>
> final KafkaStreams kafkaStreams = createKafkaStreams();
> final Map<HostInfo, Set<TaskMetadata>> streamsMetadata =
>     kafkaStreams.instanceWithKey("word-count", "hello",
>         Serdes.String().serializer());
> // this is a bit odd as i only expect one:
> for (HostInfo hostInfo : streamsMetadata.keySet()) {
>     http.get("http://" + hostInfo.host() + ":" +
>         hostInfo.port() + "/get/word-count/hello");
> }
>
>
> The scatter/gather by store is fairly similar to the previous example:
>
> final KafkaStreams kafkaStreams = createKafkaStreams();
> final Map<HostInfo, Set<TaskMetadata>> streamsMetadata =
>     kafkaStreams.allInstancesWithStore("word-count");
> for (HostInfo hostInfo : streamsMetadata.keySet()) {
>     http.get("http://" + hostInfo.host() + ":" + hostInfo.port() +
>         "/get/word-count/hello");
>     ...
> }
>
> And iterating over all instances:
>
> final Map<HostInfo, Set<TaskMetadata>> streamsMetadata =
>     kafkaStreams.allInstances();
> for (Map.Entry<HostInfo, Set<TaskMetadata>> entry :
>         streamsMetadata.entrySet()) {
>     for (TaskMetadata taskMetadata : entry.getValue()) {
>         if (taskMetadata.stateStoreNames().contains("word-count")) {
>             http.get("http://" + entry.getKey().host() + ":" +
>                 entry.getKey().port() + "/get/word-count/hello");
>             ...
>         }
>     }
> }
>
>
> IMO - having a class we return is the better approach as it nicely wraps
> the related things, i.e, host:port, store names, topic partitions into an
> Object that is easy to use. Further we could add some behaviour to this
> class if we felt it necessary, i.e, hasStore(storeName) etc.
>
> Anyway, i'm interested in your thoughts.
>
> Thanks,
> Damian
>
> On Mon, 11 Jul 2016 at 13:47 Guozhang Wang  wrote:
>
> > 1. Re StreamsConfig.USER_ENDPOINT_CONFIG:
> >
> > I agree with Neha that Kafka Streams can provide the bare minimum APIs
> just
> > for host/port, and user's implemented layer can provide URL / proxy
> address
> > they want to build on top of it.
> >
> >
> > 2. Re Improving KafkaStreamsInstance interface:
> >
> > Users are indeed aware of "TaskId" class which is not part of internal
> > packages and is exposed in PartitionGrouper interface that can be
> > instantiated by the users, which is assigned with input topic partitions.
> > So we can probably change the APIs as:
> >
> > Map KafkaStreams.getAllTasks() where
> > TaskMetadata has fields such as taskId, list of 

Build failed in Jenkins: kafka-0.10.0-jdk7 #147

2016-07-12 Thread Apache Jenkins Server
See 

Changes:

[wangguoz] KAFKA-3931: Fix transient failures in pattern subscription tests

[wangguoz] MINOR: Check null in SmokeTestDriver to avoid NPE

--
[...truncated 1120 lines...]
kafka.api.ProducerFailureHandlingTest > testSendAfterClosed PASSED

kafka.api.ProducerFailureHandlingTest > testTooLargeRecordWithAckZero PASSED

kafka.api.ProducerFailureHandlingTest > 
testNotEnoughReplicasAfterBrokerShutdown PASSED

kafka.api.SslProducerSendTest > testSendNonCompressedMessageWithCreateTime 
PASSED

kafka.api.SslProducerSendTest > testSendCompressedMessageWithLogAppendTime 
PASSED

kafka.api.SslProducerSendTest > testClose PASSED

kafka.api.SslProducerSendTest > testFlush PASSED

kafka.api.SslProducerSendTest > testSendToPartition PASSED

kafka.api.SslProducerSendTest > testSendOffset PASSED

kafka.api.SslProducerSendTest > testAutoCreateTopic PASSED

kafka.api.SslProducerSendTest > testSendWithInvalidCreateTime PASSED

kafka.api.SslProducerSendTest > testSendCompressedMessageWithCreateTime PASSED

kafka.api.SslProducerSendTest > testCloseWithZeroTimeoutFromCallerThread PASSED

kafka.api.SslProducerSendTest > testCloseWithZeroTimeoutFromSenderThread PASSED

kafka.api.SslProducerSendTest > testSendNonCompressedMessageWithLogApendTime 
PASSED

kafka.api.SaslPlainPlaintextConsumerTest > 
testPauseStateNotPreservedByRebalance PASSED

kafka.api.SaslPlainPlaintextConsumerTest > testUnsubscribeTopic PASSED

kafka.api.SaslPlainPlaintextConsumerTest > testListTopics PASSED

kafka.api.SaslPlainPlaintextConsumerTest > testAutoCommitOnRebalance PASSED

kafka.api.SaslPlainPlaintextConsumerTest > testSimpleConsumption PASSED

kafka.api.SaslPlainPlaintextConsumerTest > testPartitionReassignmentCallback 
PASSED

kafka.api.SaslPlainPlaintextConsumerTest > testCommitSpecifiedOffsets PASSED

kafka.api.SaslPlaintextConsumerTest > testPauseStateNotPreservedByRebalance 
PASSED

kafka.api.SaslPlaintextConsumerTest > testUnsubscribeTopic FAILED
kafka.admin.AdminOperationException: replication factor: 3 larger than 
available brokers: 2
at kafka.admin.AdminUtils$.assignReplicasToBrokers(AdminUtils.scala:117)
at kafka.admin.AdminUtils$.createTopic(AdminUtils.scala:403)
at kafka.utils.TestUtils$.createTopic(TestUtils.scala:237)
at 
kafka.api.IntegrationTestHarness$class.setUp(IntegrationTestHarness.scala:80)
at kafka.api.BaseConsumerTest.setUp(BaseConsumerTest.scala:60)
at 
kafka.api.SaslPlaintextConsumerTest.kafka$api$SaslTestHarness$$super$setUp(SaslPlaintextConsumerTest.scala:17)
at kafka.api.SaslTestHarness$class.setUp(SaslTestHarness.scala:33)
at 
kafka.api.SaslPlaintextConsumerTest.setUp(SaslPlaintextConsumerTest.scala:17)

kafka.api.SaslPlaintextConsumerTest > testListTopics FAILED
java.lang.AssertionError: Partition [part-test-topic-2,0] metadata not 
propagated after 5000 ms

kafka.api.SaslPlaintextConsumerTest > testAutoCommitOnRebalance PASSED

kafka.api.SaslPlaintextConsumerTest > testSimpleConsumption FAILED
java.lang.AssertionError: Partition [__consumer_offsets,0] metadata not 
propagated after 5000 ms
at org.junit.Assert.fail(Assert.java:88)
at kafka.utils.TestUtils$.waitUntilTrue(TestUtils.scala:771)
at 
kafka.utils.TestUtils$.waitUntilMetadataIsPropagated(TestUtils.scala:812)
at 
kafka.utils.TestUtils$$anonfun$createTopic$1.apply(TestUtils.scala:240)
at 
kafka.utils.TestUtils$$anonfun$createTopic$1.apply(TestUtils.scala:239)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.immutable.Range.foreach(Range.scala:141)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at kafka.utils.TestUtils$.createTopic(TestUtils.scala:239)
at 
kafka.api.IntegrationTestHarness$class.setUp(IntegrationTestHarness.scala:80)
at kafka.api.BaseConsumerTest.setUp(BaseConsumerTest.scala:60)
at 
kafka.api.SaslPlaintextConsumerTest.kafka$api$SaslTestHarness$$super$setUp(SaslPlaintextConsumerTest.scala:17)
at kafka.api.SaslTestHarness$class.setUp(SaslTestHarness.scala:33)
at 
kafka.api.SaslPlaintextConsumerTest.setUp(SaslPlaintextConsumerTest.scala:17)

kafka.api.SaslPlaintextConsumerTest > testPartitionReassignmentCallback PASSED

kafka.api.SaslPlaintextConsumerTest > testCommitSpecifiedOffsets PASSED

kafka.api.SaslSslConsumerTest > testPauseStateNotPreservedByRebalance FAILED
java.lang.AssertionError: Partition [__consumer_offsets,0] metadata not 
propagated after 5000 ms
at org.junit.Assert.fail(Assert.java:88)
at kafka.utils.TestUtils$.waitUntilTrue(TestUtils.scala:771)
at 

Re: [VOTE] KIP-67: Queryable state for Kafka Streams

2016-07-12 Thread Guozhang Wang
Hi Damian,

After running the example code in my mind with these two approaches, I'm sold
on your arguments :)

Guozhang


On Tue, Jul 12, 2016 at 12:49 PM, Sriram Subramanian 
wrote:

> I liked abstracting the metadata approach as well. It also helps to evolve
> in the future.
>
> On Tue, Jul 12, 2016 at 12:42 PM, Michael Noll 
> wrote:
>
> > Like Damian I'd also favor a proper type (KafkaStreamsMetadata) rather
> than
> > a Map-based construct.
> >
> > -Michael
> >
> > On Tue, Jul 12, 2016 at 8:45 PM, Damian Guy 
> wrote:
> >
> > > One more thing on the above, the methods on KafkaStreams should be
> > changed
> > > to something like:
> > >
> > > Collection<KafkaStreamsMetadata> allMetadata()
> > >
> > > Collection<KafkaStreamsMetadata> allMetadataForStore(final String
> > > storeName)
> > >
> > > KafkaStreamsMetadata metadataWithKey(final String storeName,
> > >  final K key,
> > >  final Serializer<K> keySerializer)
> > >
> > >
> > > Thanks,
> > > Damian
> > >
> > > On Tue, 12 Jul 2016 at 11:14 Damian Guy  wrote:
> > >
> > > > Hi,
> > > >
> > > > I agree with point 1. application.server is a better name for the
> > config
> > > > (we'll change this). However, on point 2 I think we should stick
> mostly
> > > > with what we already have. I've tried both ways of doing this when
> > > working
> > > > on the JIRA and building examples and I find the current approach
> more
> > > > intuitive and easier to use than the Map based approach.
> > > > However, there is probably a naming issue. We should rename
> > > > KafkaStreamsInstance to KafkaStreamsMetadata. This Class is very
> > simple,
> > > > but provides all the information a developer needs to be able to find
> > the
> > > > instance(s) of a Streams application that a particular store is
> running
> > > on,
> > > > i.e.,
> > > >
> > > > public class KafkaStreamsMetadata {
> > > >     private final HostInfo hostInfo;
> > > >     private final Set<String> stateStoreNames;
> > > >     private final Set<TopicPartition> topicPartitions;
> > > >
> > > >
> > > > So using the API to route to a new host is fairly simple, particularly in
> > > > the case when you want to find the host for a particular key, i.e.,
> > > >
> > > > final KafkaStreams kafkaStreams = createKafkaStreams();
> > > > final KafkaStreamsMetadata streamsMetadata =
> > > >     kafkaStreams.instanceWithKey("word-count", "hello",
> > > >         Serdes.String().serializer());
> > > > http.get("http://" + streamsMetadata.host() + ":" +
> > > >     streamsMetadata.port() + "/get/word-count/hello");
> > > >
> > > >
> > > > And if you want to do a scatter gather approach:
> > > >
> > > > final KafkaStreams kafkaStreams = createKafkaStreams();
> > > > final Collection<KafkaStreamsMetadata> kafkaStreamsMetadatas =
> > > >     kafkaStreams.allInstancesWithStore("word-count");
> > > > for (KafkaStreamsMetadata streamsMetadata : kafkaStreamsMetadatas) {
> > > >     http.get("http://" + streamsMetadata.host() + ":" +
> > > >         streamsMetadata.port() + "/get/word-count/hello");
> > > >     ...
> > > > }
> > > >
> > > >
> > > > And if you iterated over all instances:
> > > >
> > > > final KafkaStreams kafkaStreams = createKafkaStreams();
> > > > final Collection<KafkaStreamsMetadata> kafkaStreamsMetadatas =
> > > >     kafkaStreams.allInstances();
> > > > for (KafkaStreamsMetadata streamsMetadata : kafkaStreamsMetadatas) {
> > > >     if (streamsMetadata.stateStoreNames().contains("word-count")) {
> > > >         http.get("http://" + streamsMetadata.host() + ":" +
> > > >             streamsMetadata.port() + "/get/word-count/hello");
> > > >         ...
> > > >     }
> > > > }
> > > >
> > > >
> > > > If we were to change this to use Map<HostInfo, Set<TaskMetadata>>, for the
> > > > most part users would need to iterate over the entry or key set. Examples:
> > > >
> > > > Finding an instance by key is a little odd:
> > > >
> > > > final KafkaStreams kafkaStreams = createKafkaStreams();
> > > > final Map<HostInfo, Set<TaskMetadata>> streamsMetadata =
> > > >     kafkaStreams.instanceWithKey("word-count", "hello",
> > > >         Serdes.String().serializer());
> > > > // this is a bit odd as i only expect one:
> > > > for (HostInfo hostInfo : streamsMetadata.keySet()) {
> > > >     http.get("http://" + hostInfo.host() + ":" +
> > > >         hostInfo.port() + "/get/word-count/hello");
> > > > }
> > > >
> > > >
> > > > The scatter/gather by store is fairly similar to the previous example:
> > > >
> > > > final KafkaStreams kafkaStreams = createKafkaStreams();
> > > > final Map<HostInfo, Set<TaskMetadata>> streamsMetadata =
> > > >     kafkaStreams.allInstancesWithStore("word-count");
> > > > for (HostInfo hostInfo : streamsMetadata.keySet()) {
> > > >     http.get("http://" + hostInfo.host() + ":" + hostInfo.port() +
> > > >         "/get/word-count/hello");
> > > >     ...
> > > > }
> > > >
> > > > And iterating over all instances:
> > > >
> > > > final Map<HostInfo, Set<TaskMetadata>> streamsMetadata =
> > > >     kafkaStreams.allInstances();
> > > > for 

[GitHub] kafka pull request #1617: syncing mine with upstream

2016-07-12 Thread prabcs
Github user prabcs closed the pull request at:

https://github.com/apache/kafka/pull/1617


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] kafka pull request #1617: syncing mine with upstream

2016-07-12 Thread prabcs
GitHub user prabcs opened a pull request:

https://github.com/apache/kafka/pull/1617

syncing mine with upstream



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/prabcs/kafka trunk

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/1617.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1617






---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


Re: [VOTE] KIP-67: Queryable state for Kafka Streams

2016-07-12 Thread Sriram Subramanian
I liked abstracting the metadata approach as well. It also helps to evolve
in the future.

On Tue, Jul 12, 2016 at 12:42 PM, Michael Noll  wrote:

> Like Damian I'd also favor a proper type (KafkaStreamsMetadata) rather than
> a Map-based construct.
>
> -Michael
>
> On Tue, Jul 12, 2016 at 8:45 PM, Damian Guy  wrote:
>
> > One more thing on the above, the methods on KafkaStreams should be
> changed
> > to something like:
> >
> > Collection<KafkaStreamsMetadata> allMetadata()
> >
> > Collection<KafkaStreamsMetadata> allMetadataForStore(final String
> > storeName)
> >
> > KafkaStreamsMetadata metadataWithKey(final String storeName,
> >  final K key,
> >  final Serializer<K> keySerializer)
> >
> >
> > Thanks,
> > Damian
> >
> > On Tue, 12 Jul 2016 at 11:14 Damian Guy  wrote:
> >
> > > Hi,
> > >
> > > I agree with point 1. application.server is a better name for the
> config
> > > (we'll change this). However, on point 2 I think we should stick mostly
> > > with what we already have. I've tried both ways of doing this when
> > working
> > > on the JIRA and building examples and I find the current approach more
> > > intuitive and easier to use than the Map based approach.
> > > However, there is probably a naming issue. We should rename
> > > KafkaStreamsInstance to KafkaStreamsMetadata. This Class is very
> simple,
> > > but provides all the information a developer needs to be able to find
> the
> > > instance(s) of a Streams application that a particular store is running
> > on,
> > > i.e.,
> > >
> > > public class KafkaStreamsMetadata {
> > >     private final HostInfo hostInfo;
> > >     private final Set<String> stateStoreNames;
> > >     private final Set<TopicPartition> topicPartitions;
> > >
> > >
> > > So using the API to route to a new host is fairly simple, particularly in
> > > the case when you want to find the host for a particular key, i.e.,
> > >
> > > final KafkaStreams kafkaStreams = createKafkaStreams();
> > > final KafkaStreamsMetadata streamsMetadata =
> > >     kafkaStreams.instanceWithKey("word-count", "hello",
> > >         Serdes.String().serializer());
> > > http.get("http://" + streamsMetadata.host() + ":" +
> > >     streamsMetadata.port() + "/get/word-count/hello");
> > >
> > >
> > > And if you want to do a scatter gather approach:
> > >
> > > final KafkaStreams kafkaStreams = createKafkaStreams();
> > > final Collection<KafkaStreamsMetadata> kafkaStreamsMetadatas =
> > >     kafkaStreams.allInstancesWithStore("word-count");
> > > for (KafkaStreamsMetadata streamsMetadata : kafkaStreamsMetadatas) {
> > >     http.get("http://" + streamsMetadata.host() + ":" +
> > >         streamsMetadata.port() + "/get/word-count/hello");
> > >     ...
> > > }
> > >
> > >
> > > And if you iterated over all instances:
> > >
> > > final KafkaStreams kafkaStreams = createKafkaStreams();
> > > final Collection<KafkaStreamsMetadata> kafkaStreamsMetadatas =
> > >     kafkaStreams.allInstances();
> > > for (KafkaStreamsMetadata streamsMetadata : kafkaStreamsMetadatas) {
> > >     if (streamsMetadata.stateStoreNames().contains("word-count")) {
> > >         http.get("http://" + streamsMetadata.host() + ":" +
> > >             streamsMetadata.port() + "/get/word-count/hello");
> > >         ...
> > >     }
> > > }
> > >
> > >
> > > If we were to change this to use Map<HostInfo, Set<TaskMetadata>>, for the
> > > most part users would need to iterate over the entry or key set. Examples:
> > >
> > > Finding an instance by key is a little odd:
> > >
> > > final KafkaStreams kafkaStreams = createKafkaStreams();
> > > final Map<HostInfo, Set<TaskMetadata>> streamsMetadata =
> > >     kafkaStreams.instanceWithKey("word-count", "hello",
> > >         Serdes.String().serializer());
> > > // this is a bit odd as i only expect one:
> > > for (HostInfo hostInfo : streamsMetadata.keySet()) {
> > >     http.get("http://" + hostInfo.host() + ":" +
> > >         hostInfo.port() + "/get/word-count/hello");
> > > }
> > >
> > >
> > > The scatter/gather by store is fairly similar to the previous example:
> > >
> > > final KafkaStreams kafkaStreams = createKafkaStreams();
> > > final Map<HostInfo, Set<TaskMetadata>> streamsMetadata =
> > >     kafkaStreams.allInstancesWithStore("word-count");
> > > for (HostInfo hostInfo : streamsMetadata.keySet()) {
> > >     http.get("http://" + hostInfo.host() + ":" + hostInfo.port() +
> > >         "/get/word-count/hello");
> > >     ...
> > > }
> > >
> > > And iterating over all instances:
> > >
> > > final Map<HostInfo, Set<TaskMetadata>> streamsMetadata =
> > >     kafkaStreams.allInstances();
> > > for (Map.Entry<HostInfo, Set<TaskMetadata>> entry :
> > >         streamsMetadata.entrySet()) {
> > >     for (TaskMetadata taskMetadata : entry.getValue()) {
> > >         if (taskMetadata.stateStoreNames().contains("word-count")) {
> > >             http.get("http://" + entry.getKey().host() + ":" +
> > >                 entry.getKey().port() + "/get/word-count/hello");
> > >             ...
> > >         }
> > >     }
> > > }
> > >
> > >
> > > IMO - having a class we return is the better approach as it nicely
> 

Re: [VOTE] KIP-67: Queryable state for Kafka Streams

2016-07-12 Thread Michael Noll
Like Damian I'd also favor a proper type (KafkaStreamsMetadata) rather than
a Map-based construct.

-Michael

On Tue, Jul 12, 2016 at 8:45 PM, Damian Guy  wrote:

> One more thing on the above, the methods on KafkaStreams should be changed
> to something like:
>
> Collection<KafkaStreamsMetadata> allMetadata()
>
> Collection<KafkaStreamsMetadata> allMetadataForStore(final String
> storeName)
>
> KafkaStreamsMetadata metadataWithKey(final String storeName,
>  final K key,
>  final Serializer<K> keySerializer)
>
>
> Thanks,
> Damian
>
> On Tue, 12 Jul 2016 at 11:14 Damian Guy  wrote:
>
> > Hi,
> >
> > I agree with point 1. application.server is a better name for the config
> > (we'll change this). However, on point 2 I think we should stick mostly
> > with what we already have. I've tried both ways of doing this when
> working
> > on the JIRA and building examples and I find the current approach more
> > intuitive and easier to use than the Map based approach.
> > However, there is probably a naming issue. We should rename
> > KafkaStreamsInstance to KafkaStreamsMetadata. This Class is very simple,
> > but provides all the information a developer needs to be able to find the
> > instance(s) of a Streams application that a particular store is running
> on,
> > i.e.,
> >
> > public class KafkaStreamsMetadata {
> >     private final HostInfo hostInfo;
> >     private final Set<String> stateStoreNames;
> >     private final Set<TopicPartition> topicPartitions;
> >
> >
> > So using the API to route to a new host is fairly simple, particularly in
> > the case when you want to find the host for a particular key, i.e.,
> >
> > final KafkaStreams kafkaStreams = createKafkaStreams();
> > final KafkaStreamsMetadata streamsMetadata =
> >     kafkaStreams.instanceWithKey("word-count", "hello",
> >         Serdes.String().serializer());
> > http.get("http://" + streamsMetadata.host() + ":" +
> >     streamsMetadata.port() + "/get/word-count/hello");
> >
> >
> > And if you want to do a scatter gather approach:
> >
> > final KafkaStreams kafkaStreams = createKafkaStreams();
> > final Collection<KafkaStreamsMetadata> kafkaStreamsMetadatas =
> >     kafkaStreams.allInstancesWithStore("word-count");
> > for (KafkaStreamsMetadata streamsMetadata : kafkaStreamsMetadatas) {
> >     http.get("http://" + streamsMetadata.host() + ":" +
> >         streamsMetadata.port() + "/get/word-count/hello");
> >     ...
> > }
> >
> >
> > And if you iterated over all instances:
> >
> > final KafkaStreams kafkaStreams = createKafkaStreams();
> > final Collection<KafkaStreamsMetadata> kafkaStreamsMetadatas =
> >     kafkaStreams.allInstances();
> > for (KafkaStreamsMetadata streamsMetadata : kafkaStreamsMetadatas) {
> >     if (streamsMetadata.stateStoreNames().contains("word-count")) {
> >         http.get("http://" + streamsMetadata.host() + ":" +
> >             streamsMetadata.port() + "/get/word-count/hello");
> >         ...
> >     }
> > }
> >
> >
> > If we were to change this to use Map<HostInfo, Set<TaskMetadata>>, for the
> > most part users would need to iterate over the entry or key set. Examples:
> >
> > Finding an instance by key is a little odd:
> >
> > final KafkaStreams kafkaStreams = createKafkaStreams();
> > final Map<HostInfo, Set<TaskMetadata>> streamsMetadata =
> >     kafkaStreams.instanceWithKey("word-count", "hello",
> >         Serdes.String().serializer());
> > // this is a bit odd as i only expect one:
> > for (HostInfo hostInfo : streamsMetadata.keySet()) {
> >     http.get("http://" + hostInfo.host() + ":" +
> >         hostInfo.port() + "/get/word-count/hello");
> > }
> >
> >
> > The scatter/gather by store is fairly similar to the previous example:
> >
> > final KafkaStreams kafkaStreams = createKafkaStreams();
> > final Map<HostInfo, Set<TaskMetadata>> streamsMetadata =
> >     kafkaStreams.allInstancesWithStore("word-count");
> > for (HostInfo hostInfo : streamsMetadata.keySet()) {
> >     http.get("http://" + hostInfo.host() + ":" + hostInfo.port() +
> >         "/get/word-count/hello");
> >     ...
> > }
> >
> > And iterating over all instances:
> >
> > final Map<HostInfo, Set<TaskMetadata>> streamsMetadata =
> >     kafkaStreams.allInstances();
> > for (Map.Entry<HostInfo, Set<TaskMetadata>> entry :
> >         streamsMetadata.entrySet()) {
> >     for (TaskMetadata taskMetadata : entry.getValue()) {
> >         if (taskMetadata.stateStoreNames().contains("word-count")) {
> >             http.get("http://" + entry.getKey().host() + ":" +
> >                 entry.getKey().port() + "/get/word-count/hello");
> >             ...
> >         }
> >     }
> > }
> >
> >
> > IMO - having a class we return is the better approach as it nicely wraps
> > the related things, i.e, host:port, store names, topic partitions into an
> > Object that is easy to use. Further we could add some behaviour to this
> > class if we felt it necessary, i.e, hasStore(storeName) etc.
> >
> > Anyway, i'm interested in your thoughts.
> >
> > Thanks,
> > Damian
> >
> > On Mon, 11 Jul 2016 at 13:47 Guozhang Wang  wrote:
> >
> >> 1. Re StreamsConfig.USER_ENDPOINT_CONFIG:
> >>
> >> I 

[GitHub] kafka pull request #1611: MINOR: Check null in SmokeTestDriver to avoid NPE

2016-07-12 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/1611


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Updated] (KAFKA-2946) DeleteTopic - protocol and server side implementation

2016-07-12 Thread Grant Henke (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2946?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Grant Henke updated KAFKA-2946:
---
Status: Patch Available  (was: In Progress)

> DeleteTopic - protocol and server side implementation
> -
>
> Key: KAFKA-2946
> URL: https://issues.apache.org/jira/browse/KAFKA-2946
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Grant Henke
>Assignee: Grant Henke
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [VOTE] KIP-67: Queryable state for Kafka Streams

2016-07-12 Thread Damian Guy
One more thing on the above, the methods on KafkaStreams should be changed
to something like:

Collection<KafkaStreamsMetadata> allMetadata()

Collection<KafkaStreamsMetadata> allMetadataForStore(final String storeName)

KafkaStreamsMetadata metadataWithKey(final String storeName,
                                     final K key,
                                     final Serializer<K> keySerializer)
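
To make the renaming concrete, a rough usage sketch against the signatures
above, in the same fragment style as the examples quoted further down (these
methods are only a proposal in this thread, not final API, and
createKafkaStreams()/http are the same stand-in helpers as before):

final KafkaStreams kafkaStreams = createKafkaStreams();

// route a key lookup to the instance that hosts it
final KafkaStreamsMetadata metadata =
    kafkaStreams.metadataWithKey("word-count", "hello", Serdes.String().serializer());
http.get("http://" + metadata.host() + ":" + metadata.port() + "/get/word-count/hello");

// scatter/gather across every instance hosting the store
for (final KafkaStreamsMetadata m : kafkaStreams.allMetadataForStore("word-count")) {
    http.get("http://" + m.host() + ":" + m.port() + "/get/word-count/hello");
}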


Thanks,
Damian

On Tue, 12 Jul 2016 at 11:14 Damian Guy  wrote:

> Hi,
>
> I agree with point 1. application.server is a better name for the config
> (we'll change this). However, on point 2 I think we should stick mostly
> with what we already have. I've tried both ways of doing this when working
> on the JIRA and building examples and I find the current approach more
> intuitive and easier to use than the Map based approach.
> However, there is probably a naming issue. We should rename
> KafkaStreamsInstance to KafkaStreamsMetadata. This Class is very simple,
> but provides all the information a developer needs to be able to find the
> instance(s) of a Streams application that a particular store is running on,
> i.e.,
>
> public class KafkaStreamsMetadata {
>     private final HostInfo hostInfo;
>     private final Set<String> stateStoreNames;
>     private final Set<TopicPartition> topicPartitions;
>
>
> So using the API to route to a new host is fairly simple, particularly in
> the case when you want to find the host for a particular key, i.e.,
>
> final KafkaStreams kafkaStreams = createKafkaStreams();
> final KafkaStreamsMetadata streamsMetadata =
>     kafkaStreams.instanceWithKey("word-count", "hello",
>         Serdes.String().serializer());
> http.get("http://" + streamsMetadata.host() + ":" +
>     streamsMetadata.port() + "/get/word-count/hello");
>
>
> And if you want to do a scatter gather approach:
>
> final KafkaStreams kafkaStreams = createKafkaStreams();
> final Collection<KafkaStreamsMetadata> kafkaStreamsMetadatas =
>     kafkaStreams.allInstancesWithStore("word-count");
> for (KafkaStreamsMetadata streamsMetadata : kafkaStreamsMetadatas) {
>     http.get("http://" + streamsMetadata.host() + ":" +
>         streamsMetadata.port() + "/get/word-count/hello");
>     ...
> }
>
>
> And if you iterated over all instances:
>
> final KafkaStreams kafkaStreams = createKafkaStreams();
> final Collection<KafkaStreamsMetadata> kafkaStreamsMetadatas =
>     kafkaStreams.allInstances();
> for (KafkaStreamsMetadata streamsMetadata : kafkaStreamsMetadatas) {
>     if (streamsMetadata.stateStoreNames().contains("word-count")) {
>         http.get("http://" + streamsMetadata.host() + ":" +
>             streamsMetadata.port() + "/get/word-count/hello");
>         ...
>     }
> }
>
>
> If we were to change this to use Map<HostInfo, Set<TaskMetadata>>, for the
> most part users would need to iterate over the entry or key set. Examples:
>
> Finding an instance by key is a little odd:
>
> final KafkaStreams kafkaStreams = createKafkaStreams();
> final Map<HostInfo, Set<TaskMetadata>> streamsMetadata =
>     kafkaStreams.instanceWithKey("word-count", "hello",
>         Serdes.String().serializer());
> // this is a bit odd as i only expect one:
> for (HostInfo hostInfo : streamsMetadata.keySet()) {
>     http.get("http://" + hostInfo.host() + ":" +
>         hostInfo.port() + "/get/word-count/hello");
> }
>
>
> The scatter/gather by store is fairly similar to the previous example:
>
> final KafkaStreams kafkaStreams = createKafkaStreams();
> final Map<HostInfo, Set<TaskMetadata>> streamsMetadata =
>     kafkaStreams.allInstancesWithStore("word-count");
> for (HostInfo hostInfo : streamsMetadata.keySet()) {
>     http.get("http://" + hostInfo.host() + ":" + hostInfo.port() +
>         "/get/word-count/hello");
>     ...
> }
>
> And iterating over all instances:
>
> final Map<HostInfo, Set<TaskMetadata>> streamsMetadata =
>     kafkaStreams.allInstances();
> for (Map.Entry<HostInfo, Set<TaskMetadata>> entry :
>         streamsMetadata.entrySet()) {
>     for (TaskMetadata taskMetadata : entry.getValue()) {
>         if (taskMetadata.stateStoreNames().contains("word-count")) {
>             http.get("http://" + entry.getKey().host() + ":" +
>                 entry.getKey().port() + "/get/word-count/hello");
>             ...
>         }
>     }
> }
>
>
> IMO - having a class we return is the better approach as it nicely wraps
> the related things, i.e, host:port, store names, topic partitions into an
> Object that is easy to use. Further we could add some behaviour to this
> class if we felt it necessary, i.e, hasStore(storeName) etc.
>
> Anyway, I'm interested in your thoughts.
>
> Thanks,
> Damian
>
> On Mon, 11 Jul 2016 at 13:47 Guozhang Wang  wrote:
>
>> 1. Re StreamsConfig.USER_ENDPOINT_CONFIG:
>>
>> I agree with Neha that Kafka Streams can provide the bare minimum APIs
>> just
>> for host/port, and user's implemented layer can provide URL / proxy
>> address
>> they want to build on top of it.
>>
>>
>> 2. Re Improving KafkaStreamsInstance interface:
>>
>> Users are indeed aware of "TaskId" class which is not part of internal
>> packages and is exposed in PartitionGrouper interface that can be
>> 

[jira] [Commented] (KAFKA-2857) ConsumerGroupCommand throws GroupCoordinatorNotAvailableException when describing a non-existent group before the offset topic is created

2016-07-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2857?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15373430#comment-15373430
 ] 

ASF GitHub Bot commented on KAFKA-2857:
---

Github user imandhan closed the pull request at:

https://github.com/apache/kafka/pull/1548


> ConsumerGroupCommand throws GroupCoordinatorNotAvailableException when 
> describing a non-existent group before the offset topic is created
> -
>
> Key: KAFKA-2857
> URL: https://issues.apache.org/jira/browse/KAFKA-2857
> Project: Kafka
>  Issue Type: Bug
>  Components: tools
>Reporter: Ismael Juma
>Assignee: Ishita Mandhan
>Priority: Minor
>
> If we describe a non-existing group before the offset topic is created, like 
> the following:
> {code}
> bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --new-consumer 
> --describe --group 
> {code}
> We get the following error:
> {code}
> Error while executing consumer group command The group coordinator is not 
> available.
> org.apache.kafka.common.errors.GroupCoordinatorNotAvailableException: The 
> group coordinator is not available.
> {code}
> The exception is thrown in the `adminClient.describeConsumerGroup` call. We 
> can't interpret this exception as meaning that the group doesn't exist 
> because it could also be thrown if all replicas for an offset topic partition 
> are down (as explained by Jun).
> Jun also suggested that we should distinguish if a coordinator is not 
> available from the case where a coordinator doesn't exist.
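
For illustration only (not code from the patch), a caller hitting this today cannot 
tell the two situations apart and can only retry; describeGroup() below is a 
hypothetical wrapper around the adminClient.describeConsumerGroup call:
{code}
import org.apache.kafka.common.errors.GroupCoordinatorNotAvailableException;

static void describeWithRetry(String group) throws InterruptedException {
    for (int attempt = 1; ; attempt++) {
        try {
            describeGroup(group); // hypothetical wrapper around the admin client call
            return;               // the group was described successfully
        } catch (GroupCoordinatorNotAvailableException e) {
            // Could mean the group does not exist, or that all replicas of the
            // offsets topic partition are down; the caller cannot tell which.
            if (attempt == 5)
                throw e;
            Thread.sleep(1000);
        }
    }
}
{code}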



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-2857) ConsumerGroupCommand throws GroupCoordinatorNotAvailableException when describing a non-existent group before the offset topic is created

2016-07-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2857?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15373431#comment-15373431
 ] 

ASF GitHub Bot commented on KAFKA-2857:
---

GitHub user imandhan reopened a pull request:

https://github.com/apache/kafka/pull/1548

KAFKA-2857 ConsumerGroupCommand throws 
GroupCoordinatorNotAvailableException when describing a non-existent group 
before the offset topic is created

Added a check to make sure different cases when offset topic hasn't been 
created and consumer group describe command is run, are handled appropriately. 

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/imandhan/kafka KAFKA-2857

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/1548.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1548


commit c27df5a17a6bbb34a6118bb7b74d6f3e80239612
Author: Ishita Mandhan 
Date:   2016-06-23T23:46:07Z

KAFKA-2857 ConsumerGroupCommand throws 
GroupCoordinatorNotAvailableException when describing a non-existent group 
before the offset topic is created

Added a check to make sure different cases when offset topic hasn't been 
created and consumer group describe command is run, are handled appropriately.

commit 9da78fa08688b691a954da164e6c8d28abc90500
Author: Ishita Mandhan 
Date:   2016-06-24T00:41:12Z

KAFKA-2857 ConsumerGroupCommand throws 
GroupCoordinatorNotAvailableException when describing a non-existent group 
before the offset topic is created

Added a check to make sure different cases when offset topic hasn't been 
created and consumer group describe command is run, are handled appropriately.

commit 8d157546152fc063a2aff92a3fade1b6947b5ffb
Author: Ishita Mandhan 
Date:   2016-06-24T00:51:20Z

KAFKA-2857 ConsumerGroupCommand throws 
GroupCoordinatorNotAvailableException when describing a non-existent group 
before the offset topic is created

commit f50c8dce19018e7f689d318795eb797a7d0d0f2d
Author: Ishita Mandhan 
Date:   2016-06-28T00:53:33Z

KAFKA-2857 ConsumerGroupCommand throws 
GroupCoordinatorNotAvailableException when describing a non-existent group 
before the offset topic is created

commit 7815008eeb4c2dabaf96707652f50425ac4d5923
Author: Ishita Mandhan 
Date:   2016-06-28T20:13:51Z

KAFKA-2857 ConsumerGroupCommand throws 
GroupCoordinatorNotAvailableException when describing a non-existent group 
before the offset topic is created

commit 2f2eca8091ef576e2f9c79859c7404b8d50733d7
Author: Ishita Mandhan 
Date:   2016-07-11T22:53:35Z

KAFKA-2857 ConsumerGroupCommand throws 
GroupCoordinatorNotAvailableException when describing a non-existent group 
before the offset topic is created




> ConsumerGroupCommand throws GroupCoordinatorNotAvailableException when 
> describing a non-existent group before the offset topic is created
> -
>
> Key: KAFKA-2857
> URL: https://issues.apache.org/jira/browse/KAFKA-2857
> Project: Kafka
>  Issue Type: Bug
>  Components: tools
>Reporter: Ismael Juma
>Assignee: Ishita Mandhan
>Priority: Minor
>
> If we describe a non-existing group before the offset topic is created, like 
> the following:
> {code}
> bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --new-consumer 
> --describe --group 
> {code}
> We get the following error:
> {code}
> Error while executing consumer group command The group coordinator is not 
> available.
> org.apache.kafka.common.errors.GroupCoordinatorNotAvailableException: The 
> group coordinator is not available.
> {code}
> The exception is thrown in the `adminClient.describeConsumerGroup` call. We 
> can't interpret this exception as meaning that the group doesn't exist 
> because it could also be thrown if all replicas for an offset topic partition 
> are down (as explained by Jun).
> Jun also suggested that we should distinguish if a coordinator is not 
> available from the case where a coordinator doesn't exist.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] kafka pull request #1548: KAFKA-2857 ConsumerGroupCommand throws GroupCoordi...

2016-07-12 Thread imandhan
GitHub user imandhan reopened a pull request:

https://github.com/apache/kafka/pull/1548

KAFKA-2857 ConsumerGroupCommand throws 
GroupCoordinatorNotAvailableException when describing a non-existent group 
before the offset topic is created

Added a check to make sure different cases when offset topic hasn't been 
created and consumer group describe command is run, are handled appropriately. 

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/imandhan/kafka KAFKA-2857

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/1548.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1548


commit c27df5a17a6bbb34a6118bb7b74d6f3e80239612
Author: Ishita Mandhan 
Date:   2016-06-23T23:46:07Z

KAFKA-2857 ConsumerGroupCommand throws 
GroupCoordinatorNotAvailableException when describing a non-existent group 
before the offset topic is created

Added a check to make sure different cases when offset topic hasn't been 
created and consumer group describe command is run, are handled appropriately.

commit 9da78fa08688b691a954da164e6c8d28abc90500
Author: Ishita Mandhan 
Date:   2016-06-24T00:41:12Z

KAFKA-2857 ConsumerGroupCommand throws 
GroupCoordinatorNotAvailableException when describing a non-existent group 
before the offset topic is created

Added a check to make sure different cases when offset topic hasn't been 
created and consumer group describe command is run, are handled appropriately.

commit 8d157546152fc063a2aff92a3fade1b6947b5ffb
Author: Ishita Mandhan 
Date:   2016-06-24T00:51:20Z

KAFKA-2857 ConsumerGroupCommand throws 
GroupCoordinatorNotAvailableException when describing a non-existent group 
before the offset topic is created

commit f50c8dce19018e7f689d318795eb797a7d0d0f2d
Author: Ishita Mandhan 
Date:   2016-06-28T00:53:33Z

KAFKA-2857 ConsumerGroupCommand throws 
GroupCoordinatorNotAvailableException when describing a non-existent group 
before the offset topic is created

commit 7815008eeb4c2dabaf96707652f50425ac4d5923
Author: Ishita Mandhan 
Date:   2016-06-28T20:13:51Z

KAFKA-2857 ConsumerGroupCommand throws 
GroupCoordinatorNotAvailableException when describing a non-existent group 
before the offset topic is created

commit 2f2eca8091ef576e2f9c79859c7404b8d50733d7
Author: Ishita Mandhan 
Date:   2016-07-11T22:53:35Z

KAFKA-2857 ConsumerGroupCommand throws 
GroupCoordinatorNotAvailableException when describing a non-existent group 
before the offset topic is created




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] kafka pull request #1548: KAFKA-2857 ConsumerGroupCommand throws GroupCoordi...

2016-07-12 Thread imandhan
Github user imandhan closed the pull request at:

https://github.com/apache/kafka/pull/1548


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (KAFKA-2946) DeleteTopic - protocol and server side implementation

2016-07-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2946?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15373418#comment-15373418
 ] 

ASF GitHub Bot commented on KAFKA-2946:
---

GitHub user granthenke opened a pull request:

https://github.com/apache/kafka/pull/1616

KAFKA-2946: DeleteTopic - protocol and server side implementation



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/granthenke/kafka delete-wire-new

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/1616.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1616


commit 6b1b6baf69a508d445ff587400902ac7dfa126a3
Author: Grant Henke 
Date:   2016-06-16T17:49:15Z

KAFKA-2946: DeleteTopic - protocol and server side implementation




> DeleteTopic - protocol and server side implementation
> -
>
> Key: KAFKA-2946
> URL: https://issues.apache.org/jira/browse/KAFKA-2946
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Grant Henke
>Assignee: Grant Henke
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] kafka pull request #1616: KAFKA-2946: DeleteTopic - protocol and server side...

2016-07-12 Thread granthenke
GitHub user granthenke opened a pull request:

https://github.com/apache/kafka/pull/1616

KAFKA-2946: DeleteTopic - protocol and server side implementation



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/granthenke/kafka delete-wire-new

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/1616.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1616


commit 6b1b6baf69a508d445ff587400902ac7dfa126a3
Author: Grant Henke 
Date:   2016-06-16T17:49:15Z

KAFKA-2946: DeleteTopic - protocol and server side implementation




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] kafka pull request #1594: KAFKA-3931: Fix transient failures in pattern subs...

2016-07-12 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/1594


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (KAFKA-3931) kafka.api.PlaintextConsumerTest.testPatternUnsubscription transient failure

2016-07-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3931?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15373417#comment-15373417
 ] 

ASF GitHub Bot commented on KAFKA-3931:
---

Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/1594


> kafka.api.PlaintextConsumerTest.testPatternUnsubscription transient failure
> ---
>
> Key: KAFKA-3931
> URL: https://issues.apache.org/jira/browse/KAFKA-3931
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Vahid Hashemian
>Assignee: Vahid Hashemian
> Fix For: 0.10.0.1
>
>
> Some of the recent builds are failing this test 
> ([example|https://builds.apache.org/job/kafka-trunk-git-pr-jdk7/4579/testReport/kafka.api/PlaintextConsumerTest/testPatternUnsubscription/]).
> Some others are failing on 
> kafka.api.PlaintextConsumerTest.testPatternSubscription 
> ([example|https://builds.apache.org/job/kafka-trunk-git-pr-jdk7/4565/testReport/junit/kafka.api/PlaintextConsumerTest/testPatternSubscription/])
> These failures seem to have started after [this 
> commit|https://github.com/apache/kafka/commit/d7de59a579af5ba4ecb1aec8fed84054f8b86443].



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-3931) kafka.api.PlaintextConsumerTest.testPatternUnsubscription transient failure

2016-07-12 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3931?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-3931:
-
   Resolution: Fixed
Fix Version/s: 0.10.0.1
   Status: Resolved  (was: Patch Available)

Issue resolved by pull request 1594
[https://github.com/apache/kafka/pull/1594]

> kafka.api.PlaintextConsumerTest.testPatternUnsubscription transient failure
> ---
>
> Key: KAFKA-3931
> URL: https://issues.apache.org/jira/browse/KAFKA-3931
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Vahid Hashemian
>Assignee: Vahid Hashemian
> Fix For: 0.10.0.1
>
>
> Some of the recent builds are failing this test 
> ([example|https://builds.apache.org/job/kafka-trunk-git-pr-jdk7/4579/testReport/kafka.api/PlaintextConsumerTest/testPatternUnsubscription/]).
> Some others are failing on 
> kafka.api.PlaintextConsumerTest.testPatternSubscription 
> ([example|https://builds.apache.org/job/kafka-trunk-git-pr-jdk7/4565/testReport/junit/kafka.api/PlaintextConsumerTest/testPatternSubscription/])
> These failures seem to have started after [this 
> commit|https://github.com/apache/kafka/commit/d7de59a579af5ba4ecb1aec8fed84054f8b86443].



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [VOTE] KIP-67: Queryable state for Kafka Streams

2016-07-12 Thread Damian Guy
Hi,

I agree with point 1. application.server is a better name for the config
(we'll change this). However, on point 2 I think we should stick mostly
with what we already have. I've tried both ways of doing this when working
on the JIRA and building examples and I find the current approach more
intuitive and easier to use than the Map based approach.
However, there is probably a naming issue. We should rename
KafkaStreamsInstance to KafkaStreamsMetadata. This Class is very simple,
but provides all the information a developer needs to be able to find the
instance(s) of a Streams application that a particular store is running on,
i.e.,

public class KafkaStreamsMetadata {
    private final HostInfo hostInfo;
    private final Set<String> stateStoreNames;
    private final Set<TopicPartition> topicPartitions;


So using the API to route to a new host is fairly simple, particularly in
the case when you want to find the host for a particular key, i.e.,

final KafkaStreams kafkaStreams = createKafkaStreams();
final KafkaStreamsMetadata streamsMetadata =
kafkaStreams.instanceWithKey("word-count", "hello",
Serdes.String().serializer());
http.get("http://; + streamsMetadata.host() + ":" +
streamsMetadata.port() + "/get/word-count/hello");


And if you want to do a scatter gather approach:

final KafkaStreams kafkaStreams = createKafkaStreams();
final Collection<KafkaStreamsMetadata> kafkaStreamsMetadatas =
kafkaStreams.allInstancesWithStore("word-count");
for (KafkaStreamsMetadata streamsMetadata : kafkaStreamsMetadatas) {
http.get("http://; + streamsMetadata.host() + ":" +
streamsMetadata.port() + "/get/word-count/hello");
...
}


And if you iterated over all instances:

final KafkaStreams kafkaStreams = createKafkaStreams();
final Collection<KafkaStreamsMetadata> kafkaStreamsMetadatas =
kafkaStreams.allInstances();
for (KafkaStreamsMetadata streamsMetadata : kafkaStreamsMetadatas) {
if (streamsMetadata.stateStoreNames().contains("word-count")) {
http.get("http://; + streamsMetadata.host() + ":" +
streamsMetadata.port() + "/get/word-count/hello");
...
}
}


If we were to change this to use Map<HostInfo, Set<TaskMetadata>> for the
most part users would need to iterate over the entry or key set. Examples:

Finding an instance by key is a little odd:

final KafkaStreams kafkaStreams = createKafkaStreams();
final Map<HostInfo, Set<TaskMetadata>> streamsMetadata =
kafkaStreams.instanceWithKey("word-count","hello",
Serdes.String().serializer());
// this is a bit odd as i only expect one:
for (HostInfo hostInfo : streamsMetadata.keySet()) {
http.get("http://; + streamsMetadata.host() + ":" +
streamsMetadata.port() + "/get/word-count/hello");
}


The scatter/gather by store is fairly similar to the previous example:

final KafkaStreams kafkaStreams = createKafkaStreams();
final Map<HostInfo, Set<TaskMetadata>> streamsMetadata =
kafkaStreams.allInstancesWithStore("word-count");
for (HostInfo hostInfo : streamsMetadata.keySet()) {
http.get("http://" + hostInfo.host() + ":" + hostInfo.port() +
"/get/word-count/hello");
...
}

And iterating over all instances:

final Map<HostInfo, Set<TaskMetadata>> streamsMetadata =
kafkaStreams.allInstances();
for (Map.Entry<HostInfo, Set<TaskMetadata>> entry :
streamsMetadata.entrySet()) {
for (TaskMetadata taskMetadata : entry.getValue()) {
if (taskMetadata.stateStoreNames().contains("word-count")) {
http.get("http://; + streamsMetadata.host() + ":" +
streamsMetadata.port() + "/get/word-count/hello");
...
}
}
}


IMO - having a class we return is the better approach as it nicely wraps
the related things, i.e., host:port, store names, and topic partitions into an
object that is easy to use. Further, we could add some behaviour to this
class if we felt it necessary, e.g., hasStore(storeName) etc.
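
As a rough sketch only (field names as above; the constructor and accessors are
illustrative, not a final API), the class with a hasStore convenience could look like:

public class KafkaStreamsMetadata {
    private final HostInfo hostInfo;
    private final Set<String> stateStoreNames;
    private final Set<TopicPartition> topicPartitions;

    public KafkaStreamsMetadata(final HostInfo hostInfo,
                                final Set<String> stateStoreNames,
                                final Set<TopicPartition> topicPartitions) {
        this.hostInfo = hostInfo;
        this.stateStoreNames = stateStoreNames;
        this.topicPartitions = topicPartitions;
    }

    public String host() { return hostInfo.host(); }
    public int port() { return hostInfo.port(); }
    public Set<String> stateStoreNames() { return stateStoreNames; }
    public Set<TopicPartition> topicPartitions() { return topicPartitions; }

    // convenience check mentioned above
    public boolean hasStore(final String storeName) {
        return stateStoreNames.contains(storeName);
    }
}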

Anyway, I'm interested in your thoughts.

Thanks,
Damian

On Mon, 11 Jul 2016 at 13:47 Guozhang Wang  wrote:

> 1. Re StreamsConfig.USER_ENDPOINT_CONFIG:
>
> I agree with Neha that Kafka Streams can provide the bare minimum APIs just
> for host/port, and user's implemented layer can provide URL / proxy address
> they want to build on top of it.
>
>
> 2. Re Improving KafkaStreamsInstance interface:
>
> Users are indeed aware of "TaskId" class which is not part of internal
> packages and is exposed in PartitionGrouper interface that can be
> instantiated by the users, which is assigned with input topic partitions.
> So we can probably change the APIs as:
>
> Map<HostState, Set<TaskMetadata>> KafkaStreams.getAllTasks() where
> TaskMetadata has fields such as taskId, list of assigned partitions, list
> of state store names; and HostState can include hostname / port. The port
> is the listening port of a user-defined listener that users provide to
> listen for queries (e.g., using REST APIs).
>
> Map<HostState, Set<TaskMetadata>> KafkaStreams.getTasksWithStore(String /*
> storeName */) would return only the hosts and their assigned tasks if at
> least one of the tasks include the given store name.
>
> Map 

[jira] [Commented] (KAFKA-3950) kafka mirror maker tool is not respecting whitelist option

2016-07-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3950?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15373307#comment-15373307
 ] 

ASF GitHub Bot commented on KAFKA-3950:
---

GitHub user omkreddy opened a pull request:

https://github.com/apache/kafka/pull/1615

KAFKA-3950: kafka mirror maker tool is not respecting whitelist option



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/omkreddy/kafka KAFKA-3950-CONSUMER-PATTREN

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/1615.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1615


commit f2a6e806c8c3d181dc3d16ab90548bdffde404af
Author: Manikumar Reddy O 
Date:   2016-07-12T17:29:46Z

KAFKA-3950: kafka mirror maker tool is not respecting whitelist option




> kafka mirror maker tool is not respecting whitelist option
> --
>
> Key: KAFKA-3950
> URL: https://issues.apache.org/jira/browse/KAFKA-3950
> Project: Kafka
>  Issue Type: Bug
>Reporter: Raghav Kumar Gautam
>Assignee: Manikumar Reddy
>Priority: Critical
>
> A mirror maker launched like this:
> {code}
> /usr/bin/kinit -k -t /home/kfktest/hadoopqa/keytabs/kfktest.headless.keytab 
> kfkt...@example.com
> JAVA_HOME=/usr/jdk64/jdk1.8.0_77 JMX_PORT=9112 
> /usr/kafka/bin/kafka-run-class.sh kafka.tools.MirrorMaker --consumer.config 
> /usr/kafka/system_test/mirror_maker_testsuite/testcase_15001/config/mirror_consumer_12.properties
>  --producer.config 
> /usr/kafka/system_test/mirror_maker_testsuite/testcase_15001/config/mirror_producer_12.properties
>  --new.consumer --whitelist="test.*" >>  
> /usr/kafka/system_test/mirror_maker_testsuite/testcase_15001/logs/mirror_maker-12/mirror_maker_12.log
>  2>&1 & echo pid:$! >  
> /usr/kafka/system_test/mirror_maker_testsuite/testcase_15001/logs/mirror_maker-12/entity_12_pid
> {code}
> Lead to TopicAuthorizationException:
> {code}
> WARN Error while fetching metadata with correlation id 44 : 
> {__consumer_offsets=TOPIC_AUTHORIZATION_FAILED} 
> (org.apache.kafka.clients.NetworkClient)
> [2016-06-20 13:24:49,983] FATAL [mirrormaker-thread-0] Mirror maker thread 
> failure due to  (kafka.tools.MirrorMaker$MirrorMakerThread)
> org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to 
> access topics: [__consumer_offsets]
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] kafka pull request #1615: KAFKA-3950: kafka mirror maker tool is not respect...

2016-07-12 Thread omkreddy
GitHub user omkreddy opened a pull request:

https://github.com/apache/kafka/pull/1615

KAFKA-3950: kafka mirror maker tool is not respecting whitelist option



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/omkreddy/kafka KAFKA-3950-CONSUMER-PATTREN

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/1615.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1615


commit f2a6e806c8c3d181dc3d16ab90548bdffde404af
Author: Manikumar Reddy O 
Date:   2016-07-12T17:29:46Z

KAFKA-3950: kafka mirror maker tool is not respecting whitelist option




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (KAFKA-1194) The kafka broker cannot delete the old log files after the configured time

2016-07-12 Thread Ravi Peri (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1194?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15373244#comment-15373244
 ] 

Ravi Peri commented on KAFKA-1194:
--

Sriharsha,

I will send out a PR with the patch today.

Thanks
r


> The kafka broker cannot delete the old log files after the configured time
> --
>
> Key: KAFKA-1194
> URL: https://issues.apache.org/jira/browse/KAFKA-1194
> Project: Kafka
>  Issue Type: Bug
>  Components: log
>Affects Versions: 0.8.1
> Environment: window
>Reporter: Tao Qin
>Assignee: Jay Kreps
>  Labels: features, patch
> Fix For: 0.10.1.0
>
> Attachments: KAFKA-1194.patch, kafka-1194-v1.patch, 
> kafka-1194-v2.patch
>
>   Original Estimate: 72h
>  Remaining Estimate: 72h
>
> We tested it in a Windows environment, and set log.retention.hours to 24 
> hours.
> # The minimum age of a log file to be eligible for deletion
> log.retention.hours=24
> After several days, the kafka broker still cannot delete the old log file. 
> And we get the following exceptions:
> [2013-12-19 01:57:38,528] ERROR Uncaught exception in scheduled task 
> 'kafka-log-retention' (kafka.utils.KafkaScheduler)
> kafka.common.KafkaStorageException: Failed to change the log file suffix from 
>  to .deleted for log segment 1516723
>  at kafka.log.LogSegment.changeFileSuffixes(LogSegment.scala:249)
>  at kafka.log.Log.kafka$log$Log$$asyncDeleteSegment(Log.scala:638)
>  at kafka.log.Log.kafka$log$Log$$deleteSegment(Log.scala:629)
>  at kafka.log.Log$$anonfun$deleteOldSegments$1.apply(Log.scala:418)
>  at kafka.log.Log$$anonfun$deleteOldSegments$1.apply(Log.scala:418)
>  at 
> scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:59)
>  at scala.collection.immutable.List.foreach(List.scala:76)
>  at kafka.log.Log.deleteOldSegments(Log.scala:418)
>  at 
> kafka.log.LogManager.kafka$log$LogManager$$cleanupExpiredSegments(LogManager.scala:284)
>  at 
> kafka.log.LogManager$$anonfun$cleanupLogs$3.apply(LogManager.scala:316)
>  at 
> kafka.log.LogManager$$anonfun$cleanupLogs$3.apply(LogManager.scala:314)
>  at 
> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:743)
>  at scala.collection.Iterator$class.foreach(Iterator.scala:772)
>  at 
> scala.collection.JavaConversions$JIteratorWrapper.foreach(JavaConversions.scala:573)
>  at scala.collection.IterableLike$class.foreach(IterableLike.scala:73)
>  at 
> scala.collection.JavaConversions$JListWrapper.foreach(JavaConversions.scala:615)
>  at 
> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:742)
>  at kafka.log.LogManager.cleanupLogs(LogManager.scala:314)
>  at 
> kafka.log.LogManager$$anonfun$startup$1.apply$mcV$sp(LogManager.scala:143)
>  at kafka.utils.KafkaScheduler$$anon$1.run(KafkaScheduler.scala:100)
>  at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
>  at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>  at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>  at java.lang.Thread.run(Thread.java:724)
> I think this error happens because Kafka tries to rename the log file while it 
> is still open. So we should close the file before renaming it.
> The index file uses a special data structure, the MappedByteBuffer. Javadoc 
> describes it as:
> A mapped byte buffer and the file mapping that it represents remain valid 
> until the buffer itself is garbage-collected.
> Fortunately, I found a forceUnmap function in the Kafka code, and perhaps it can 
> be used to free the MappedByteBuffer.
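
For illustration only (this is not the actual OffsetIndex/forceUnmap code), the 
"unmap before rename" idea on a pre-Java-9 JVM could look roughly like this; the 
file name below is made up:
{code}
import java.io.File;
import java.io.RandomAccessFile;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;

public final class UnmapBeforeRename {

    // Classic pre-Java-9 idiom for releasing a mapping explicitly instead of
    // waiting for the buffer to be garbage-collected.
    static void forceUnmap(MappedByteBuffer buffer) {
        ((sun.nio.ch.DirectBuffer) buffer).cleaner().clean();
    }

    public static void main(String[] args) throws Exception {
        File index = new File("00000000000001516723.index"); // illustrative name
        RandomAccessFile raf = new RandomAccessFile(index, "rw");
        MappedByteBuffer mmap =
            raf.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, raf.length());

        // ... use the mapped index ...

        forceUnmap(mmap);  // release the mapping first,
        raf.close();       // then close the underlying file,
        boolean renamed = index.renameTo(new File(index.getPath() + ".deleted"));
        System.out.println("renamed: " + renamed);
    }
}
{code}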



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-3957) consumer timeout not being respected when kafka broker is not available

2016-07-12 Thread Vincent Fumo (JIRA)
Vincent Fumo created KAFKA-3957:
---

 Summary: consumer timeout not being respected when kafka broker is 
not available
 Key: KAFKA-3957
 URL: https://issues.apache.org/jira/browse/KAFKA-3957
 Project: Kafka
  Issue Type: Bug
  Components: consumer
Affects Versions: 0.9.0.1
Reporter: Vincent Fumo
Priority: Minor


KafkaConsumer v0.9::

I have a consumer set up with session.timeout.ms set to 30s. I make a call like

consumer.poll(1)

but if the kafka broker is down, that call will hang indefinitely.

Digging into the code it seems that the timeout isn't respected:

KafkaConsumer calls out to pollOnce() as seen below::

   private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollOnce(long timeout) {
   // TODO: Sub-requests should take into account the poll timeout 
(KAFKA-1894)
   coordinator.ensureCoordinatorKnown();

   // ensure we have partitions assigned if we expect to
   if (subscriptions.partitionsAutoAssigned())
   coordinator.ensurePartitionAssignment();

   // fetch positions if we have partitions we're subscribed to that we
   // don't know the offset for
   if (!subscriptions.hasAllFetchPositions())
   updateFetchPositions(this.subscriptions.missingFetchPositions());

   // init any new fetches (won't resend pending fetches)
   Cluster cluster = this.metadata.fetch();
   Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords();

   // if data is available already, e.g. from a previous network client 
poll() call to commit,
   // then just return it immediately
   if (!records.isEmpty()) {
   return records;
   }

   fetcher.initFetches(cluster);
   client.poll(timeout);
   return fetcher.fetchedRecords();
   }

and we see that we get stuck in the call to coordinator.ensureCoordinatorKnown();

AbstractCoordinator ::

   public void ensureCoordinatorKnown() {
   while (coordinatorUnknown()) {
   RequestFuture<Void> future = sendGroupMetadataRequest();
   client.poll(future);

   if (future.failed()) {
   if (future.isRetriable())
   client.awaitMetadataUpdate();
   else
   throw future.exception();
   }
   }
   }

In this case the future fails (since the broker is down) and then a call to 
client.awaitMetadataUpdate() is made, which in the case of the 
ConsumerNetworkClient will block forever:

   public void awaitMetadataUpdate() {
   int version = this.metadata.requestUpdate();
   do {
   poll(Long.MAX_VALUE);
   } while (this.metadata.version() == version);
   }


I feel that this is a bug. When you set a timeout on a call to a blocking 
method, that timeout should be respected and an exception should be thrown.
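
As a possible caller-side workaround until this is fixed (a sketch only, assuming 
the 0.9.0.1 consumer API), the blocking poll can be bounded from another thread 
via consumer.wakeup(), which should make poll() throw a WakeupException even while 
it is stuck waiting for the coordinator:

{code}
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;

public class BoundedPollExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");  // placeholder
        props.put("group.id", "example-group");            // placeholder
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("my-topic")); // placeholder topic

        // wakeup() is documented as safe to call from another thread,
        // so use it as a watchdog on the blocking poll.
        ScheduledExecutorService watchdog = Executors.newSingleThreadScheduledExecutor();
        watchdog.schedule(new Runnable() {
            public void run() {
                consumer.wakeup();
            }
        }, 30, TimeUnit.SECONDS);

        try {
            ConsumerRecords<String, String> records = consumer.poll(1);
            System.out.println("polled " + records.count() + " records");
        } catch (WakeupException e) {
            // poll() did not return within 30s, e.g. because the broker is unreachable
        } finally {
            watchdog.shutdownNow();
            consumer.close();
        }
    }
}
{code}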



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3252) compression type for a topic should be used during log compaction

2016-07-12 Thread Ashish K Singh (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3252?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15373182#comment-15373182
 ] 

Ashish K Singh commented on KAFKA-3252:
---

[~junrao] [~omkreddy] what should be the expected behavior in case the 
re-compressed message exceeds the max message size? Waiting on feedback 
[here|https://github.com/apache/kafka/pull/1010#discussion_r55396575].

> compression type for a topic should be used during log compaction 
> --
>
> Key: KAFKA-3252
> URL: https://issues.apache.org/jira/browse/KAFKA-3252
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.9.0.0
>Reporter: Jun Rao
>Assignee: Ashish K Singh
> Fix For: 0.10.1.0
>
>
> Currently, the broker uses the specified compression type in a topic for 
> newly published messages. However, during log compaction, it still uses the 
> compression codec in the original message. To be consistent, it seems that we 
> should use the compression type in a topic when copying the messages to new 
> log segments.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Build failed in Jenkins: kafka-trunk-jdk8 #749

2016-07-12 Thread Apache Jenkins Server
See 

Changes:

[junrao] KAFKA-2945; CreateTopic - protocol and server side implementation

--
[...truncated 1333 lines...]
kafka.server.EdgeCaseRequestTest > testInvalidApiVersionRequest PASSED

kafka.server.EdgeCaseRequestTest > testMalformedHeaderRequest STARTED

kafka.server.EdgeCaseRequestTest > testMalformedHeaderRequest PASSED

kafka.server.EdgeCaseRequestTest > testProduceRequestWithNullClientId STARTED

kafka.server.EdgeCaseRequestTest > testProduceRequestWithNullClientId PASSED

kafka.server.EdgeCaseRequestTest > testInvalidApiKeyRequest STARTED

kafka.server.EdgeCaseRequestTest > testInvalidApiKeyRequest PASSED

kafka.server.EdgeCaseRequestTest > testHeaderOnlyRequest STARTED

kafka.server.EdgeCaseRequestTest > testHeaderOnlyRequest PASSED

kafka.server.CreateTopicsRequestTest > testValidCreateTopicsRequests STARTED

kafka.server.CreateTopicsRequestTest > testValidCreateTopicsRequests PASSED

kafka.server.CreateTopicsRequestTest > testErrorCreateTopicsRequests STARTED

kafka.server.CreateTopicsRequestTest > testErrorCreateTopicsRequests PASSED

kafka.server.CreateTopicsRequestTest > testInvalidCreateTopicsRequests STARTED

kafka.server.CreateTopicsRequestTest > testInvalidCreateTopicsRequests PASSED

kafka.server.CreateTopicsRequestTest > testNotController STARTED

kafka.server.CreateTopicsRequestTest > testNotController PASSED

kafka.server.OffsetCommitTest > testUpdateOffsets STARTED

kafka.server.OffsetCommitTest > testUpdateOffsets PASSED

kafka.server.OffsetCommitTest > testLargeMetadataPayload STARTED

kafka.server.OffsetCommitTest > testLargeMetadataPayload PASSED

kafka.server.OffsetCommitTest > testCommitAndFetchOffsets STARTED

kafka.server.OffsetCommitTest > testCommitAndFetchOffsets PASSED

kafka.server.OffsetCommitTest > testOffsetExpiration STARTED

kafka.server.OffsetCommitTest > testOffsetExpiration PASSED

kafka.server.OffsetCommitTest > testNonExistingTopicOffsetCommit STARTED

kafka.server.OffsetCommitTest > testNonExistingTopicOffsetCommit PASSED

kafka.server.ReplicaManagerTest > testHighWaterMarkDirectoryMapping STARTED

kafka.server.ReplicaManagerTest > testHighWaterMarkDirectoryMapping PASSED

kafka.server.ReplicaManagerTest > 
testFetchBeyondHighWatermarkReturnEmptyResponse STARTED

kafka.server.ReplicaManagerTest > 
testFetchBeyondHighWatermarkReturnEmptyResponse PASSED

kafka.server.ReplicaManagerTest > testIllegalRequiredAcks STARTED

kafka.server.ReplicaManagerTest > testIllegalRequiredAcks PASSED

kafka.server.ReplicaManagerTest > testClearPurgatoryOnBecomingFollower STARTED

kafka.server.ReplicaManagerTest > testClearPurgatoryOnBecomingFollower PASSED

kafka.server.ReplicaManagerTest > testHighwaterMarkRelativeDirectoryMapping 
STARTED

kafka.server.ReplicaManagerTest > testHighwaterMarkRelativeDirectoryMapping 
PASSED

kafka.server.IsrExpirationTest > testIsrExpirationForSlowFollowers STARTED

kafka.server.IsrExpirationTest > testIsrExpirationForSlowFollowers PASSED

kafka.server.IsrExpirationTest > testIsrExpirationForStuckFollowers STARTED

kafka.server.IsrExpirationTest > testIsrExpirationForStuckFollowers PASSED

kafka.server.IsrExpirationTest > testIsrExpirationIfNoFetchRequestMade STARTED

kafka.server.IsrExpirationTest > testIsrExpirationIfNoFetchRequestMade PASSED

kafka.server.ControlledShutdownLeaderSelectorTest > testSelectLeader STARTED

kafka.server.ControlledShutdownLeaderSelectorTest > testSelectLeader PASSED

kafka.server.ClientQuotaManagerTest > testQuotaViolation STARTED

kafka.server.ClientQuotaManagerTest > testQuotaViolation PASSED

kafka.server.ClientQuotaManagerTest > testExpireQuotaSensors STARTED

kafka.server.ClientQuotaManagerTest > testExpireQuotaSensors PASSED

kafka.server.ClientQuotaManagerTest > testExpireThrottleTimeSensor STARTED

kafka.server.ClientQuotaManagerTest > testExpireThrottleTimeSensor PASSED

kafka.server.ClientQuotaManagerTest > testQuotaParsing STARTED

kafka.server.ClientQuotaManagerTest > testQuotaParsing PASSED

kafka.server.MetadataCacheTest > 
getTopicMetadataWithNonSupportedSecurityProtocol STARTED

kafka.server.MetadataCacheTest > 
getTopicMetadataWithNonSupportedSecurityProtocol PASSED

kafka.server.MetadataCacheTest > getTopicMetadataIsrNotAvailable STARTED

kafka.server.MetadataCacheTest > getTopicMetadataIsrNotAvailable PASSED

kafka.server.MetadataCacheTest > getTopicMetadata STARTED

kafka.server.MetadataCacheTest > getTopicMetadata PASSED

kafka.server.MetadataCacheTest > getTopicMetadataReplicaNotAvailable STARTED

kafka.server.MetadataCacheTest > getTopicMetadataReplicaNotAvailable PASSED

kafka.server.MetadataCacheTest > getTopicMetadataPartitionLeaderNotAvailable 
STARTED

kafka.server.MetadataCacheTest > getTopicMetadataPartitionLeaderNotAvailable 
PASSED

kafka.server.MetadataCacheTest > getAliveBrokersShouldNotBeMutatedByUpdateCache 
STARTED

kafka.server.MetadataCacheTest > 

Build failed in Jenkins: kafka-trunk-jdk7 #1418

2016-07-12 Thread Apache Jenkins Server
See 

Changes:

[junrao] KAFKA-2945; CreateTopic - protocol and server side implementation

--
[...truncated 3341 lines...]

kafka.message.ByteBufferMessageSetTest > testValidBytes PASSED

kafka.message.ByteBufferMessageSetTest > testValidBytesWithCompression STARTED

kafka.message.ByteBufferMessageSetTest > testValidBytesWithCompression PASSED

kafka.message.ByteBufferMessageSetTest > 
testOffsetAssignmentAfterMessageFormatConversion STARTED

kafka.message.ByteBufferMessageSetTest > 
testOffsetAssignmentAfterMessageFormatConversion PASSED

kafka.message.ByteBufferMessageSetTest > testIteratorIsConsistent STARTED

kafka.message.ByteBufferMessageSetTest > testIteratorIsConsistent PASSED

kafka.message.ByteBufferMessageSetTest > testAbsoluteOffsetAssignment STARTED

kafka.message.ByteBufferMessageSetTest > testAbsoluteOffsetAssignment PASSED

kafka.message.ByteBufferMessageSetTest > testCreateTime STARTED

kafka.message.ByteBufferMessageSetTest > testCreateTime PASSED

kafka.message.ByteBufferMessageSetTest > testInvalidCreateTime STARTED

kafka.message.ByteBufferMessageSetTest > testInvalidCreateTime PASSED

kafka.message.ByteBufferMessageSetTest > testWrittenEqualsRead STARTED

kafka.message.ByteBufferMessageSetTest > testWrittenEqualsRead PASSED

kafka.message.ByteBufferMessageSetTest > testLogAppendTime STARTED

kafka.message.ByteBufferMessageSetTest > testLogAppendTime PASSED

kafka.message.ByteBufferMessageSetTest > testWriteTo STARTED

kafka.message.ByteBufferMessageSetTest > testWriteTo PASSED

kafka.message.ByteBufferMessageSetTest > testEquals STARTED

kafka.message.ByteBufferMessageSetTest > testEquals PASSED

kafka.message.ByteBufferMessageSetTest > testSizeInBytes STARTED

kafka.message.ByteBufferMessageSetTest > testSizeInBytes PASSED

kafka.message.ByteBufferMessageSetTest > testIterator STARTED

kafka.message.ByteBufferMessageSetTest > testIterator PASSED

kafka.message.ByteBufferMessageSetTest > testRelativeOffsetAssignment STARTED

kafka.message.ByteBufferMessageSetTest > testRelativeOffsetAssignment PASSED

kafka.tools.ConsoleConsumerTest > shouldLimitReadsToMaxMessageLimit STARTED

kafka.tools.ConsoleConsumerTest > shouldLimitReadsToMaxMessageLimit PASSED

kafka.tools.ConsoleConsumerTest > shouldParseValidNewConsumerValidConfig STARTED

kafka.tools.ConsoleConsumerTest > shouldParseValidNewConsumerValidConfig PASSED

kafka.tools.ConsoleConsumerTest > shouldStopWhenOutputCheckErrorFails STARTED

kafka.tools.ConsoleConsumerTest > shouldStopWhenOutputCheckErrorFails PASSED

kafka.tools.ConsoleConsumerTest > 
shouldParseValidNewSimpleConsumerValidConfigWithStringOffset STARTED

kafka.tools.ConsoleConsumerTest > 
shouldParseValidNewSimpleConsumerValidConfigWithStringOffset PASSED

kafka.tools.ConsoleConsumerTest > shouldParseConfigsFromFile STARTED

kafka.tools.ConsoleConsumerTest > shouldParseConfigsFromFile PASSED

kafka.tools.ConsoleConsumerTest > 
shouldParseValidNewSimpleConsumerValidConfigWithNumericOffset STARTED

kafka.tools.ConsoleConsumerTest > 
shouldParseValidNewSimpleConsumerValidConfigWithNumericOffset PASSED

kafka.tools.ConsoleConsumerTest > shouldParseValidOldConsumerValidConfig STARTED

kafka.tools.ConsoleConsumerTest > shouldParseValidOldConsumerValidConfig PASSED

kafka.tools.MirrorMakerTest > testDefaultMirrorMakerMessageHandler STARTED

kafka.tools.MirrorMakerTest > testDefaultMirrorMakerMessageHandler PASSED

kafka.tools.ConsoleProducerTest > testParseKeyProp STARTED

kafka.tools.ConsoleProducerTest > testParseKeyProp PASSED

kafka.tools.ConsoleProducerTest > testValidConfigsOldProducer STARTED

kafka.tools.ConsoleProducerTest > testValidConfigsOldProducer PASSED

kafka.tools.ConsoleProducerTest > testInvalidConfigs STARTED

kafka.tools.ConsoleProducerTest > testInvalidConfigs PASSED

kafka.tools.ConsoleProducerTest > testValidConfigsNewProducer STARTED

kafka.tools.ConsoleProducerTest > testValidConfigsNewProducer PASSED

kafka.network.SocketServerTest > testClientDisconnectionUpdatesRequestMetrics 
STARTED

kafka.network.SocketServerTest > testClientDisconnectionUpdatesRequestMetrics 
PASSED

kafka.network.SocketServerTest > testMaxConnectionsPerIp STARTED

kafka.network.SocketServerTest > testMaxConnectionsPerIp PASSED

kafka.network.SocketServerTest > 
testBrokerSendAfterChannelClosedUpdatesRequestMetrics STARTED

kafka.network.SocketServerTest > 
testBrokerSendAfterChannelClosedUpdatesRequestMetrics PASSED

kafka.network.SocketServerTest > simpleRequest STARTED

kafka.network.SocketServerTest > simpleRequest PASSED

kafka.network.SocketServerTest > testSessionPrincipal STARTED

kafka.network.SocketServerTest > testSessionPrincipal PASSED

kafka.network.SocketServerTest > testMaxConnectionsPerIpOverrides STARTED

kafka.network.SocketServerTest > testMaxConnectionsPerIpOverrides PASSED

kafka.network.SocketServerTest > testSocketsCloseOnShutdown STARTED


[jira] [Commented] (KAFKA-3956) MockProducer.send() sends a message before completeNext() or errorNext() are called

2016-07-12 Thread Gavin Manning (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3956?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15373134#comment-15373134
 ] 

Gavin Manning commented on KAFKA-3956:
--

{{monospaced}}
{noformat}
@Test
public void testErrorNextAfterSend() {
final MockProducer<String, byte[]> mockProducer = new 
MockProducer<>(false, new StringSerializer(), new ByteArraySerializer());
byte[] bytes = new byte[10];
final ProducerRecord<String, byte[]> record = new 
ProducerRecord<>("topic", "key", bytes);
Assert.assertTrue(mockProducer.history().isEmpty());
mockProducer.send(record);
// History should be empty ... but isn't
Assert.assertTrue(mockProducer.history().isEmpty());
// Cause the next operation to fail
Assert.assertTrue(mockProducer.errorNext(new RuntimeException("Force an 
error")));
System.out.println("Mock producer contains " + 
mockProducer.history().size() + " message(s)");
// We're too late ... the message has already been "sent"
Assert.assertTrue(mockProducer.history().isEmpty());
}
{noformat}
{{monospaced}}

> MockProducer.send() sends a message before completeNext() or errorNext() are 
> called
> ---
>
> Key: KAFKA-3956
> URL: https://issues.apache.org/jira/browse/KAFKA-3956
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 0.10.0.0
> Environment: JUnit test
>Reporter: Gavin Manning
>
> I pass false for autoComplete to MockProducer's constructor.
> I then call MockProducer.send().
> MockProducer.history() now contains that message.
> I would expect completeNext() to cause the message to be added to history and 
> errorNext() to *not* add the message to history.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-3956) MockProducer.send() sends a message before completeNext() or errorNext() are called

2016-07-12 Thread Gavin Manning (JIRA)
Gavin Manning created KAFKA-3956:


 Summary: MockProducer.send() sends a message before completeNext() 
or errorNext() are called
 Key: KAFKA-3956
 URL: https://issues.apache.org/jira/browse/KAFKA-3956
 Project: Kafka
  Issue Type: Bug
  Components: producer 
Affects Versions: 0.10.0.0
 Environment: JUnit test
Reporter: Gavin Manning


I pass false for autoComplete to MockProducer's constructor.
I then call MockProducer.send().
MockProducer.history() now contains that message.

I would expect completeNext() to cause the message to be added to history and 
errorNext() to *not* add the message to history.
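
For what it's worth, based on the behaviour described above, completeNext()/errorNext() 
appear to control only the Future returned by send(), not history(). A sketch of 
checking the future instead (assumes the 0.10.0.0 MockProducer):

{code}
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

import org.apache.kafka.clients.producer.MockProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.Assert;
import org.junit.Test;

public class MockProducerFutureTest {

    @Test
    public void futureReflectsErrorNext() throws Exception {
        MockProducer<String, byte[]> producer =
            new MockProducer<>(false, new StringSerializer(), new ByteArraySerializer());

        Future<RecordMetadata> future =
            producer.send(new ProducerRecord<String, byte[]>("topic", "key", new byte[10]));

        // With the current behaviour, history() already contains the record here,
        // even though autoComplete is false and nothing has been completed yet.
        Assert.assertEquals(1, producer.history().size());

        producer.errorNext(new RuntimeException("force an error"));

        try {
            future.get(); // completes exceptionally with the injected error
            Assert.fail("expected the send future to fail");
        } catch (ExecutionException e) {
            Assert.assertTrue(e.getCause() instanceof RuntimeException);
        }
    }
}
{code}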




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3950) kafka mirror maker tool is not respecting whitelist option

2016-07-12 Thread Manikumar Reddy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3950?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15373115#comment-15373115
 ] 

Manikumar Reddy commented on KAFKA-3950:


I was also thinking along similar lines. Instead of removing the check, we can move 
it to the else block and have a separate check for pattern subscriptions (see the 
pseudo code below). With this, we can still throw an error for topics that were 
meant to be included in your regex and ignore other unauthorized topics.

I have a patch. I will test it and raise a PR.

{code}
if (subscriptions.hasPatternSubscription()) {

for (String topic : cluster.unauthorizedTopics()) {
if 
(subscriptions.getSubscribedPattern().matcher(topic).matches())
throw new TopicAuthorizationException(topic);
}

   .
   .
} else if (!cluster.unauthorizedTopics().isEmpty()) {
throw new TopicAuthorizationException(new 
HashSet<>(cluster.unauthorizedTopics()));
}
{code}
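
Standalone illustration of the same filtering logic (plain java.util.regex, not the 
actual Fetcher code; topic names are made up). Only unauthorized topics that match 
the subscribed pattern fail the run, the rest are ignored:

{code}
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.regex.Pattern;

public class PatternAuthorizationCheck {
    public static void main(String[] args) {
        Pattern subscribedPattern = Pattern.compile("test.*");
        Set<String> unauthorizedTopics =
            new HashSet<>(Arrays.asList("__consumer_offsets", "other-topic"));

        for (String topic : unauthorizedTopics) {
            if (subscribedPattern.matcher(topic).matches()) {
                // the real code would throw TopicAuthorizationException here
                throw new IllegalStateException("Not authorized to access topic: " + topic);
            }
            System.out.println("ignoring unauthorized topic outside the pattern: " + topic);
        }
    }
}
{code}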

> kafka mirror maker tool is not respecting whitelist option
> --
>
> Key: KAFKA-3950
> URL: https://issues.apache.org/jira/browse/KAFKA-3950
> Project: Kafka
>  Issue Type: Bug
>Reporter: Raghav Kumar Gautam
>Assignee: Manikumar Reddy
>Priority: Critical
>
> A mirror maker launched like this:
> {code}
> /usr/bin/kinit -k -t /home/kfktest/hadoopqa/keytabs/kfktest.headless.keytab 
> kfkt...@example.com
> JAVA_HOME=/usr/jdk64/jdk1.8.0_77 JMX_PORT=9112 
> /usr/kafka/bin/kafka-run-class.sh kafka.tools.MirrorMaker --consumer.config 
> /usr/kafka/system_test/mirror_maker_testsuite/testcase_15001/config/mirror_consumer_12.properties
>  --producer.config 
> /usr/kafka/system_test/mirror_maker_testsuite/testcase_15001/config/mirror_producer_12.properties
>  --new.consumer --whitelist="test.*" >>  
> /usr/kafka/system_test/mirror_maker_testsuite/testcase_15001/logs/mirror_maker-12/mirror_maker_12.log
>  2>&1 & echo pid:$! >  
> /usr/kafka/system_test/mirror_maker_testsuite/testcase_15001/logs/mirror_maker-12/entity_12_pid
> {code}
> Lead to TopicAuthorizationException:
> {code}
> WARN Error while fetching metadata with correlation id 44 : 
> {__consumer_offsets=TOPIC_AUTHORIZATION_FAILED} 
> (org.apache.kafka.clients.NetworkClient)
> [2016-06-20 13:24:49,983] FATAL [mirrormaker-thread-0] Mirror maker thread 
> failure due to  (kafka.tools.MirrorMaker$MirrorMakerThread)
> org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to 
> access topics: [__consumer_offsets]
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3952) VerifyConsumerRebalance cannot succeed when checking partition owner

2016-07-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3952?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15373067#comment-15373067
 ] 

ASF GitHub Bot commented on KAFKA-3952:
---

Github user swwl1992 closed the pull request at:

https://github.com/apache/kafka/pull/1612


> VerifyConsumerRebalance cannot succeed when checking partition owner
> 
>
> Key: KAFKA-3952
> URL: https://issues.apache.org/jira/browse/KAFKA-3952
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.8.2.0, 0.9.0.0, 0.10.0.0
> Environment: Linux, Scala 2.10
>Reporter: Simon Wan Wenli
>Priority: Minor
> Fix For: 0.10.1.0
>
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> At line 129 of VerifyConsumerRebalance.scala on the trunk branch, the condition 
> never succeeds due to a type mismatch. As a result, the consumer 
> rebalance verification always fails.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3952) VerifyConsumerRebalance cannot succeed when checking partition owner

2016-07-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3952?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15373068#comment-15373068
 ] 

ASF GitHub Bot commented on KAFKA-3952:
---

GitHub user swwl1992 reopened a pull request:

https://github.com/apache/kafka/pull/1612

KAFKA-3952: Consumer rebalance verifier never succeed due to type mismatch



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/swwl1992/kafka 
ticket-KAFKA-3952-fix-consumer-rebalance-verifier

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/1612.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1612


commit 1ae7fee68319bdc32dee8d80e78bb037d93a7e29
Author: Wan Wenli 
Date:   2016-07-12T02:54:24Z

fix logic bug in consumer rebalance verifier




> VerifyConsumerRebalance cannot succeed when checking partition owner
> 
>
> Key: KAFKA-3952
> URL: https://issues.apache.org/jira/browse/KAFKA-3952
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.8.2.0, 0.9.0.0, 0.10.0.0
> Environment: Linux, Scala 2.10
>Reporter: Simon Wan Wenli
>Priority: Minor
> Fix For: 0.10.1.0
>
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> At line 129 of VerifyConsumerRebalance.scala on the trunk branch, the condition 
> never succeeds due to a type mismatch. As a result, the consumer 
> rebalance verification always fails.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] kafka pull request #1612: KAFKA-3952: Consumer rebalance verifier never succ...

2016-07-12 Thread swwl1992
GitHub user swwl1992 reopened a pull request:

https://github.com/apache/kafka/pull/1612

KAFKA-3952: Consumer rebalance verifier never succeed due to type mismatch



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/swwl1992/kafka 
ticket-KAFKA-3952-fix-consumer-rebalance-verifier

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/1612.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1612


commit 1ae7fee68319bdc32dee8d80e78bb037d93a7e29
Author: Wan Wenli 
Date:   2016-07-12T02:54:24Z

fix logic bug in consumer rebalance verifier




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] kafka pull request #1612: KAFKA-3952: Consumer rebalance verifier never succ...

2016-07-12 Thread swwl1992
Github user swwl1992 closed the pull request at:

https://github.com/apache/kafka/pull/1612


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (KAFKA-1194) The kafka broker cannot delete the old log files after the configured time

2016-07-12 Thread Sriharsha Chintalapani (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1194?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15373066#comment-15373066
 ] 

Sriharsha Chintalapani commented on KAFKA-1194:
---

[~rperi] any update on this.

> The kafka broker cannot delete the old log files after the configured time
> --
>
> Key: KAFKA-1194
> URL: https://issues.apache.org/jira/browse/KAFKA-1194
> Project: Kafka
>  Issue Type: Bug
>  Components: log
>Affects Versions: 0.8.1
> Environment: window
>Reporter: Tao Qin
>Assignee: Jay Kreps
>  Labels: features, patch
> Fix For: 0.10.1.0
>
> Attachments: KAFKA-1194.patch, kafka-1194-v1.patch, 
> kafka-1194-v2.patch
>
>   Original Estimate: 72h
>  Remaining Estimate: 72h
>
> We tested it in a Windows environment, and set log.retention.hours to 24 
> hours.
> # The minimum age of a log file to be eligible for deletion
> log.retention.hours=24
> After several days, the kafka broker still cannot delete the old log file. 
> And we get the following exceptions:
> [2013-12-19 01:57:38,528] ERROR Uncaught exception in scheduled task 
> 'kafka-log-retention' (kafka.utils.KafkaScheduler)
> kafka.common.KafkaStorageException: Failed to change the log file suffix from 
>  to .deleted for log segment 1516723
>  at kafka.log.LogSegment.changeFileSuffixes(LogSegment.scala:249)
>  at kafka.log.Log.kafka$log$Log$$asyncDeleteSegment(Log.scala:638)
>  at kafka.log.Log.kafka$log$Log$$deleteSegment(Log.scala:629)
>  at kafka.log.Log$$anonfun$deleteOldSegments$1.apply(Log.scala:418)
>  at kafka.log.Log$$anonfun$deleteOldSegments$1.apply(Log.scala:418)
>  at 
> scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:59)
>  at scala.collection.immutable.List.foreach(List.scala:76)
>  at kafka.log.Log.deleteOldSegments(Log.scala:418)
>  at 
> kafka.log.LogManager.kafka$log$LogManager$$cleanupExpiredSegments(LogManager.scala:284)
>  at 
> kafka.log.LogManager$$anonfun$cleanupLogs$3.apply(LogManager.scala:316)
>  at 
> kafka.log.LogManager$$anonfun$cleanupLogs$3.apply(LogManager.scala:314)
>  at 
> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:743)
>  at scala.collection.Iterator$class.foreach(Iterator.scala:772)
>  at 
> scala.collection.JavaConversions$JIteratorWrapper.foreach(JavaConversions.scala:573)
>  at scala.collection.IterableLike$class.foreach(IterableLike.scala:73)
>  at 
> scala.collection.JavaConversions$JListWrapper.foreach(JavaConversions.scala:615)
>  at 
> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:742)
>  at kafka.log.LogManager.cleanupLogs(LogManager.scala:314)
>  at 
> kafka.log.LogManager$$anonfun$startup$1.apply$mcV$sp(LogManager.scala:143)
>  at kafka.utils.KafkaScheduler$$anon$1.run(KafkaScheduler.scala:100)
>  at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
>  at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>  at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>  at java.lang.Thread.run(Thread.java:724)
> I think this error happens because Kafka tries to rename the log file while it 
> is still open. So we should close the file before renaming it.
> The index file uses a special data structure, the MappedByteBuffer. Javadoc 
> describes it as:
> A mapped byte buffer and the file mapping that it represents remain valid 
> until the buffer itself is garbage-collected.
> Fortunately, I found a forceUnmap function in the Kafka code, and perhaps it can 
> be used to free the MappedByteBuffer.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-2945) CreateTopic - protocol and server side implementation

2016-07-12 Thread Jun Rao (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2945?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao updated KAFKA-2945:
---
Resolution: Fixed
Status: Resolved  (was: Patch Available)

Issue resolved by pull request 1489
[https://github.com/apache/kafka/pull/1489]

> CreateTopic - protocol and server side implementation
> -
>
> Key: KAFKA-2945
> URL: https://issues.apache.org/jira/browse/KAFKA-2945
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Grant Henke
>Assignee: Grant Henke
> Fix For: 0.10.1.0
>
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-2945) CreateTopic - protocol and server side implementation

2016-07-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2945?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15373060#comment-15373060
 ] 

ASF GitHub Bot commented on KAFKA-2945:
---

Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/1489


> CreateTopic - protocol and server side implementation
> -
>
> Key: KAFKA-2945
> URL: https://issues.apache.org/jira/browse/KAFKA-2945
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Grant Henke
>Assignee: Grant Henke
> Fix For: 0.10.1.0
>
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] kafka pull request #1489: KAFKA-2945: CreateTopic - protocol and server side...

2016-07-12 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/1489


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (KAFKA-3950) kafka mirror maker tool is not respecting whitelist option

2016-07-12 Thread Ismael Juma (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3950?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15372991#comment-15372991
 ] 

Ismael Juma commented on KAFKA-3950:


My point is that if you are missing the authorization for a topic that you 
meant to include in your regex, you won't get an error. I think that's OK, but 
something that is important to call out.

> kafka mirror maker tool is not respecting whitelist option
> --
>
> Key: KAFKA-3950
> URL: https://issues.apache.org/jira/browse/KAFKA-3950
> Project: Kafka
>  Issue Type: Bug
>Reporter: Raghav Kumar Gautam
>Assignee: Manikumar Reddy
>Priority: Critical
>
> A mirror maker launched like this:
> {code}
> /usr/bin/kinit -k -t /home/kfktest/hadoopqa/keytabs/kfktest.headless.keytab 
> kfkt...@example.com
> JAVA_HOME=/usr/jdk64/jdk1.8.0_77 JMX_PORT=9112 
> /usr/kafka/bin/kafka-run-class.sh kafka.tools.MirrorMaker --consumer.config 
> /usr/kafka/system_test/mirror_maker_testsuite/testcase_15001/config/mirror_consumer_12.properties
>  --producer.config 
> /usr/kafka/system_test/mirror_maker_testsuite/testcase_15001/config/mirror_producer_12.properties
>  --new.consumer --whitelist="test.*" >>  
> /usr/kafka/system_test/mirror_maker_testsuite/testcase_15001/logs/mirror_maker-12/mirror_maker_12.log
>  2>&1 & echo pid:$! >  
> /usr/kafka/system_test/mirror_maker_testsuite/testcase_15001/logs/mirror_maker-12/entity_12_pid
> {code}
> Lead to TopicAuthorizationException:
> {code}
> WARN Error while fetching metadata with correlation id 44 : 
> {__consumer_offsets=TOPIC_AUTHORIZATION_FAILED} 
> (org.apache.kafka.clients.NetworkClient)
> [2016-06-20 13:24:49,983] FATAL [mirrormaker-thread-0] Mirror maker thread 
> failure due to  (kafka.tools.MirrorMaker$MirrorMakerThread)
> org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to 
> access topics: [__consumer_offsets]
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3950) kafka mirror maker tool is not respecting whitelist option

2016-07-12 Thread TAO XIAO (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3950?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15372982#comment-15372982
 ] 

TAO XIAO commented on KAFKA-3950:
-

As only the topics that the user has permission to read are returned, I think 
we can consider this an authorization success. And we can throw an exception if 
none of the topics matched.

> kafka mirror maker tool is not respecting whitelist option
> --
>
> Key: KAFKA-3950
> URL: https://issues.apache.org/jira/browse/KAFKA-3950
> Project: Kafka
>  Issue Type: Bug
>Reporter: Raghav Kumar Gautam
>Assignee: Manikumar Reddy
>Priority: Critical
>
> A mirror maker launched like this:
> {code}
> /usr/bin/kinit -k -t /home/kfktest/hadoopqa/keytabs/kfktest.headless.keytab 
> kfkt...@example.com
> JAVA_HOME=/usr/jdk64/jdk1.8.0_77 JMX_PORT=9112 
> /usr/kafka/bin/kafka-run-class.sh kafka.tools.MirrorMaker --consumer.config 
> /usr/kafka/system_test/mirror_maker_testsuite/testcase_15001/config/mirror_consumer_12.properties
>  --producer.config 
> /usr/kafka/system_test/mirror_maker_testsuite/testcase_15001/config/mirror_producer_12.properties
>  --new.consumer --whitelist="test.*" >>  
> /usr/kafka/system_test/mirror_maker_testsuite/testcase_15001/logs/mirror_maker-12/mirror_maker_12.log
>  2>&1 & echo pid:$! >  
> /usr/kafka/system_test/mirror_maker_testsuite/testcase_15001/logs/mirror_maker-12/entity_12_pid
> {code}
> Lead to TopicAuthorizationException:
> {code}
> WARN Error while fetching metadata with correlation id 44 : 
> {__consumer_offsets=TOPIC_AUTHORIZATION_FAILED} 
> (org.apache.kafka.clients.NetworkClient)
> [2016-06-20 13:24:49,983] FATAL [mirrormaker-thread-0] Mirror maker thread 
> failure due to  (kafka.tools.MirrorMaker$MirrorMakerThread)
> org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to 
> access topics: [__consumer_offsets]
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-3955) Kafka log recovery doesn't truncate logs on non-monotonic offsets, leading to failed broker boot

2016-07-12 Thread Tom Crayford (JIRA)
Tom Crayford created KAFKA-3955:
---

 Summary: Kafka log recovery doesn't truncate logs on non-monotonic 
offsets, leading to failed broker boot
 Key: KAFKA-3955
 URL: https://issues.apache.org/jira/browse/KAFKA-3955
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.10.0.0, 0.9.0.1, 0.8.2.2, 0.9.0.0, 0.8.2.1, 0.8.2.0, 
0.8.1.1, 0.8.1, 0.8.0
Reporter: Tom Crayford


Hi,

I've found a bug impacting kafka brokers on startup after an unclean shutdown. 
If a log segment is corrupt and has non-monotonic offsets (see the appendix of 
this bug for a sample output from {{DumpLogSegments}}), then 
{{OffsetIndex.append}} throws an {{InvalidOffsetException}} error: 
https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/OffsetIndex.scala#L218

That code is called by {{LogSegment.recover}}: 
https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LogSegment.scala#L191

Which is called in several places in {{Log.scala}}. Notably it's called four 
times during recovery:

Thrice in Log.loadSegments
https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/Log.scala#L199
https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/Log.scala#L204
https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/Log.scala#L226

and once in Log.recoverLog

https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/Log.scala#L268

Of these, only the very last one has a {{catch}} for 
{{InvalidOffsetException}}. When that catches the issue, it truncates the whole 
log (not just this segment): 
https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/Log.scala#L274
 to the start segment of the bad log segment.

However, this code can't be hit on recovery, because of the code paths in 
{{loadSegments}} - they mean we'll never hit truncation here, as we always 
throw this exception and that goes all the way to the toplevel exception 
handler and crashes the JVM.

As {{Log.recoverLog}} is always called during recovery, I *think* a fix for 
this is to move this crash recovery/truncate code inside a new method in 
{{Log.scala}}, and call that instead of {{LogSegment.recover}} in each place. 
That code should return the number of {{truncatedBytes}} like we do in 
{{Log.recoverLog}} and then truncate the log. The callers will have to be 
notified "stop iterating over files in the directory", likely via a return 
value of {{truncatedBytes}} like {{Log.recoverLog}} does right now.

I'm happy working on a patch for this. I'm aware this recovery code is tricky 
and important to get right.

I'm also curious (and currently don't have good theories as of yet) as to how 
this log segment got into this state with non-monotonic offsets. This segment 
is using gzip compression, and is under 0.9.0.1. The same bug with respect to 
recovery exists in trunk, but I'm unsure if the new handling around compressed 
messages (KIP-31) means the bug where non-monotonic offsets get appended is 
still present in trunk.

As a production workaround, you can manually truncate that log folder yourself 
(delete all .index/.log files from the one with the bad offset onwards). However, 
Kafka should (and can) handle this case well - with replication we can truncate 
on broker startup.
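To sketch what such a wrapper could look like, here is an illustrative Scala snippet (types are simplified placeholders, not the real Log.scala / LogSegment.scala code):

{code}
// Illustrative only: Segment and truncateFullyFrom are simplified stand-ins.
class InvalidOffsetException(msg: String) extends RuntimeException(msg)

trait Segment {
  def baseOffset: Long
  def recover(maxMessageSize: Int): Int   // bytes truncated within the segment
}

// Recover one segment; on corrupt (non-monotonic) offsets, truncate the log
// from this segment's base offset instead of letting the exception crash the
// broker, and signal the caller to stop iterating over later segment files.
def recoverOrTruncate(segment: Segment,
                      maxMessageSize: Int,
                      truncateFullyFrom: Long => Unit): Int =
  try segment.recover(maxMessageSize)
  catch {
    case _: InvalidOffsetException =>
      truncateFullyFrom(segment.baseOffset)
      Int.MaxValue   // non-zero marker: caller stops processing remaining files
  }
{code}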

stacktrace and error message:

{code}
pri=WARN  t=pool-3-thread-4 at=Log Found a corrupted index file, 
/$DIRECTORY/$TOPIC-22/14306536.index, deleting and rebuilding 
index...
pri=ERROR t=main at=LogManager There was an error in one of the threads during 
logs loading: kafka.common.InvalidOffsetException: Attempt to append an offset 
(15000337) to position 111719 no larger than the last offset appended 
(15000337) to /$DIRECTORY/$TOPIC-8/14008931.index.
pri=FATAL t=main at=KafkaServer Fatal error during KafkaServer startup. Prepare 
to shutdown kafka.common.InvalidOffsetException: Attempt to append an offset 
(15000337) to position 111719 no larger than the last offset appended 
(15000337) to /$DIRECTORY/$TOPIC-8/14008931.index.
at 
kafka.log.OffsetIndex$$anonfun$append$1.apply$mcV$sp(OffsetIndex.scala:207)
at kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:197)
at kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:197)
at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262)
at kafka.log.OffsetIndex.append(OffsetIndex.scala:197)
at kafka.log.LogSegment.recover(LogSegment.scala:188)
at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:188)
at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:160)
at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:778)
at 
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at 

[jira] [Commented] (KAFKA-3933) Kafka OOM During Log Recovery Due to Leaked Native Memory

2016-07-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3933?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15372882#comment-15372882
 ] 

ASF GitHub Bot commented on KAFKA-3933:
---

GitHub user tcrayford opened a pull request:

https://github.com/apache/kafka/pull/1614

KAFKA-3933: close deepIterator during log recovery

Avoids leaking native memory and hence crashing brokers on bootup due to
running out of memory.

Introduces `kafka.common.ClosableIterator`, which is an iterator that
can be closed, and changes the signature of
`ByteBufferMessageSet.deepIterator` to return it, then changes the
callers to always close the iterator.

This is a followup from https://github.com/apache/kafka/pull/1598 with more 
native memory leaks in the broker code found and fixed.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/heroku/kafka dont_leak_native_memory

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/1614.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1614


commit 7fa608371ff8e380350029f3ab1dcffeb0e26c73
Author: Tom Crayford 
Date:   2016-07-08T11:50:21Z

KAFKA-3933: close deepIterator during log recovery

Avoids leaking native memory and hence crashing brokers on bootup due to
running out of memory.

Introduces `kafka.common.ClosableIterator`, which is an iterator that
can be closed, and changes the signature of
`ByteBufferMessageSet.deepIterator` to return it, then changes the
callers to always close the iterator.




> Kafka OOM During Log Recovery Due to Leaked Native Memory
> -
>
> Key: KAFKA-3933
> URL: https://issues.apache.org/jira/browse/KAFKA-3933
> Project: Kafka
>  Issue Type: Bug
>  Components: log
>Affects Versions: 0.8.2.2, 0.9.0.1, 0.10.0.0
> Environment: Linux, latest oracle java-8
>Reporter: Tom Crayford
>Assignee: Tom Crayford
>Priority: Critical
> Fix For: 0.10.0.1
>
>
> Hi there. We've been tracking an issue where Kafka hits an 
> java.lang.OutOfMemoryError during log recovery.
> After a bunch of tracking work, we've realized we've hit an instance of a 
> long known issue: http://www.evanjones.ca/java-native-leak-bug.html
> TLDR: Kafka breaks the rule "Always close GZIPInputStream and 
> GZIPOutputStream since they use native memory via zlib" from that article.
> As such, during broker startup, when you're recovering log segments that have 
> been compressed with gzip, Kafka leaks `GZIPInputStream` all over the place.
> Our crashes during startup have this profile - the JVM heap is empty (a few 
> hundred MB), but the offheap memory is full of allocations caused by 
> `Java_java_util_zip_Deflater_init` and `deflatInit2`.
> This leads to broker crashes during startup. The only real mitigation is 
> having *far* more memory than you need to boot (which I'd guess is why folk 
> haven't noticed this in production that much yet).
> To dig into the code more (this is based on trunk). Log recovery on unflushed 
> segments eventually calls `LogSegment.recover`: 
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LogSegment.scala#L172
> On compressed segments, that leads to a call to `deepIterator`: 
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LogSegment.scala#L189
> That leads to a call to `CompressionFactory`: 
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala#L95
>  which creates a `GZIPInputStream`: 
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/message/CompressionFactory.scala#L46
> That `GZIPInputStream` is never closed anywhere, and the low *heap* pressure 
> means that the finalizer on `GZIPInputStream` that deallocates the native 
> buffers is never called, because GC is never triggered. Instead, we just 
> exhaust the offheap memory and then Kafka dies from an OutOfMemory error.
> Kafka *does* trigger an `inputstream.close()` call, but only when *fully* 
> reading the whole input stream (see 
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala#L156).
>  When it's performing log recovery, in 
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LogSegment.scala#L189
>  it doesn't read to the end of the stream, but instead reads the first offset 
> and leaves things alone.
> This issue likely impacts `lz4` and `snappy` compressed topics in exactly the 
> same way. I think (but haven't 100% verified) that it impacts all 
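The essence of the proposed fix is simply to close the stream deterministically even when it is not read to the end. A minimal, illustrative Scala sketch (not the actual ByteBufferMessageSet code):

{code}
import java.io.{ByteArrayInputStream, InputStream}
import java.util.zip.GZIPInputStream

// Read only the first few bytes of a gzip stream, but still release the
// native zlib buffers via close() instead of waiting for finalization.
def readFirstBytes(compressed: Array[Byte], n: Int): Array[Byte] = {
  val in: InputStream = new GZIPInputStream(new ByteArrayInputStream(compressed))
  try {
    val buf = new Array[Byte](n)
    val read = in.read(buf)
    buf.take(math.max(read, 0))
  } finally {
    in.close()   // without this the native memory lives until GC/finalization
  }
}
{code}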

[GitHub] kafka pull request #1614: KAFKA-3933: close deepIterator during log recovery

2016-07-12 Thread tcrayford
GitHub user tcrayford opened a pull request:

https://github.com/apache/kafka/pull/1614

KAFKA-3933: close deepIterator during log recovery

Avoids leaking native memory and hence crashing brokers on bootup due to
running out of memory.

Introduces `kafka.common.ClosableIterator`, which is an iterator that
can be closed, and changes the signature of
`ByteBufferMessageSet.deepIterator` to return it, then changes the
callers to always close the iterator.

This is a followup from https://github.com/apache/kafka/pull/1598 with more 
native memory leaks in the broker code found and fixed.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/heroku/kafka dont_leak_native_memory

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/1614.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1614


commit 7fa608371ff8e380350029f3ab1dcffeb0e26c73
Author: Tom Crayford 
Date:   2016-07-08T11:50:21Z

KAFKA-3933: close deepIterator during log recovery

Avoids leaking native memory and hence crashing brokers on bootup due to
running out of memory.

Introduces `kafka.common.ClosableIterator`, which is an iterator that
can be closed, and changes the signature of
`ByteBufferMessageSet.deepIterator` to return it, then changes the
callers to always close the iterator.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (KAFKA-1464) Add a throttling option to the Kafka replication tool

2016-07-12 Thread Ralph Weires (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1464?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15372870#comment-15372870
 ] 

Ralph Weires commented on KAFKA-1464:
-

Another related idea, since the consumer rebalancing issues that we hit during 
maintenance drove me up the wall yesterday... I'm just desperately looking for a 
way to get this stabilized (on our v0.8.2.1) ;)

Wouldn't a (manual and temporary) modification of the partition assignment also 
be a viable option, to prevent a given node from becoming leader for any 
partitions?

I mean, could I issue kafka-reassign-partitions.sh with a customized partition 
assignment, that wouldn't actually re-assign any partitions to different 
brokers, but would merely change the replica *order* for several of the 
partitions - such that the node in question is no longer the first replica for any 
partition? If I understand it right, the controller will always prefer the 
first replica as leader in balancing, so I'd just need to make sure that my 
node won't be the first replica for anything. All this temporarily of course, 
so after the maintenance I'd restore the original partition assignment back 
again.

Should this work, or would you expect specific problems with this workaround...?
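For illustration, the kind of reassignment JSON I have in mind would keep each partition's replica set unchanged and only move the node under maintenance (say broker 2) out of the first position; the topic, partitions and broker ids below are made up:

{code}
{"version": 1,
 "partitions": [
   {"topic": "my-topic", "partition": 0, "replicas": [1, 3, 2]},
   {"topic": "my-topic", "partition": 1, "replicas": [3, 1, 2]}
 ]}
{code}

After the maintenance I'd submit the original assignment again and trigger a preferred replica election so leadership moves back.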

Also: let me know if this rather belongs on the mailing list, since admittedly 
it isn't really related to throttling... As a side remark in this regard, I also 
tried throttling outside Kafka (i.e. on the network side, via wondershaper) in 
our problem case, but that didn't help. I'd agree this would need to be within 
Kafka, i.e. to be able to separate out-of-sync replica recovery traffic from 
the rest.

> Add a throttling option to the Kafka replication tool
> -
>
> Key: KAFKA-1464
> URL: https://issues.apache.org/jira/browse/KAFKA-1464
> Project: Kafka
>  Issue Type: New Feature
>  Components: replication
>Affects Versions: 0.8.0
>Reporter: mjuarez
>Assignee: Ben Stopford
>Priority: Minor
>  Labels: replication, replication-tools
> Fix For: 0.10.1.0
>
>
> When performing replication on new nodes of a Kafka cluster, the replication 
> process will use all available resources to replicate as fast as possible.  
> This causes performance issues (mostly disk IO and sometimes network 
> bandwidth) when doing this in a production environment, in which you're 
> trying to serve downstream applications, at the same time you're performing 
> maintenance on the Kafka cluster.
> An option to throttle the replication to a specific rate (in either MB/s or 
> activities/second) would help production systems to better handle maintenance 
> tasks while still serving downstream applications.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1194) The kafka broker cannot delete the old log files after the configured time

2016-07-12 Thread Trevor Dodds (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1194?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15372831#comment-15372831
 ] 

Trevor Dodds commented on KAFKA-1194:
-

I'm running 0.10.0.0 on Windows and seeing the same error; this makes cleanup 
of old logs a big problem on busy systems.
Is there a patch available?

[2016-07-12 03:32:28,368] INFO Scheduling log segment 457963 for log 
beats-dev-3 for deletion. (kafka.log.Log)
[2016-07-12 03:32:28,368] ERROR Uncaught exception in scheduled task 
'kafka-log-retention' (kafka.utils.KafkaScheduler) 
kafka.common.KafkaStorageException: Failed to change the log file suffix from  
to .deleted for log segment 457963
at kafka.log.LogSegment.kafkaStorageException$1(LogSegment.scala:263)
at kafka.log.LogSegment.changeFileSuffixes(LogSegment.scala:265)
at kafka.log.Log.kafka$log$Log$$asyncDeleteSegment(Log.scala:832)
at kafka.log.Log.kafka$log$Log$$deleteSegment(Log.scala:823)
at kafka.log.Log$$anonfun$deleteOldSegments$1.apply(Log.scala:579)
at kafka.log.Log$$anonfun$deleteOldSegments$1.apply(Log.scala:579)
at scala.collection.immutable.List.foreach(List.scala:381)
at kafka.log.Log.deleteOldSegments(Log.scala:579)
at 
kafka.log.LogManager.kafka$log$LogManager$$cleanupExpiredSegments(LogManager.scala:427)
at 
kafka.log.LogManager$$anonfun$cleanupLogs$3.apply(LogManager.scala:458)
at 
kafka.log.LogManager$$anonfun$cleanupLogs$3.apply(LogManager.scala:456)
at 
scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at 
scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
at kafka.log.LogManager.cleanupLogs(LogManager.scala:456)
at 
kafka.log.LogManager$$anonfun$startup$1.apply$mcV$sp(LogManager.scala:192)
at 
kafka.utils.KafkaScheduler$$anonfun$1.apply$mcV$sp(KafkaScheduler.scala:110)
at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:56)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.nio.file.FileSystemException: 
F:\ELK\kafka-logs\beats-dev-3\00457963.log -> 
F:\ELK\kafka-logs\beats-dev-3\00457963.log.deleted: The process 
cannot access the file because it is being used by another process.
at 
sun.nio.fs.WindowsException.translateToIOException(WindowsException.java:86)
at 
sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:97)
at sun.nio.fs.WindowsFileCopy.move(WindowsFileCopy.java:387)
at 
sun.nio.fs.WindowsFileSystemProvider.move(WindowsFileSystemProvider.java:287)
at java.nio.file.Files.move(Files.java:1395)
at 
org.apache.kafka.common.utils.Utils.atomicMoveWithFallback(Utils.java:670)
at kafka.log.FileMessageSet.renameTo(FileMessageSet.scala:364)
... 27 more
Suppressed: java.nio.file.FileSystemException: 
F:\ELK\kafka-logs\beats-dev-3\00457963.log -> 
F:\ELK\kafka-logs\beats-dev-3\00457963.log.deleted: The process 
cannot access the file because it is being used by another process.
at 
sun.nio.fs.WindowsException.translateToIOException(WindowsException.java:86)
at 
sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:97)
at sun.nio.fs.WindowsFileCopy.move(WindowsFileCopy.java:301)
at 
sun.nio.fs.WindowsFileSystemProvider.move(WindowsFileSystemProvider.java:287)
at java.nio.file.Files.move(Files.java:1395)
at 
org.apache.kafka.common.utils.Utils.atomicMoveWithFallback(Utils.java:667)
... 28 more


> The kafka broker cannot delete the old log files after the configured time
> --
>
> Key: KAFKA-1194
> URL: https://issues.apache.org/jira/browse/KAFKA-1194
> Project: Kafka
>  Issue Type: Bug
>  Components: log
>

[jira] [Comment Edited] (KAFKA-3950) kafka mirror maker tool is not respecting whitelist option

2016-07-12 Thread Ismael Juma (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3950?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15372740#comment-15372740
 ] 

Ismael Juma edited comment on KAFKA-3950 at 7/12/16 11:39 AM:
--

[~xiaotao183], that's a good point, we can perhaps change the semantics so that 
we apply the regex over the authorized topics only. The downside is that 
authorization errors will be harder to detect, but it seems to make sense.


was (Author: ijuma):
[~xiaotao183], that's a good point, we can perhaps change the semantics so that 
we apply the regex over the authorized topics only.

> kafka mirror maker tool is not respecting whitelist option
> --
>
> Key: KAFKA-3950
> URL: https://issues.apache.org/jira/browse/KAFKA-3950
> Project: Kafka
>  Issue Type: Bug
>Reporter: Raghav Kumar Gautam
>Assignee: Manikumar Reddy
>Priority: Critical
>
> A mirror maker launched like this:
> {code}
> /usr/bin/kinit -k -t /home/kfktest/hadoopqa/keytabs/kfktest.headless.keytab 
> kfkt...@example.com
> JAVA_HOME=/usr/jdk64/jdk1.8.0_77 JMX_PORT=9112 
> /usr/kafka/bin/kafka-run-class.sh kafka.tools.MirrorMaker --consumer.config 
> /usr/kafka/system_test/mirror_maker_testsuite/testcase_15001/config/mirror_consumer_12.properties
>  --producer.config 
> /usr/kafka/system_test/mirror_maker_testsuite/testcase_15001/config/mirror_producer_12.properties
>  --new.consumer --whitelist="test.*" >>  
> /usr/kafka/system_test/mirror_maker_testsuite/testcase_15001/logs/mirror_maker-12/mirror_maker_12.log
>  2>&1 & echo pid:$! >  
> /usr/kafka/system_test/mirror_maker_testsuite/testcase_15001/logs/mirror_maker-12/entity_12_pid
> {code}
> Lead to TopicAuthorizationException:
> {code}
> WARN Error while fetching metadata with correlation id 44 : 
> {__consumer_offsets=TOPIC_AUTHORIZATION_FAILED} 
> (org.apache.kafka.clients.NetworkClient)
> [2016-06-20 13:24:49,983] FATAL [mirrormaker-thread-0] Mirror maker thread 
> failure due to  (kafka.tools.MirrorMaker$MirrorMakerThread)
> org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to 
> access topics: [__consumer_offsets]
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3950) kafka mirror maker tool is not respecting whitelist option

2016-07-12 Thread Ismael Juma (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3950?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15372740#comment-15372740
 ] 

Ismael Juma commented on KAFKA-3950:


[~xiaotao183], that's a good point, we can perhaps change the semantics so that 
we apply the regex over the authorized topics only.

> kafka mirror maker tool is not respecting whitelist option
> --
>
> Key: KAFKA-3950
> URL: https://issues.apache.org/jira/browse/KAFKA-3950
> Project: Kafka
>  Issue Type: Bug
>Reporter: Raghav Kumar Gautam
>Assignee: Manikumar Reddy
>Priority: Critical
>
> A mirror maker launched like this:
> {code}
> /usr/bin/kinit -k -t /home/kfktest/hadoopqa/keytabs/kfktest.headless.keytab 
> kfkt...@example.com
> JAVA_HOME=/usr/jdk64/jdk1.8.0_77 JMX_PORT=9112 
> /usr/kafka/bin/kafka-run-class.sh kafka.tools.MirrorMaker --consumer.config 
> /usr/kafka/system_test/mirror_maker_testsuite/testcase_15001/config/mirror_consumer_12.properties
>  --producer.config 
> /usr/kafka/system_test/mirror_maker_testsuite/testcase_15001/config/mirror_producer_12.properties
>  --new.consumer --whitelist="test.*" >>  
> /usr/kafka/system_test/mirror_maker_testsuite/testcase_15001/logs/mirror_maker-12/mirror_maker_12.log
>  2>&1 & echo pid:$! >  
> /usr/kafka/system_test/mirror_maker_testsuite/testcase_15001/logs/mirror_maker-12/entity_12_pid
> {code}
> Lead to TopicAuthorizationException:
> {code}
> WARN Error while fetching metadata with correlation id 44 : 
> {__consumer_offsets=TOPIC_AUTHORIZATION_FAILED} 
> (org.apache.kafka.clients.NetworkClient)
> [2016-06-20 13:24:49,983] FATAL [mirrormaker-thread-0] Mirror maker thread 
> failure due to  (kafka.tools.MirrorMaker$MirrorMakerThread)
> org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to 
> access topics: [__consumer_offsets]
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [VOTE] KIP-67: Queryable state for Kafka Streams

2016-07-12 Thread Michael Noll
> 1. Re StreamsConfig.USER_ENDPOINT_CONFIG:
>
> I agree with Neha that Kafka Streams can provide the bare minimum APIs
just
> for host/port, and user's implemented layer can provide URL / proxy
address
> they want to build on top of it.

In this case I second Neha's suggestion to give that parameter a better
name. "application.server" would indeed be consistent with
"bootstrap.servers", which accepts host:port pair(s).



On Mon, Jul 11, 2016 at 10:47 PM, Guozhang Wang  wrote:

> 1. Re StreamsConfig.USER_ENDPOINT_CONFIG:
>
> I agree with Neha that Kafka Streams can provide the bare minimum APIs just
> for host/port, and user's implemented layer can provide URL / proxy address
> they want to build on top of it.
>
>
> 2. Re Improving KafkaStreamsInstance interface:
>
> Users are indeed aware of the "TaskId" class, which is not part of the internal
> packages and is exposed in the PartitionGrouper interface that users can
> instantiate; tasks are assigned input topic partitions.
> So we can probably change the APIs as:
>
> Map KafkaStreams.getAllTasks() where
> TaskMetadata has fields such as taskId, list of assigned partitions, list
> of state store names; and HostState can include hostname / port. The port
> is the listening port of a user-defined listener that users provide to
> listen for queries (e.g., using REST APIs).
>
> Map KafkaStreams.getTasksWithStore(String /*
> storeName */) would return only the hosts and their assigned tasks if at
> least one of the tasks include the given store name.
>
> Map KafkaStreams.getTaskWithStoreAndKey(Key
> k, String /* storeName */, StreamPartitioner partitioner) would return only
> the host and their assigned task if the store with the store name has a
> particular key, according to the partitioner behavior.
>
>
>
> Guozhang
>
>
> On Sun, Jul 10, 2016 at 11:21 AM, Neha Narkhede  wrote:
>
> > Few thoughts that became apparent after observing example code of what an
> > application architecture and code might look like with these changes.
> > Apologize for the late realization hence.
> >
> > 1. "user.endpoint" will be very differently defined for respective
> > applications. I don't think Kafka Streams should generalize to accept any
> > connection URL as we expect to only expose metadata expressed as HostInfo
> > (which is defined by host & port) and hence need to interpret the
> > "user.endpoint" as host & port. Applications will have their own endpoint
> > configs that will take many forms and they will be responsible for
> parsing
> > out host and port and configuring Kafka Streams accordingly.
> >
> > If we are in fact limiting to host and port, I wonder if we should change
> > the name of "user.endpoint" into something more specific. We have clients
> > expose host/port pairs as "bootstrap.servers". Should this be
> > "application.server"?
> >
> > 2. I don't think we should expose another abstraction called
> > KafkaStreamsInstance to the user. This is related to the discussion of
> the
> > right abstraction that we want to expose to an application. The
> abstraction
> > discussion itself should probably be part of the KIP itself, let me give
> a
> > quick summary of my thoughts here:
> > 1. The person implementing an application using Queryable State has
> likely
> > already made some choices for the service layer–a REST framework, Thrift,
> > or whatever. We don't really want to add another RPC framework to this
> mix,
> > nor do we want to try to make Kafka's RPC mechanism general purpose.
> > 2. Likewise, it should be clear that the API you want to expose to the
> > front-end/client service is not necessarily the API you'd need internally
> > as there may be additional filtering/processing in the router.
> >
> > Given these constraints, what we prefer to add is a fairly low-level
> > "toolbox" that would let you do anything you want, but requires to route
> > and perform any aggregation or processing yourself. This pattern is
> > not recommended for all kinds of services/apps, but there are definitely
> a
> > category of things where it is a big win and other advanced applications
> > are out-of-scope.
> >
> > The APIs we expose should take the following things into consideration:
> > 1. Make it clear to the user that they will do the routing, aggregation,
> > processing themselves. So the bare minimum that we want to expose is
> store
> > and partition metadata per application server identified by the host and
> > port.
> > 2. Ensure that the API exposes abstractions that are known to the user or
> > are intuitive to the user.
> > 3. Avoid exposing internal objects or implementation details to the user.
> >
> > So tying all this into answering the question of what we should expose
> > through the APIs -
> >
> > In Kafka Streams, the user is aware of the concept of tasks and
> partitions
> > since the application scales with the number of partitions and tasks are
> > 
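To make the shape of the proposal concrete, here is a rough Scala sketch; the generic types were lost in the archived mail, so the exact signatures and field names below are guesses, not the agreed API:

// Rough sketch only; all names and type parameters are assumptions.
case class HostState(host: String, port: Int)

case class TaskMetadata(taskId: String,
                        assignedPartitions: Set[String],  // e.g. "topic-3"
                        stateStoreNames: Set[String])

trait QueryableStateMetadata {
  def getAllTasks(): Map[TaskMetadata, HostState]
  def getTasksWithStore(storeName: String): Map[HostState, Set[TaskMetadata]]
  def getTaskWithStoreAndKey[K](key: K,
                                storeName: String,
                                partitioner: (K, Int) => Int): (HostState, TaskMetadata)
}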

[jira] [Commented] (KAFKA-3950) kafka mirror maker tool is not respecting whitelist option

2016-07-12 Thread TAO XIAO (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3950?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15372705#comment-15372705
 ] 

TAO XIAO commented on KAFKA-3950:
-

How about we still keep the filtering on client side and fix the broken piece?

Here is the trouble maker 
[ConsumerCoordinator.java|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L150]
 that fails the validation. Can we remove the check below?

{code}
if (!cluster.unauthorizedTopics().isEmpty())
throw new TopicAuthorizationException(new 
HashSet<>(cluster.unauthorizedTopics()));
{code}

As the authorization check has already been done on the server side when fetching 
metadata, all topics stored in {code}cluster.topics(){code} should be the ones the 
consumer has permission to read. We can simply return the ones that match the 
pattern to the end user.
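For illustration, the client-side filtering being suggested amounts to something like the following sketch (not the actual ConsumerCoordinator code):

{code}
import java.util.regex.Pattern
import scala.collection.JavaConverters._

// Sketch: apply the subscription pattern only to the topics the broker says
// the consumer is authorized to see, instead of failing on unauthorized ones.
def matchingAuthorizedTopics(authorizedTopics: java.util.Set[String],
                             pattern: Pattern): Set[String] =
  authorizedTopics.asScala.toSet.filter(t => pattern.matcher(t).matches())
{code}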

> kafka mirror maker tool is not respecting whitelist option
> --
>
> Key: KAFKA-3950
> URL: https://issues.apache.org/jira/browse/KAFKA-3950
> Project: Kafka
>  Issue Type: Bug
>Reporter: Raghav Kumar Gautam
>Assignee: Manikumar Reddy
>Priority: Critical
>
> A mirror maker launched like this:
> {code}
> /usr/bin/kinit -k -t /home/kfktest/hadoopqa/keytabs/kfktest.headless.keytab 
> kfkt...@example.com
> JAVA_HOME=/usr/jdk64/jdk1.8.0_77 JMX_PORT=9112 
> /usr/kafka/bin/kafka-run-class.sh kafka.tools.MirrorMaker --consumer.config 
> /usr/kafka/system_test/mirror_maker_testsuite/testcase_15001/config/mirror_consumer_12.properties
>  --producer.config 
> /usr/kafka/system_test/mirror_maker_testsuite/testcase_15001/config/mirror_producer_12.properties
>  --new.consumer --whitelist="test.*" >>  
> /usr/kafka/system_test/mirror_maker_testsuite/testcase_15001/logs/mirror_maker-12/mirror_maker_12.log
>  2>&1 & echo pid:$! >  
> /usr/kafka/system_test/mirror_maker_testsuite/testcase_15001/logs/mirror_maker-12/entity_12_pid
> {code}
> Lead to TopicAuthorizationException:
> {code}
> WARN Error while fetching metadata with correlation id 44 : 
> {__consumer_offsets=TOPIC_AUTHORIZATION_FAILED} 
> (org.apache.kafka.clients.NetworkClient)
> [2016-06-20 13:24:49,983] FATAL [mirrormaker-thread-0] Mirror maker thread 
> failure due to  (kafka.tools.MirrorMaker$MirrorMakerThread)
> org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to 
> access topics: [__consumer_offsets]
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Comment Edited] (KAFKA-3950) kafka mirror maker tool is not respecting whitelist option

2016-07-12 Thread TAO XIAO (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3950?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15372705#comment-15372705
 ] 

TAO XIAO edited comment on KAFKA-3950 at 7/12/16 11:18 AM:
---

How about we still keep the filtering on client side and fix the broken piece?

Here is the trouble maker 
[ConsumerCoordinator.java|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L150]
 that fails the validation. Can we remove the check below?

{code}
if (!cluster.unauthorizedTopics().isEmpty())
throw new TopicAuthorizationException(new 
HashSet<>(cluster.unauthorizedTopics()));
{code}

As the authorization check has already been done on the server side when fetching 
metadata, all topics stored in {code}cluster.topics(){code} should be the ones 
the consumer has permission to read. We can simply return the ones that match the 
pattern to the end user.


was (Author: xiaotao183):
How about we still keep the filtering on client side and fix the broken piece?

Here is the trouble maker 
[ConsumerCoordinator.java|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L150]
 that fails the validation. Can we remove below check? 

{code}
if (!cluster.unauthorizedTopics().isEmpty())
throw new TopicAuthorizationException(new 
HashSet<>(cluster.unauthorizedTopics()));
{code}

As the authorization check has been done on server-side already when fetching 
metadata all topics stored in {code}cluster.topics(){code} should the ones the 
consumer has permission to read. We can simply return them that matches the 
pattern to end user

> kafka mirror maker tool is not respecting whitelist option
> --
>
> Key: KAFKA-3950
> URL: https://issues.apache.org/jira/browse/KAFKA-3950
> Project: Kafka
>  Issue Type: Bug
>Reporter: Raghav Kumar Gautam
>Assignee: Manikumar Reddy
>Priority: Critical
>
> A mirror maker launched like this:
> {code}
> /usr/bin/kinit -k -t /home/kfktest/hadoopqa/keytabs/kfktest.headless.keytab 
> kfkt...@example.com
> JAVA_HOME=/usr/jdk64/jdk1.8.0_77 JMX_PORT=9112 
> /usr/kafka/bin/kafka-run-class.sh kafka.tools.MirrorMaker --consumer.config 
> /usr/kafka/system_test/mirror_maker_testsuite/testcase_15001/config/mirror_consumer_12.properties
>  --producer.config 
> /usr/kafka/system_test/mirror_maker_testsuite/testcase_15001/config/mirror_producer_12.properties
>  --new.consumer --whitelist="test.*" >>  
> /usr/kafka/system_test/mirror_maker_testsuite/testcase_15001/logs/mirror_maker-12/mirror_maker_12.log
>  2>&1 & echo pid:$! >  
> /usr/kafka/system_test/mirror_maker_testsuite/testcase_15001/logs/mirror_maker-12/entity_12_pid
> {code}
> Lead to TopicAuthorizationException:
> {code}
> WARN Error while fetching metadata with correlation id 44 : 
> {__consumer_offsets=TOPIC_AUTHORIZATION_FAILED} 
> (org.apache.kafka.clients.NetworkClient)
> [2016-06-20 13:24:49,983] FATAL [mirrormaker-thread-0] Mirror maker thread 
> failure due to  (kafka.tools.MirrorMaker$MirrorMakerThread)
> org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to 
> access topics: [__consumer_offsets]
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [DISCUSS] KIP-62: Allow consumer to send heartbeats from a background thread

2016-07-12 Thread Simon Souter
 Hi,

An issue I have regarding rebalancing is that a call to poll() triggers
the JoinGroupRequest when a rebalance is in progress.  In cases where a
consumer is streaming more than a single batch at a time, there is no
opportunity to flush any consumed batches through before the rebalance
completes.  If onPartitionsRevoked were called via a background thread, or
via an alive() call, a client would have an opportunity to hold off on
calling poll() until downstream messages are flushed, and only then call
poll() again to trigger the join and onPartitionsAssigned.

The current assumption appears to be that a call to poll() indicates that
there are no more in-flight messages.  Attempting to decouple consumer and
processor threads or the streaming of multiple batches results in
unavoidable redeliveries during a rebalance.
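For context, the pattern I would like to be able to rely on is roughly the following Scala sketch against the Java consumer API (flush() stands for whatever downstream completion tracking the application does); today this listener only runs inside poll(), which is exactly the limitation described above:

import java.util.{Collection => JCollection}
import org.apache.kafka.clients.consumer.{ConsumerRebalanceListener, KafkaConsumer}
import org.apache.kafka.common.TopicPartition

// Sketch: drain in-flight batches before the rebalance completes, so that the
// offsets committed in onPartitionsRevoked cover everything already processed.
class FlushingRebalanceListener(consumer: KafkaConsumer[String, String],
                                flush: () => Unit)
    extends ConsumerRebalanceListener {

  override def onPartitionsRevoked(partitions: JCollection[TopicPartition]): Unit = {
    flush()                 // wait for downstream processing to drain
    consumer.commitSync()   // then commit what was actually processed
  }

  override def onPartitionsAssigned(partitions: JCollection[TopicPartition]): Unit = ()
}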

Regards

Simon Souter

https://github.com/cakesolutions/scala-kafka-client




[jira] [Commented] (KAFKA-3919) Broker faills to start after ungraceful shutdown due to non-monotonically incrementing offsets in logs

2016-07-12 Thread Andy Coates (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3919?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15372565#comment-15372565
 ] 

Andy Coates commented on KAFKA-3919:


[~junrao]  Good stuff. Look forward to hearing from you and getting involved 
more =)

> Broker faills to start after ungraceful shutdown due to non-monotonically 
> incrementing offsets in logs
> --
>
> Key: KAFKA-3919
> URL: https://issues.apache.org/jira/browse/KAFKA-3919
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.9.0.1
>Reporter: Andy Coates
>
> Hi All,
> I encountered an issue with Kafka following a power outage that saw a 
> proportion of our cluster disappear. When the power came back on several 
> brokers halted on start up with the error:
> {noformat}
>   Fatal error during KafkaServerStartable startup. Prepare to shutdown”
>   kafka.common.InvalidOffsetException: Attempt to append an offset 
> (1239742691) to position 35728 no larger than the last offset appended 
> (1239742822) to 
> /data3/kafka/mt_xp_its_music_main_itsevent-20/001239444214.index.
>   at 
> kafka.log.OffsetIndex$$anonfun$append$1.apply$mcV$sp(OffsetIndex.scala:207)
>   at kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:197)
>   at kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:197)
>   at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262)
>   at kafka.log.OffsetIndex.append(OffsetIndex.scala:197)
>   at kafka.log.LogSegment.recover(LogSegment.scala:188)
>   at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:188)
>   at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:160)
>   at 
> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
>   at 
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>   at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
>   at 
> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
>   at kafka.log.Log.loadSegments(Log.scala:160)
>   at kafka.log.Log.(Log.scala:90)
>   at 
> kafka.log.LogManager$$anonfun$loadLogs$2$$anonfun$3$$anonfun$apply$10$$anonfun$apply$1.apply$mcV$sp(LogManager.scala:150)
>   at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:60)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> {noformat}
> The only way to recover the brokers was to delete the log files that 
> contained non monotonically incrementing offsets.
> We've spent some time digging through the logs and I feel I may have worked 
> out the sequence of events leading to this issue, (though this is based on 
> some assumptions I've made about the way Kafka is working, which may be 
> wrong).
> First off, we have unclean leadership elections disabled. (We did later enable 
> them to help get around some other issues we were having, but this was 
> several hours after this issue manifested), and we're producing to the topic 
> with gzip compression and acks=1
> We looked through the data logs that were causing the brokers to not start. 
> What we found is that the initial part of the log has monotonically increasing 
> offsets, where each compressed batch normally contained one or two records. 
> Then there is a batch that contains many records, whose first records have an 
> offset below the previous batch and whose last record has an offset above the 
> previous batch. Following on from this there continues a period of large 
> batches, with monotonically increasing offsets, and then the log returns to 
> batches with one or two records.
> Our working assumption here is that the period before the offset dip, with 
> the small batches, is pre-outage normal operation. The period of larger 
> batches is from just after the outage, where producers have a back log to 
> processes when the partition becomes available, and then things return to 
> normal batch sizes again once the back log clears.
> We also looked through Kafka's application logs to try and piece 
> together the series of events leading up to this. Here’s what we know 
> happened, with regards to one partition that has issues, from the logs:
> Prior to outage:
> * Replicas for the partition are brokers 2011, 2012,  2024, with 2024 being 
> the preferred leader.
> * Producers using acks=1, compression=gzip
> * Brokers configured with 

[jira] [Commented] (KAFKA-3954) Consumer should use internal topics information returned by the broker

2016-07-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3954?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15372557#comment-15372557
 ] 

ASF GitHub Bot commented on KAFKA-3954:
---

GitHub user ijuma opened a pull request:

https://github.com/apache/kafka/pull/1613

KAFKA-3954; Consumer should use internal topics information returned by the 
broker

It previously hardcoded it.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ijuma/kafka 
kafka-3954-consumer-internal-topics-from-broker

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/1613.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1613


commit f37a8d26eb78375a28583795adfafcd42f2e5832
Author: Ismael Juma 
Date:   2016-07-12T08:46:31Z

KAFKA-3954; Consumer should use internal topics information returned by the 
broker

It previously hardcoded it.




> Consumer should use internal topics information returned by the broker
> --
>
> Key: KAFKA-3954
> URL: https://issues.apache.org/jira/browse/KAFKA-3954
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Ismael Juma
>Assignee: Ismael Juma
> Fix For: 0.10.1.0
>
>
> It currently hardcodes it.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] kafka pull request #1613: KAFKA-3954; Consumer should use internal topics in...

2016-07-12 Thread ijuma
GitHub user ijuma opened a pull request:

https://github.com/apache/kafka/pull/1613

KAFKA-3954; Consumer should use internal topics information returned by the 
broker

It previously hardcoded it.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ijuma/kafka 
kafka-3954-consumer-internal-topics-from-broker

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/1613.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1613


commit f37a8d26eb78375a28583795adfafcd42f2e5832
Author: Ismael Juma 
Date:   2016-07-12T08:46:31Z

KAFKA-3954; Consumer should use internal topics information returned by the 
broker

It previously hardcoded it.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Created] (KAFKA-3954) Consumer should use internal topics information returned by the broker

2016-07-12 Thread Ismael Juma (JIRA)
Ismael Juma created KAFKA-3954:
--

 Summary: Consumer should use internal topics information returned 
by the broker
 Key: KAFKA-3954
 URL: https://issues.apache.org/jira/browse/KAFKA-3954
 Project: Kafka
  Issue Type: Improvement
Reporter: Ismael Juma
Assignee: Ismael Juma
 Fix For: 0.10.1.0


It currently hardcodes it.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-3953) start kafka fail

2016-07-12 Thread ffh (JIRA)
ffh created KAFKA-3953:
--

 Summary: start kafka fail
 Key: KAFKA-3953
 URL: https://issues.apache.org/jira/browse/KAFKA-3953
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8.2.2
 Environment: Linux host-172-28-0-3 3.10.0-327.18.2.el7.x86_64 #1 SMP 
Thu May 12 11:03:55 UTC 2016 x86_64 x86_64 x86_64 GNU/Linux
Reporter: ffh


Kafka fails to start. Error message:
[2016-07-12 03:57:32,717] FATAL [Kafka Server 0], Fatal error during 
KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)
java.util.NoSuchElementException: None.get
at scala.None$.get(Option.scala:313)
at scala.None$.get(Option.scala:311)
at kafka.controller.KafkaController.clientId(KafkaController.scala:215)
at 
kafka.controller.ControllerBrokerRequestBatch.(ControllerChannelManager.scala:189)
at 
kafka.controller.PartitionStateMachine.(PartitionStateMachine.scala:48)
at kafka.controller.KafkaController.(KafkaController.scala:156)
at kafka.server.KafkaServer.startup(KafkaServer.scala:148)
at 
kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:29)
at kafka.Kafka$.main(Kafka.scala:72)
at kafka.Kafka.main(Kafka.scala)
[2016-07-12 03:57:33,124] FATAL Fatal error during KafkaServerStartable 
startup. Prepare to shutdown (kafka.server.KafkaServerStartable)
java.util.NoSuchElementException: None.get
at scala.None$.get(Option.scala:313)
at scala.None$.get(Option.scala:311)
at kafka.controller.KafkaController.clientId(KafkaController.scala:215)
at 
kafka.controller.ControllerBrokerRequestBatch.(ControllerChannelManager.scala:189)
at 
kafka.controller.PartitionStateMachine.(PartitionStateMachine.scala:48)
at kafka.controller.KafkaController.(KafkaController.scala:156)
at kafka.server.KafkaServer.startup(KafkaServer.scala:148)
at 
kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:29)
at kafka.Kafka$.main(Kafka.scala:72)
at kafka.Kafka.main(Kafka.scala)

config:

# Generated by Apache Ambari. Tue Jul 12 03:18:02 2016

authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
auto.create.topics.enable=true
auto.leader.rebalance.enable=true
broker.id=0
compression.type=producer
controlled.shutdown.enable=true
controlled.shutdown.max.retries=3
controlled.shutdown.retry.backoff.ms=5000
controller.message.queue.size=10
controller.socket.timeout.ms=3
default.replication.factor=1
delete.topic.enable=false
external.kafka.metrics.exclude.prefix=kafka.network.RequestMetrics,kafka.server.DelayedOperationPurgatory,kafka.server.BrokerTopicMetrics.BytesRejectedPerSec
external.kafka.metrics.include.prefix=kafka.network.RequestMetrics.ResponseQueueTimeMs.request.OffsetCommit.98percentile,kafka.network.RequestMetrics.ResponseQueueTimeMs.request.Offsets.95percentile,kafka.network.RequestMetrics.ResponseSendTimeMs.request.Fetch.95percentile,kafka.network.RequestMetrics.RequestsPerSec.request
fetch.purgatory.purge.interval.requests=1
kafka.ganglia.metrics.group=kafka
kafka.ganglia.metrics.host=localhost
kafka.ganglia.metrics.port=8671
kafka.ganglia.metrics.reporter.enabled=true
kafka.metrics.reporters=
kafka.timeline.metrics.host=
kafka.timeline.metrics.maxRowCacheSize=1
kafka.timeline.metrics.port=
kafka.timeline.metrics.reporter.enabled=true
kafka.timeline.metrics.reporter.sendInterval=5900
leader.imbalance.check.interval.seconds=300
leader.imbalance.per.broker.percentage=10
listeners=PLAINTEXT://host-172-28-0-3:6667
log.cleanup.interval.mins=10
log.dirs=/kafka-logs
log.index.interval.bytes=4096
log.index.size.max.bytes=10485760
log.retention.bytes=-1
log.retention.hours=168
log.roll.hours=168
log.segment.bytes=1073741824
message.max.bytes=100
min.insync.replicas=1
num.io.threads=8
num.network.threads=3
num.partitions=1
num.recovery.threads.per.data.dir=1
num.replica.fetchers=1
offset.metadata.max.bytes=4096
offsets.commit.required.acks=-1
offsets.commit.timeout.ms=5000
offsets.load.buffer.size=5242880
offsets.retention.check.interval.ms=60
offsets.retention.minutes=8640
offsets.topic.compression.codec=0
offsets.topic.num.partitions=50
offsets.topic.replication.factor=3
offsets.topic.segment.bytes=104857600
principal.to.local.class=kafka.security.auth.KerberosPrincipalToLocal
producer.purgatory.purge.interval.requests=1
queued.max.requests=500
replica.fetch.max.bytes=1048576
replica.fetch.min.bytes=1
replica.fetch.wait.max.ms=500
replica.high.watermark.checkpoint.interval.ms=5000
replica.lag.max.messages=4000
replica.lag.time.max.ms=1
replica.socket.receive.buffer.bytes=65536
replica.socket.timeout.ms=3
security.inter.broker.protocol=PLAINTEXTSASL
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
socket.send.buffer.bytes=102400
super.users=user:kafka

[jira] [Commented] (KAFKA-3950) kafka mirror maker tool is not respecting whitelist option

2016-07-12 Thread Ismael Juma (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3950?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15372516#comment-15372516
 ] 

Ismael Juma commented on KAFKA-3950:


[~omkreddy], yes, it's a current limitation that if you use pattern 
subscriptions, you need to have describe permission for all topics. There are 
many advantages in doing the filtering server-side, but that's a bit tricky 
because the protocol-level regex would have to work across languages. One 
option is to use a perl regex library in Java as that is probably the closest 
one would have to a standard. In any case, it's a significant change which 
would require a KIP and quite a bit of discussion.

> kafka mirror maker tool is not respecting whitelist option
> --
>
> Key: KAFKA-3950
> URL: https://issues.apache.org/jira/browse/KAFKA-3950
> Project: Kafka
>  Issue Type: Bug
>Reporter: Raghav Kumar Gautam
>Assignee: Manikumar Reddy
>Priority: Critical
>
> A mirror maker launched like this:
> {code}
> /usr/bin/kinit -k -t /home/kfktest/hadoopqa/keytabs/kfktest.headless.keytab 
> kfkt...@example.com
> JAVA_HOME=/usr/jdk64/jdk1.8.0_77 JMX_PORT=9112 
> /usr/kafka/bin/kafka-run-class.sh kafka.tools.MirrorMaker --consumer.config 
> /usr/kafka/system_test/mirror_maker_testsuite/testcase_15001/config/mirror_consumer_12.properties
>  --producer.config 
> /usr/kafka/system_test/mirror_maker_testsuite/testcase_15001/config/mirror_producer_12.properties
>  --new.consumer --whitelist="test.*" >>  
> /usr/kafka/system_test/mirror_maker_testsuite/testcase_15001/logs/mirror_maker-12/mirror_maker_12.log
>  2>&1 & echo pid:$! >  
> /usr/kafka/system_test/mirror_maker_testsuite/testcase_15001/logs/mirror_maker-12/entity_12_pid
> {code}
> Lead to TopicAuthorizationException:
> {code}
> WARN Error while fetching metadata with correlation id 44 : 
> {__consumer_offsets=TOPIC_AUTHORIZATION_FAILED} 
> (org.apache.kafka.clients.NetworkClient)
> [2016-06-20 13:24:49,983] FATAL [mirrormaker-thread-0] Mirror maker thread 
> failure due to  (kafka.tools.MirrorMaker$MirrorMakerThread)
> org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to 
> access topics: [__consumer_offsets]
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [DISCUSS] KIP-66 Kafka Connect Transformers for messages

2016-07-12 Thread Michael Noll
As Gwen said, my initial thought is that message transformations that are
"more than trivial" should be done by Kafka Streams rather than by Kafka
Connect (for the reasons Gwen mentioned).

Transforming one message at a time would be a good fit for Kafka Connect.
An important use case is to remove sensitive data (such as PII) from an
incoming data stream before it hits Kafka's persistent storage -- this use
case can't be implemented well with Kafka Streams because, by design, Kafka
Streams is meant to read its input data from Kafka (i.e. at the point when
Kafka Streams could be used to remove sensitive data fields, the data is
already stored persistently in Kafka, and this might be a no-go depending
on the use case).
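
For concreteness, here is a rough sketch of what a one-message-at-a-time
transformer could look like, including the close() hook Jay brings up further
down in the thread. This is a hypothetical shape to anchor the discussion, not
the interface proposed in the KIP; all names and the record type are
placeholders.

{code}
import java.util.Map;

/**
 * Hypothetical single-message transformer sketch for the KIP-66 discussion.
 * One record goes in, one (possibly modified, possibly null = dropped) record
 * comes out; there is no access to other records in the batch.
 */
interface RecordTransformer<R> {

    /** Called once before any records are processed, with the connector's config. */
    void initialize(Map<String, ?> config);

    /** Transform a single record; return null to drop it. */
    R transform(R record);

    /** Release any resources (files, sockets, DB connections) held by the transformer. */
    void close();
}

/** Example: blank out a PII field before the record reaches Kafka (placeholder record type). */
class MaskSsnTransformer implements RecordTransformer<Map<String, Object>> {
    @Override
    public void initialize(Map<String, ?> config) { /* nothing to set up */ }

    @Override
    public Map<String, Object> transform(Map<String, Object> record) {
        record.put("ssn", "****");   // scrub the sensitive field in place
        return record;
    }

    @Override
    public void close() { /* nothing to release */ }
}
{code}

Whether transform() should instead take the whole collection of records is
exactly the design question Nisarg raises below.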

I'm of course interested to hear what other people think.


On Tue, Jul 12, 2016 at 6:06 AM, Gwen Shapira  wrote:

> I think we need to restrict the functionality to one-message-at-a-time.
>
> Basically, connect gives very few guarantees about the size or composition
> of the record set (you may get the same messages over and over, a mix of old
> and new, etc.)
>
> In order to do useful things over a collection, you need better defined
> semantics of what's included. Kafka Streams is putting tons of effort into
> having good windowing semantics, and I think apps that require modification
> of collections are a better fit there.
>
> I'm willing to change my mind though (it has been known to happen) - what are
> the comments about usage that point toward the collections approach?
>
> Gwen
>
> On Mon, Jul 11, 2016 at 3:32 PM, Nisarg Shah  wrote:
>
> > Thanks Jay, added that to the KIP.
> >
> > Besides reviewing the KIP as a whole, I wanted to know what everyone
> > thinks about what data should be dealt with at the Transformer level:
> > transform the whole Collection of Records (giving the flexibility of
> > modifying messages across the set), OR transform messages one at a time,
> > iteratively, which restricts modifications across messages.
> >
> > I’ll get a working sample ready soon, to have a look. There were some
> > comments about Transformer usage that pointed to the first approach,
> > which I prefer too given the flexibility.
> >
> > > On Jul 11, 2016, at 2:49 PM, Jay Kreps  wrote:
> > >
> > > One minor thing, the Transformer interface probably needs a close()
> > > method (i.e. the opposite of initialize). This would be used for any
> > > transformer that uses a resource like a file/socket/db connection/etc
> > > that needs to be closed. You usually don't need this but when you do
> > > need it you really need it.
> > >
> > > -Jay
> > >
> > > On Mon, Jul 11, 2016 at 1:47 PM, Nisarg Shah  wrote:
> > >
> > >> Hello,
> > >>
> > >> This KIP
> > >> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-66:+Add+Kafka+Connect+Transformers+to+allow+transformations+to+messages>
> > >> is for KAFKA-3209.
> > >> It’s about capabilities to transform messages in Kafka Connect.
> > >>
> > >> Some design decisions need to be taken, so please advise me on the
> > >> same.
> > >> Feel free to express any thoughts or concerns as well.
> > >>
> > >> Many many thanks to Ewen Cheslack-Postava.
> > >>
> > >> -Nisarg
> >
> >
>



-- 
Best regards,
Michael Noll



Michael G. Noll | Product Manager | Confluent | +1 650.453.5860
Download Apache Kafka and Confluent Platform: www.confluent.io/download