[GitHub] kafka pull request #1622: KAFKA-3859: Fix describe group command to report v...

2016-07-13 Thread vahidhashemian
GitHub user vahidhashemian opened a pull request:

https://github.com/apache/kafka/pull/1622

KAFKA-3859: Fix describe group command to report valid status when group is 
empty

With the new consumer, when all consumers of a consumer group are stopped 
(i.e. the group is empty), the describe consumer group command returns 
`Consumer group ... is rebalancing.`
This PR fixes this issue by distinguishing between when the consumer group 
is actually rebalancing and when it has no active members.
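
For illustration only, a minimal Java sketch of the distinction the patch draws (the actual fix lives in the Scala ConsumerGroupCommand; GroupSummary below is a hypothetical stand-in for whatever the describe-group response carries):

import java.util.List;

// Hedged sketch only -- not the actual patch. GroupSummary stands in for the
// describe-group response; the state names are the coordinator's rebalance
// states ("PreparingRebalance", "AwaitingSync").
class GroupSummary {
    final String groupId;
    final String state;
    final List<String> members;

    GroupSummary(String groupId, String state, List<String> members) {
        this.groupId = groupId;
        this.state = state;
        this.members = members;
    }
}

class DescribeGroupSketch {
    static String status(GroupSummary g) {
        // Previously an empty member list was reported as "rebalancing";
        // the idea of the fix is to treat an empty group as its own case.
        if (g.state.equals("PreparingRebalance") || g.state.equals("AwaitingSync"))
            return "Consumer group " + g.groupId + " is rebalancing.";
        if (g.members.isEmpty())
            return "Consumer group " + g.groupId + " has no active members.";
        return "Consumer group " + g.groupId + " is stable.";
    }
}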

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/vahidhashemian/kafka KAFKA-3859

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/1622.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1622


commit e3555de77d8235d0230b1a625b7b3f4e494632d4
Author: Vahid Hashemian 
Date:   2016-07-13T23:09:02Z

KAFKA-3859: Fix describe group command to report valid status when group is 
empty

With the new consumer, when all consumers of a consumer group are stopped 
(i.e. the group is empty), the describe consumer group command returns 
`Consumer group ... is rebalancing.`
This PR fixes this issue by distinguishing between when the consumer group 
is actually rebalancing and when it has no active members.






[jira] [Commented] (KAFKA-3859) Consumer group is stuck in rebalancing status

2016-07-13 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/KAFKA-3859?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15376317#comment-15376317 ]

ASF GitHub Bot commented on KAFKA-3859:
---

GitHub user vahidhashemian opened a pull request:

https://github.com/apache/kafka/pull/1622

KAFKA-3859: Fix describe group command to report valid status when group is 
empty





> Consumer group is stuck in rebalancing status
> -
>
> Key: KAFKA-3859
> URL: https://issues.apache.org/jira/browse/KAFKA-3859
> Project: Kafka
>  Issue Type: Bug
>Reporter: Vahid Hashemian
>Assignee: Vahid Hashemian
>
> * I have a topic (1 partition) and a producer and a new consumer that produce 
> to and consume from the topic.
> * The consumer belongs to group {{A}}.
> * I kill the consumer (whether it has consumed any messages or not does not 
> seem to be relevant).
> * After a short period when group status is processed and finalized, I run 
> the consumer-group describe command ({{kafka-consumer-groups.sh 
> --bootstrap-server localhost:9092 --new-consumer --describe --group A}}).
> * The response I receive is {{Consumer group `A` is rebalancing.}}
> * I keep trying the command but the response does not change.





Jenkins build is back to normal : kafka-0.10.0-jdk7 #148

2016-07-13 Thread Apache Jenkins Server
See 



[jira] [Resolved] (KAFKA-3941) Avoid applying eviction listener in InMemoryKeyValueLoggedStore

2016-07-13 Thread Guozhang Wang (JIRA)

[ https://issues.apache.org/jira/browse/KAFKA-3941?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Guozhang Wang resolved KAFKA-3941.
--
   Resolution: Fixed
Fix Version/s: 0.10.0.1

Issue resolved by pull request 1610
[https://github.com/apache/kafka/pull/1610]

> Avoid applying eviction listener in InMemoryKeyValueLoggedStore
> ---
>
> Key: KAFKA-3941
> URL: https://issues.apache.org/jira/browse/KAFKA-3941
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Guozhang Wang
> Fix For: 0.10.0.1
>
>
> This was reported by [~norwood].
> In {{InMemoryKeyValueLoggedStore}} we set the eviction listener when the store 
> is created; the listener records evicted records as "deleted" in the change 
> logger, which then sends a tombstone record to the corresponding changelog 
> topic partition. However, when restoring the store, although we use the inner 
> store's putInternal call and hence bypass the logging (it is not needed), this 
> eviction listener still calls the outer store's delete path and hence still 
> sends the tombstone record. This causes the restoration process to fail, as 
> restoration does not expect the changelog log-end-offset to increase (i.e. 
> more messages to be appended) while it is in progress.
> We should defer the listener initialization until the end of the {{init}} 
> call, after the restoration is completed, and also make sure the "register" 
> call is made on the inner stores only.
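
For illustration only, a hedged Java sketch of the ordering this fix proposes, i.e. register the eviction listener only after restoration completes; all names here are made up, not the actual Streams internals:

{code}
import java.util.LinkedHashMap;

class LoggedStoreSketch {
    interface EvictionListener {
        void onEvict(String key);
    }

    private final LinkedHashMap<String, byte[]> inner = new LinkedHashMap<>();
    private EvictionListener listener; // intentionally unset during restore

    void init() {
        // Replay the changelog first; any evictions that happen here must not
        // produce tombstones, so no listener is registered yet.
        restoreFromChangelog();
        // Only after restoration completes do we hook up the listener that
        // logs evicted records as deletes to the changelog topic.
        listener = this::logTombstone;
    }

    private void restoreFromChangelog() { /* replay the changelog into inner */ }

    private void logTombstone(String key) { /* send a delete to the changelog */ }
}
{code}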





[jira] [Commented] (KAFKA-3941) Avoid applying eviction listener in InMemoryKeyValueLoggedStore

2016-07-13 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/KAFKA-3941?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15376123#comment-15376123 ]

ASF GitHub Bot commented on KAFKA-3941:
---

Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/1610


> Avoid applying eviction listener in InMemoryKeyValueLoggedStore
> ---
>
> Key: KAFKA-3941
> URL: https://issues.apache.org/jira/browse/KAFKA-3941
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Guozhang Wang





[GitHub] kafka pull request #1610: KAFKA-3941: Delay eviction listener in InMemoryKey...

2016-07-13 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/1610




Kafka Streams question

2016-07-13 Thread Poul Costinsky
Hi! I am prototyping some code using Kafka Streams and have a question. I need 
to map a stream into another one (with a different partition key) and join it 
with a table. How do I control the number of partitions of the mapped stream?
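
For context, a sketch of the usual approach under the 0.10.0 Java API (topic names here are placeholders): re-key with map(), then pipe the stream through() an intermediate topic that is created manually up front; the partition count of that topic is what determines the partitioning of the mapped stream, and it should match the table's topic so the join is co-partitioned.

import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;

public class RepartitionJoinSketch {
    public static void main(String[] args) {
        KStreamBuilder builder = new KStreamBuilder();
        KStream<String, String> source = builder.stream("input-topic");
        KTable<String, String> table = builder.table("table-topic");

        // Re-key the stream, then pipe it through an intermediate topic.
        // through() does not create "rekeyed-topic"; you create it up front,
        // and its partition count controls the partitioning of the mapped
        // stream (it should match "table-topic" so the join co-partitions).
        KStream<String, String> rekeyed = source
                .map((key, value) -> KeyValue.pair(extractNewKey(value), value))
                .through("rekeyed-topic");

        rekeyed.leftJoin(table, (streamValue, tableValue) -> streamValue + "," + tableValue)
               .to("output-topic");
    }

    private static String extractNewKey(String value) {
        return value; // placeholder for the real re-keying logic
    }
}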

Thanks! 

Poul Costinsky
Chief Architect

 
(360) 207-1753 






[jira] [Commented] (KAFKA-3857) Additional log cleaner metrics

2016-07-13 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/KAFKA-3857?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15376102#comment-15376102 ]

ASF GitHub Bot commented on KAFKA-3857:
---

Github user kiranptivo closed the pull request at:

https://github.com/apache/kafka/pull/1593


> Additional log cleaner metrics
> --
>
> Key: KAFKA-3857
> URL: https://issues.apache.org/jira/browse/KAFKA-3857
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Kiran Pillarisetty
>
> The proposal would be to add a couple of additional log cleaner metrics: 
> 1. Time of last log cleaner run 
> 2. Cumulative number of successful log cleaner runs since last broker restart.
> Existing log cleaner metrics (max-buffer-utilization-percent, 
> cleaner-recopy-percent, max-clean-time-secs, max-dirty-percent) do not 
> differentiate an idle log cleaner from a dead log cleaner. It would be useful 
> to have the above two metrics added, to indicate whether the log cleaner is 
> alive (and successfully cleaning) or not.





[jira] [Commented] (KAFKA-3857) Additional log cleaner metrics

2016-07-13 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/KAFKA-3857?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15376103#comment-15376103 ]

ASF GitHub Bot commented on KAFKA-3857:
---

GitHub user kiranptivo reopened a pull request:

https://github.com/apache/kafka/pull/1593

KAFKA-3857 Additional log cleaner metrics

Fixes KAFKA-3857

Changes proposed in this pull request:

The following additional log cleaner metrics have been added:
1. num-runs: Cumulative number of successful log cleaner runs since the last 
broker restart.
2. last-run-time: Time of the last log cleaner run.
3. num-filthy-logs: Number of filthy logs. A non-zero value for an extended 
period of time indicates that the cleaner has not been successful in cleaning 
the logs.

A note on num-filthy-logs: it is incremented whenever a filthy topic 
partition is added to the inProgress HashMap, and decremented once the 
cleaning succeeds or is aborted. Note that the existing LogCleaner code does 
not provide a metric to check whether a clean operation succeeded: there is an 
inProgress HashMap with topicPartition => LogCleaningInProgress entries, but 
the entries are removed from the HashMap even when the clean operation throws 
an exception. The additional num-filthy-logs metric therefore differentiates a 
successful log clean from an exception case.
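
For illustration only (this is not the actual patch), a minimal Java sketch of the num-filthy-logs bookkeeping described above, using the Yammer metrics-core gauge API that Kafka's broker metrics are built on; the surrounding class and method names are made up:

{code}
import java.util.concurrent.atomic.AtomicInteger;
import com.yammer.metrics.Metrics;
import com.yammer.metrics.core.Gauge;

// Hedged sketch of the num-filthy-logs bookkeeping only; the metric name is
// from this PR, but this class is made up, not the actual LogCleanerManager.
class FilthyLogsGaugeSketch {
    private final AtomicInteger numFilthyLogs = new AtomicInteger(0);

    FilthyLogsGaugeSketch() {
        // A Yammer gauge reports whatever value() returns at read time.
        Metrics.newGauge(FilthyLogsGaugeSketch.class, "num-filthy-logs",
                new Gauge<Integer>() {
                    @Override
                    public Integer value() {
                        return numFilthyLogs.get();
                    }
                });
    }

    void grabbedForCleaning() {       // partition added to inProgress
        numFilthyLogs.incrementAndGet();
    }

    void doneOrAborted() {            // cleaning succeeded or was aborted
        numFilthyLogs.decrementAndGet();
    }
}
{code}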

The code is ready, and I have tested and verified the JMX metrics. There is 
one case I couldn't test though: the case where numFilthyLogs is decremented 
in resumeCleaning(...) in LogCleanerManager.scala (line 188), which seems to 
be part of the workflow that aborts the cleaning of a particular partition. 
Any ideas on how to test this scenario?

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/TiVo/kafka log_cleaner_jmx_metrics

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/1593.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1593


commit f00de412f6b1f6568adef479687ae0df789f9c96
Author: Kiran Pillarisetty 
Date:   2016-06-14T17:40:26Z

Create a couple of additional Log Cleaner JMX metrics
log-clean-last-run: Log cleaner's last run time
log-clean-runs: Number of log cleaner runs.

commit 7dc7511ee2b6d3cdf9df0c366fe23bf34d062a54
Author: Kiran Pillarisetty 
Date:   2016-06-14T20:24:00Z

Created a couple of additional Log Cleaner JMX metrics
log-clean-last-run: a metric to track last log cleaner run (unix timestamp)
log-clean-runs: a metric to track number of log cleaner runs

Committer: Kiran Pillarisetty 

commit 7f1214ff1118103dd639df717e988a22bad8033d
Author: Kiran Pillarisetty 
Date:   2016-07-01T22:14:57Z

Add additional JMX metric to track successful cleaning of a log segment

commit 1ac346bb37008312e41035167dbfd75803595cd6
Author: Kiran Pillarisetty 
Date:   2016-07-01T22:17:25Z

Add additional JMX metric to track successful cleaning of a log segment

commit 4f08d875e05c35bd7d7c849584b8b029031f884b
Author: Kiran Pillarisetty 
Date:   2016-07-05T22:23:20Z

Metric name updated to num-filthy-logs. Metric incremented as it is grabbed 
for cleaning, and decremented once the cleaning is done, or if the cleaning is 
aborted

commit cd887c05bf1d56b7566c5b72b3ddf3bcdfb70898
Author: Kiran Pillarisetty 
Date:   2016-07-05T23:31:32Z

Changed a metric name (number-of-runs to num-runs). Removed an extra \n 
around line 164. It is not present in the trunk




> Additional log cleaner metrics
> --
>
> Key: KAFKA-3857
> URL: https://issues.apache.org/jira/browse/KAFKA-3857
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Kiran Pillarisetty





[GitHub] kafka pull request #1593: KAFKA-3857 Additional log cleaner metrics

2016-07-13 Thread kiranptivo
GitHub user kiranptivo reopened a pull request:

https://github.com/apache/kafka/pull/1593

KAFKA-3857 Additional log cleaner metrics







[jira] [Commented] (KAFKA-3857) Additional log cleaner metrics

2016-07-13 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/KAFKA-3857?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15376100#comment-15376100 ]

ASF GitHub Bot commented on KAFKA-3857:
---

Github user kiranptivo closed the pull request at:

https://github.com/apache/kafka/pull/1593


> Additional log cleaner metrics
> --
>
> Key: KAFKA-3857
> URL: https://issues.apache.org/jira/browse/KAFKA-3857
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Kiran Pillarisetty





[GitHub] kafka pull request #1593: KAFKA-3857 Additional log cleaner metrics

2016-07-13 Thread kiranptivo
Github user kiranptivo closed the pull request at:

https://github.com/apache/kafka/pull/1593




[jira] [Commented] (KAFKA-3857) Additional log cleaner metrics

2016-07-13 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/KAFKA-3857?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15376101#comment-15376101 ]

ASF GitHub Bot commented on KAFKA-3857:
---

GitHub user kiranptivo reopened a pull request:

https://github.com/apache/kafka/pull/1593

KAFKA-3857 Additional log cleaner metrics





> Additional log cleaner metrics
> --
>
> Key: KAFKA-3857
> URL: https://issues.apache.org/jira/browse/KAFKA-3857
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Kiran Pillarisetty





[GitHub] kafka pull request #1593: KAFKA-3857 Additional log cleaner metrics

2016-07-13 Thread kiranptivo
Github user kiranptivo closed the pull request at:

https://github.com/apache/kafka/pull/1593




[jira] [Resolved] (KAFKA-3963) Missing messages from the controller to brokers

2016-07-13 Thread Maysam Yabandeh (JIRA)

[ https://issues.apache.org/jira/browse/KAFKA-3963?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Maysam Yabandeh resolved KAFKA-3963.

Resolution: Invalid

> Missing messages from the controller to brokers
> ---
>
> Key: KAFKA-3963
> URL: https://issues.apache.org/jira/browse/KAFKA-3963
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Reporter: Maysam Yabandeh
>Priority: Minor
> Fix For: 0.10.1.0
>





[jira] [Commented] (KAFKA-3963) Missing messages from the controller to brokers

2016-07-13 Thread Maysam Yabandeh (JIRA)

[ https://issues.apache.org/jira/browse/KAFKA-3963?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15376065#comment-15376065 ]

Maysam Yabandeh commented on KAFKA-3963:


Thanks [~ijuma]. Yeah, when reading the code I missed the while loop and 
assumed that the retry is done by the next invocation of doWork. The problems 
we observed were most likely caused by KAFKA-3964 instead. I saw many "fails 
to send request" entries in the logs, and that initially misled me into 
blaming some kind of missing messages as the root cause.

> Missing messages from the controller to brokers
> ---
>
> Key: KAFKA-3963
> URL: https://issues.apache.org/jira/browse/KAFKA-3963
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Reporter: Maysam Yabandeh
>Priority: Minor
> Fix For: 0.10.1.0
>





[jira] [Created] (KAFKA-3964) Metadata update requests are sometimes received after LeaderAndIsrRequests

2016-07-13 Thread Maysam Yabandeh (JIRA)
Maysam Yabandeh created KAFKA-3964:
--

 Summary: Metadata update requests are sometimes received after 
LeaderAndIsrRequests
 Key: KAFKA-3964
 URL: https://issues.apache.org/jira/browse/KAFKA-3964
 Project: Kafka
  Issue Type: Bug
Reporter: Maysam Yabandeh
Priority: Minor


The broker needs metadata about the leader before it can process a 
LeaderAndIsrRequest from the controller. For this reason, on broker startup the 
controller first sends the metadata update requests and AFTER that it sends the 
LeaderAndIsrRequests:
{code}
def onBrokerStartup(newBrokers: Seq[Int]) {
  info("New broker startup callback for %s".format(newBrokers.mkString(",")))
  val newBrokersSet = newBrokers.toSet
  // send update metadata request to all live and shutting down brokers. Old brokers will get to know of the new
  // broker via this update.
  // In cases of controlled shutdown leaders will not be elected when a new broker comes up. So at least in the
  // common controlled shutdown case, the metadata will reach the new brokers faster
  sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq)
  // the very first thing to do when a new broker comes up is send it the entire list of partitions that it is
  // supposed to host. Based on that the broker starts the high watermark threads for the input list of partitions
  val allReplicasOnNewBrokers = controllerContext.replicasOnBrokers(newBrokersSet)
  replicaStateMachine.handleStateChanges(allReplicasOnNewBrokers, OnlineReplica)
{code}

However, this protocol is not followed when a node becomes the controller: it 
sends LeaderAndIsrRequests BEFORE sending the metadata update requests:
{code}
def onControllerFailover() {
  ...
  replicaStateMachine.startup()
  ...
  /* send partition leadership info to all live brokers */
  sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq)
{code}
ReplicaStateMachine::startup
{code}
def startup() {
  ...
  // move all Online replicas to Online
  handleStateChanges(controllerContext.allLiveReplicas(), OnlineReplica)
{code}
which triggers LeaderAndIsrRequest messages.

Here are the symptoms one would observe when this problem manifests:
# The first set of messages that the broker receives from the controller is 
LeaderAndIsrRequests.
# The broker fails to become the follower as requested by the controller:
{code}
2016-07-12 21:03:53,081 ERROR change.logger: Broker 14 received LeaderAndIsrRequest with correlation id 0 from controller 21 epoch 290 for partition [topicxyz,7] but cannot become follower since the new leader 22 is unavailable.
{code}
# The fetcher hence does not start and the partition remains under-replicated.





[jira] [Commented] (KAFKA-3963) Missing messages from the controller to brokers

2016-07-13 Thread Ismael Juma (JIRA)

[ https://issues.apache.org/jira/browse/KAFKA-3963?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15376029#comment-15376029 ]

Ismael Juma commented on KAFKA-3963:


Thanks for the report. We don't take another item from the queue until the send 
succeeds though (see `isSendSuccessful` in the while loop condition):

{code}
var isSendSuccessful = false
while (isRunning.get() && !isSendSuccessful) {
  // if a broker goes down for a long time, then at some point the controller's zookeeper listener will trigger a
  // removeBroker which will invoke shutdown() on this thread. At that point, we will stop retrying.
  try {
    if (!brokerReady()) {
      isSendSuccessful = false
      backoff()
    }
    else {
      val requestHeader = apiVersion.fold(networkClient.nextRequestHeader(apiKey))(networkClient.nextRequestHeader(apiKey, _))
      val send = new RequestSend(brokerNode.idString, requestHeader, request.toStruct)
      val clientRequest = new ClientRequest(time.milliseconds(), true, send, null)
      clientResponse = networkClient.blockingSendAndReceive(clientRequest)(time)
      isSendSuccessful = true
    }
  } catch {
    case e: Throwable => // if the send was not successful, reconnect to broker and resend the message
      warn(("Controller %d epoch %d fails to send request %s to broker %s. " +
        "Reconnecting to broker.").format(controllerId, controllerContext.epoch,
        request.toString, brokerNode.toString()), e)
      networkClient.close(brokerNode.idString)
      isSendSuccessful = false
      backoff()
  }
}
{code}

Did you consider this? Or am I missing something?

> Missing messages from the controller to brokers
> ---
>
> Key: KAFKA-3963
> URL: https://issues.apache.org/jira/browse/KAFKA-3963
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Reporter: Maysam Yabandeh
>Priority: Minor
> Fix For: 0.10.1.0
>

[jira] [Updated] (KAFKA-3963) Missing messages from the controller to brokers

2016-07-13 Thread Ismael Juma (JIRA)

[ https://issues.apache.org/jira/browse/KAFKA-3963?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ismael Juma updated KAFKA-3963:
---
Fix Version/s: 0.10.1.0

> Missing messages from the controller to brokers
> ---
>
> Key: KAFKA-3963
> URL: https://issues.apache.org/jira/browse/KAFKA-3963
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Reporter: Maysam Yabandeh
>Priority: Minor
> Fix For: 0.10.1.0
>





Re: [VOTE] KIP-67: Queryable state for Kafka Streams

2016-07-13 Thread Ismael Juma
I think it's important to distinguish the use cases of defining new stores
(somewhat rare) versus using the `store` method (very common). The strategy
employed here is a common way to use generics to ensure type safety for the
latter case. In the former case, there are all sorts of weird things one
could do to defeat the type system, but spending a bit more effort to get
it right so that the common case is safer and more pleasant is worth it, in
my opinion.

Ismael

On Thu, Jul 14, 2016 at 12:23 AM, Damian Guy  wrote:

> Yes, you get compile time errors
>
> On Wed, 13 Jul 2016 at 16:22 Damian Guy  wrote:
>
> > You wont get a runtime error as you wouldn't find a store of that type.
> > The API would return null
> >
> > On Wed, 13 Jul 2016 at 16:22 Jay Kreps  wrote:
> >
> >> But if "my-store" is not of type MyStoreType don't you still get a run
> >> time
> >> error that in effect is the same as the class cast would be? Basically
> the
> >> question I'm asking is whether this added complexity is actually moving
> >> runtime errors to compile time errors.
> >>
> >> -Jay
> >>
>


[jira] [Created] (KAFKA-3963) Missing messages from the controller to brokers

2016-07-13 Thread Maysam Yabandeh (JIRA)
Maysam Yabandeh created KAFKA-3963:
--

 Summary: Missing messages from the controller to brokers
 Key: KAFKA-3963
 URL: https://issues.apache.org/jira/browse/KAFKA-3963
 Project: Kafka
  Issue Type: Bug
  Components: core
Reporter: Maysam Yabandeh
Priority: Minor


The controller takes messages from a queue and sends them to the designated 
broker. If the controller times out waiting for a response from the broker 
(30s), it closes the connection and retries after a backoff period; however, 
it does not return the message to the queue. As a result, the retry will start 
with the next message, and the previous message might never have been received 
by the broker.
{code}
val QueueItem(apiKey, apiVersion, request, callback) = queue.take()
...
  try {
    ...
    clientResponse = networkClient.blockingSendAndReceive(clientRequest)(time)
    ...
  }
} catch {
  case e: Throwable => // if the send was not successful, reconnect to broker and resend the message
    warn(("Controller %d epoch %d fails to send request %s to broker %s. " +
      "Reconnecting to broker.").format(controllerId, controllerContext.epoch,
      request.toString, brokerNode.toString()), e)
    networkClient.close(brokerNode.idString)
    ...
}
{code}

This could violate the semantics that developers assumed when writing the 
controller-broker protocol. For example, the controller code sends metadata 
updates BEFORE sending LeaderAndIsrRequest when it communicates with a newly 
joined broker for the first time.
{code}
def onBrokerStartup(newBrokers: Seq[Int]) {
  info("New broker startup callback for %s".format(newBrokers.mkString(",")))
  val newBrokersSet = newBrokers.toSet
  // send update metadata request to all live and shutting down brokers. Old brokers will get to know of the new
  // broker via this update.
  // In cases of controlled shutdown leaders will not be elected when a new broker comes up. So at least in the
  // common controlled shutdown case, the metadata will reach the new brokers faster
  sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq)
  // the very first thing to do when a new broker comes up is send it the entire list of partitions that it is
  // supposed to host. Based on that the broker starts the high watermark threads for the input list of partitions
  val allReplicasOnNewBrokers = controllerContext.replicasOnBrokers(newBrokersSet)
  replicaStateMachine.handleStateChanges(allReplicasOnNewBrokers, OnlineReplica)
{code}
This is important because, without the metadata cached in the broker, the 
LeaderAndIsrRequests that ask the broker to become a follower would fail, 
since there is no metadata for the leader of the partition.
{code}
metadataCache.getAliveBrokers.find(_.id == newLeaderBrokerId) match {
  // Only change partition state when the leader is available
  case Some(leaderBroker) =>
    ...
  case None =>
    // The leader broker should always be present in the metadata cache.
    // If not, we should record the error message and abort the transition process for this partition
    stateChangeLogger.error(("Broker %d received LeaderAndIsrRequest with correlation id %d from controller" +
      " %d epoch %d for partition [%s,%d] but cannot become follower since the new leader %d is unavailable.")
{code}





Re: [VOTE] KIP-67: Queryable state for Kafka Streams

2016-07-13 Thread Damian Guy
Yes, you get compile time errors

On Wed, 13 Jul 2016 at 16:22 Damian Guy  wrote:

> You wont get a runtime error as you wouldn't find a store of that type.
> The API would return null
>
> On Wed, 13 Jul 2016 at 16:22 Jay Kreps  wrote:
>
>> But if "my-store" is not of type MyStoreType don't you still get a run
>> time
>> error that in effect is the same as the class cast would be? Basically the
>> question I'm asking is whether this added complexity is actually moving
>> runtime errors to compile time errors.
>>
>> -Jay
>>

Re: [VOTE] KIP-67: Queryable state for Kafka Streams

2016-07-13 Thread Damian Guy
You won't get a runtime error, as you wouldn't find a store of that type. The
API would return null.

On Wed, 13 Jul 2016 at 16:22 Jay Kreps  wrote:

> But if "my-store" is not of type MyStoreType don't you still get a run time
> error that in effect is the same as the class cast would be? Basically the
> question I'm asking is whether this added complexity is actually moving
> runtime errors to compile time errors.
>
> -Jay
>
> > On Wed, 13 Jul 2016 at 15:30 Jay Kreps  wrote:
> >
> > > But to avoid the cast you introduce a bunch of magic that doesn't
> really
> > > bring type safety, right? Or possibly I'm misunderstanding, how do I
> plug
> > > in a new store type and get access to it? Can you give the steps for
> > that?
> > >
> > > -Jay
> > >
> > > On Wed, Jul 13, 2016 at 10:47 AM, Guozhang Wang 
> > > wrote:
> > >
> > > > Personally I think the additional complexity of the introduced "
> > > > QueryableStoreType" interface is still acceptable from a user's point
> > of
> > > > view: this is the only interface we are exposing to users, and other
> > > > wrappers are all internal classes.
> > > >
> > > > Regarding "QueryableStoreTypes", maybe we can consider declaring its
> > > > "QueryableStoreTypeMatcher" as private instead of public, since
> > > > "QueryableStoreTypes" is just used as a convenient manner for using
> > > > library-provided types, like serialization/Serdes.java.
> > > >
> > > > With this the only additional interface the library is exposing is "
> > > > QueryableStoreType", and users optionally can just use
> > > > "QueryableStoreTypes"
> > > > to conveniently create library-provided store types.
> > > >
> > > >
> > > > Guozhang
> > > >
> > > >
> > > > On Wed, Jul 13, 2016 at 7:58 AM, Neha Narkhede 
> > > wrote:
> > > >
> > > > > Damian -- appreciate the example code and you convinced me. Agree
> > that
> > > > the
> > > > > class approach is better and renaming to KafkaStreamsMetadata along
> > > with
> > > > > renaming the API methods will address the issues I was referring
> to.
> > > > >
> > > > > One other thing I wanted to get people's thoughts on was the way we
> > are
> > > > > proposing to handle different store types. I am sure you guys have
> > > > thought
> > > > > about the tradeoffs of using the store wrappers and matchers (
> > > > > QueryableStoreType) vs just making users cast the returned store to
> > the
> > > > > type they would expect to use. That is simple but the obvious
> > 

Re: [VOTE] KIP-67: Queryable state for Kafka Streams

2016-07-13 Thread Jay Kreps
But if "my-store" is not of type MyStoreType don't you still get a run time
error that in effect is the same as the class cast would be? Basically the
question I'm asking is whether this added complexity is actually moving
runtime errors to compile time errors.

-Jay

On Wed, Jul 13, 2016 at 4:16 PM, Damian Guy  wrote:

> You create your custom store, i.e.:
>
> /**
>  * An interface your custom store provides
>  * @param <K>
>  * @param <V>
>  */
> interface MyStoreType<K, V> {
>     V get(K key);
>     void put(K key, V value);
> }
>
> /**
>  * Implement your store
>  * @param <K>
>  * @param <V>
>  */
> public class MyStoreImpl<K, V> implements StateStore, MyStoreType<K, V> {
>     // implementation of the store goes here
> }
>
>
> Provide an implementation of QueryableStoreType to find stores that match
> your custom store:
>
> /**
>  * Implement QueryableStoreType to find stores that match your custom store
>  * @param <K>
>  * @param <V>
>  */
> public class MyQueryableType<K, V> implements
>         QueryableStoreType<MyStoreType<K, V>> {
>     @Override
>     public boolean accepts(final StateStore stateStore) {
>         // match the store implementation, not the matcher class itself
>         return stateStore instanceof MyStoreImpl;
>     }
>
>     @Override
>     public MyStoreType<K, V> create(final StateStoreProvider storeProvider,
>                                     final String storeName) {
>         return new MyCompositeStore<>(storeName, storeProvider);
>     }
> }
>
>
> Create a composite type to wrap the potentially many underlying instances
> of the store, i.e., there will be one per partition:
>
> /**
>  * Provide a wrapper over the underlying store instances.
>  */
> public class MyCompositeStore<K, V> implements MyStoreType<K, V> {
>     private final String storeName;
>     private final StateStoreProvider provider;
>
>     public MyCompositeStore(final String storeName,
>                             final StateStoreProvider provider) {
>         this.storeName = storeName;
>         this.provider = provider;
>     }
>
>     @Override
>     public V get(final K key) {
>         final List<MyStoreType<K, V>> stores =
>             provider.getStores(storeName, new MyQueryableType<K, V>());
>         // iterate over the stores looking for the key
>         for (final MyStoreType<K, V> store : stores) {
>             final V value = store.get(key);
>             if (value != null) {
>                 return value;
>             }
>         }
>         return null;
>     }
>
>     @Override
>     public void put(final K key, final V value) {
>         // intentionally left empty: the queryable wrappers are read-only
>     }
> }
>
>
> Look up your new store from KafkaStreams:
>
> final MyStoreType<K, V> store =
>     kafkaStreams.store("my-store", new MyQueryableType<>());
>
>
> So we get type safety, and we can constrain the interfaces returned to
> read-only versions (which is what we are doing for KeyValue and Window
> stores).
>
> HTH,
> Damian
>
> On Wed, 13 Jul 2016 at 15:30 Jay Kreps  wrote:
>
> > But to avoid the cast you introduce a bunch of magic that doesn't really
> > bring type safety, right? Or possibly I'm misunderstanding, how do I plug
> > in a new store type and get access to it? Can you give the steps for
> that?
> >
> > -Jay
> >
> > On Wed, Jul 13, 2016 at 10:47 AM, Guozhang Wang 
> > wrote:
> >
> > > Personally I think the additional complexity of the introduced "
> > > QueryableStoreType" interface is still acceptable from a user's point
> of
> > > view: this is the only interface we are exposing to users, and other
> > > wrappers are all internal classes.
> > >
> > > Regarding "QueryableStoreTypes", maybe we can consider declaring its
> > > "QueryableStoreTypeMatcher" as private instead of public, since
> > > "QueryableStoreTypes" is just used as a convenient manner for using
> > > library-provided types, like serialization/Serdes.java.
> > >
> > > With this the only additional interface the library is exposing is "
> > > QueryableStoreType", and users optionally can just use
> > > "QueryableStoreTypes"
> > > to conveniently create library-provided store types.
> > >
> > >
> > > Guozhang
> > >
> > >
> > > On Wed, Jul 13, 2016 at 7:58 AM, Neha Narkhede 
> > wrote:
> > >
> > > > Damian -- appreciate the example code and you convinced me. Agree
> that
> > > the
> > > > class approach is better and renaming to KafkaStreamsMetadata along
> > with
> > > > renaming the API methods will address the issues I was referring to.
> > > >
> > > > One other thing I wanted to get people's thoughts on was the way we
> are
> > > > proposing to handle different store types. I am sure you guys have
> > > thought
> > > > about the tradeoffs of using the store wrappers and matchers (
> > > > QueryableStoreType) vs just making users cast the returned store to
> the
> > > > type they would expect to use. That is simple but the obvious
> downside
> > is
> > > > that it is likely to result in exceptions for users that don't know
> > what
> > > > they are doing.
> > > >
> > > > In my experience of dealing with apps that would use queryable state,
> > it
> > > > appears to me that a majority would just use the key value store.
> > Partly
> > > > because that will suffice and partly because people might just follow
> > the
> > > > simpler examples we provide that use key-value store. For advanced
> > 

Re: [VOTE] KIP-67: Queryable state for Kafka Streams

2016-07-13 Thread Damian Guy
You create your custom store, i.e.:

/**
 * An interface your custom store provides
 * @param <K>
 * @param <V>
 */
interface MyStoreType<K, V> {
    V get(K key);
    void put(K key, V value);
}

/**
 * Implement your store
 * @param <K>
 * @param <V>
 */
public class MyStoreImpl<K, V> implements StateStore, MyStoreType<K, V> {
    // implementation of the store goes here
}


Provide an implementation of QueryableStoreType to find stores that match
your custom store:

/**
 * Implement QueryableStoreType to find stores that match your custom store
 * @param <K>
 * @param <V>
 */
public class MyQueryableType<K, V> implements
        QueryableStoreType<MyStoreType<K, V>> {
    @Override
    public boolean accepts(final StateStore stateStore) {
        // match the store implementation, not the matcher class itself
        return stateStore instanceof MyStoreImpl;
    }

    @Override
    public MyStoreType<K, V> create(final StateStoreProvider storeProvider,
                                    final String storeName) {
        return new MyCompositeStore<>(storeName, storeProvider);
    }
}


Create a composite type to wrap the potentially many underlying instances
of the store, i.e., there will be one per partition:

/**
 * Provide a wrapper over the underlying store instances.
 */
public class MyCompositeStore<K, V> implements MyStoreType<K, V> {
    private final String storeName;
    private final StateStoreProvider provider;

    public MyCompositeStore(final String storeName,
                            final StateStoreProvider provider) {
        this.storeName = storeName;
        this.provider = provider;
    }

    @Override
    public V get(final K key) {
        final List<MyStoreType<K, V>> stores =
            provider.getStores(storeName, new MyQueryableType<K, V>());
        // iterate over the stores looking for the key
        for (final MyStoreType<K, V> store : stores) {
            final V value = store.get(key);
            if (value != null) {
                return value;
            }
        }
        return null;
    }

    @Override
    public void put(final K key, final V value) {
        // intentionally left empty: the queryable wrappers are read-only
    }
}


Look up your new store from KafkaStreams:

final MyStoreType<K, V> store =
    kafkaStreams.store("my-store", new MyQueryableType<>());


So we get type safety, and we can constrain the interfaces returned to
read-only versions (which is what we are doing for KeyValue and Window
stores).

HTH,
Damian

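As a companion to the steps above, a hedged sketch of the library-provided
convenience path (exact class and method names were still under discussion
in KIP-67 at this point):

final ReadOnlyKeyValueStore<String, Long> counts =
    kafkaStreams.store("word-count", QueryableStoreTypes.keyValueStore());
final Long count = counts.get("hello");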
On Wed, 13 Jul 2016 at 15:30 Jay Kreps  wrote:

> But to avoid the cast you introduce a bunch of magic that doesn't really
> bring type safety, right? Or possibly I'm misunderstanding, how do I plug
> in a new store type and get access to it? Can you give the steps for that?
>
> -Jay
>
> On Wed, Jul 13, 2016 at 10:47 AM, Guozhang Wang 
> wrote:
>
> > Personally I think the additional complexity of the introduced "
> > QueryableStoreType" interface is still acceptable from a user's point of
> > view: this is the only interface we are exposing to users, and other
> > wrappers are all internal classes.
> >
> > Regarding "QueryableStoreTypes", maybe we can consider declaring its
> > "QueryableStoreTypeMatcher" as private instead of public, since
> > "QueryableStoreTypes" is just used as a convenient manner for using
> > library-provided types, like serialization/Serdes.java.
> >
> > With this the only additional interface the library is exposing is "
> > QueryableStoreType", and users optionally can just use
> > "QueryableStoreTypes"
> > to conveniently create library-provided store types.
> >
> >
> > Guozhang
> >
> >
> > On Wed, Jul 13, 2016 at 7:58 AM, Neha Narkhede 
> wrote:
> >
> > > Damian -- appreciate the example code and you convinced me. Agree that
> > the
> > > class approach is better and renaming to KafkaStreamsMetadata along
> with
> > > renaming the API methods will address the issues I was referring to.
> > >
> > > One other thing I wanted to get people's thoughts on was the way we are
> > > proposing to handle different store types. I am sure you guys have
> > thought
> > > about the tradeoffs of using the store wrappers and matchers (
> > > QueryableStoreType) vs just making users cast the returned store to the
> > > type they would expect to use. That is simple but the obvious downside
> is
> > > that it is likely to result in exceptions for users that don't know
> what
> > > they are doing.
> > >
> > > In my experience of dealing with apps that would use queryable state,
> it
> > > appears to me that a majority would just use the key value store.
> Partly
> > > because that will suffice and partly because people might just follow
> the
> > > simpler examples we provide that use key-value store. For advanced
> users,
> > > they will be aware of the reason they want to use the windowed store
> and
> > > will know how to cast it. The advantage of the current approach is that
> > it
> > > is likely more robust and general but introduces more
> interfaces
> > > and wrapper code.
> > >
> > > I tend to prefer simplicity to optimize for the general case, but
> curious
> > > to get people's thoughts on this as well.
> > >
> > > On Wed, Jul 13, 2016 at 8:13 AM, Jim Jagielski 
> wrote:
> > >
> > > > IMO, that makes the most sense.
> > > >
> > > > > On Jul 12, 2016, at 5:11 PM, Ismael Juma 
> 

[jira] [Commented] (KAFKA-3948) Invalid broker port in Zookeeper when SSL is enabled

2016-07-13 Thread Bharat Viswanadham (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3948?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15375965#comment-15375965
 ] 

Bharat Viswanadham commented on KAFKA-3948:
---

I will look into this issue and provide a PR.

> Invalid broker port in Zookeeper when SSL is enabled
> 
>
> Key: KAFKA-3948
> URL: https://issues.apache.org/jira/browse/KAFKA-3948
> Project: Kafka
>  Issue Type: Bug
>  Components: network
>Affects Versions: 0.9.0.1
>Reporter: Gérald Quintana
>Assignee: Bharat Viswanadham
>
> With broker config
> {code}
> listeners=SSL://:9093,PLAINTEXT://:9092
> port=9093
> {code}
> gives in Zookeeper /brokers/ids/1
> {code}
> {"jmx_port":,"timestamp":"1468249905473","endpoints":["SSL://kafka1:9093","PLAINTEXT://kafka1:9092"],"host":"kafka1","version":2,"port":9092}
> {code}
> Notice that the port is 9092, not 9093
> Then, different scenario, with config:
> {code}
> listeners=SSL://:9093
> port=9093
> {code}
> gives in Zookeeper /brokers/ids/1
> {code}
> {"jmx_port":,"timestamp":"1468250372974","endpoints":["SSL://kafka1:9093"],"host":null,"version":2,"port":-1}
> {code}
> Now the host is null and the port is -1
> Setting advertised.port doesn't help



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Assigned] (KAFKA-3948) Invalid broker port in Zookeeper when SSL is enabled

2016-07-13 Thread Bharat Viswanadham (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3948?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bharat Viswanadham reassigned KAFKA-3948:
-

Assignee: Bharat Viswanadham

> Invalid broker port in Zookeeper when SSL is enabled
> 
>
> Key: KAFKA-3948
> URL: https://issues.apache.org/jira/browse/KAFKA-3948
> Project: Kafka
>  Issue Type: Bug
>  Components: network
>Affects Versions: 0.9.0.1
>Reporter: Gérald Quintana
>Assignee: Bharat Viswanadham
>
> With broker config
> {code}
> listeners=SSL://:9093,PLAINTEXT://:9092
> port=9093
> {code}
> gives in Zookeeper /brokers/ids/1
> {code}
> {"jmx_port":,"timestamp":"1468249905473","endpoints":["SSL://kafka1:9093","PLAINTEXT://kafka1:9092"],"host":"kafka1","version":2,"port":9092}
> {code}
> Notice that the port is 9092, not 9093
> Then, different scenario, with config:
> {code}
> listeners=SSL://:9093
> port=9093
> {code}
> gives in Zookeeper /brokers/ids/1
> {code}
> {"jmx_port":,"timestamp":"1468250372974","endpoints":["SSL://kafka1:9093"],"host":null,"version":2,"port":-1}
> {code}
> Now the host is null and the port is -1
> Setting advertised.port doesn't help



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-3962) ConfigDef support for resource-specific configuration

2016-07-13 Thread Shikhar Bhushan (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3962?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shikhar Bhushan updated KAFKA-3962:
---
Description: 
With connectors, it often comes up that you want some piece of configuration 
to be overridable at the topic level, table level, etc.

The ConfigDef API should allow for defining these resource-overridable config 
properties and we should have getter variants that accept a resource argument, 
and return the more specific config value (falling back to the default).

There are a couple of possible ways to allow for this:

1. Support for map-style config properties "resource1:v1,resource2:v2". There 
are escaping considerations to think through here. Also, how should the user 
override fallback/default values -- perhaps {{*}} as a special resource?

2. Templatized configs -- so you would define {{$resource.some.property}}. The 
default value is more naturally overridable here, by the user setting 
{{some.property}} without the {{$resource}} prefix.

  was:
With connectors, it often comes up that you want some piece of configuration 
to be overridable at the topic level, table level, etc.

There are a couple of possible ways to allow for this:

1. Support for map-style config properties "k1:v1,k2:v2". There are escaping 
considerations to think through here. Also, how should the user override 
fallback/default values -- perhaps {{*}} as a special resource?

2. Templatized configs -- so we can define {{$resource.some.property}} with the 
ConfigDef API, and have getter variants that take the resource argument. The 
default value is more naturally overridable here, by the user setting 
{{some.property}}.


> ConfigDef support for resource-specific configuration
> -
>
> Key: KAFKA-3962
> URL: https://issues.apache.org/jira/browse/KAFKA-3962
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Shikhar Bhushan
>
> With connectors, it often comes up that you want some piece of configuration 
> to be overridable at the topic level, table level, etc.
> The ConfigDef API should allow for defining these resource-overridable config 
> properties and we should have getter variants that accept a resource 
> argument, and return the more specific config value (falling back to the 
> default).
> There are a couple of possible ways to allow for this:
> 1. Support for map-style config properties "resource1:v1,resource2:v2". There 
> are escaping considerations to think through here. Also, how should the user 
> override fallback/default values -- perhaps {{*}} as a special resource?
> 2. Templatized configs -- so you would define {{$resource.some.property}}. 
> The default value is more naturally overridable here, by the user setting 
> {{some.property}} without the {{$resource}} prefix.
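A hypothetical sketch of option 2; the class below is invented for 
illustration and is not part of the ConfigDef API:

{code}
import java.util.Properties;

final class ResourceAwareConfig {
    private final Properties props;

    ResourceAwareConfig(Properties props) { this.props = props; }

    // Resolve "<resource>.some.property", falling back to the bare key.
    String get(String resource, String property) {
        String specific = props.getProperty(resource + "." + property);
        return specific != null ? specific : props.getProperty(property);
    }
}
{code}

For example, with {{some.property=default}} and {{orders.some.property=override}}, 
{{get("orders", "some.property")}} returns the override, and any other resource 
falls back to the default.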



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-3962) ConfigDef support for resource-specific configuration

2016-07-13 Thread Shikhar Bhushan (JIRA)
Shikhar Bhushan created KAFKA-3962:
--

 Summary: ConfigDef support for resource-specific configuration
 Key: KAFKA-3962
 URL: https://issues.apache.org/jira/browse/KAFKA-3962
 Project: Kafka
  Issue Type: Improvement
Reporter: Shikhar Bhushan


With connectors, it often comes up that you want some piece of configuration 
to be overridable at the topic level, table level, etc.

There are a couple of possible ways to allow for this:

1. Support for map-style config properties "k1:v1,k2:v2". There are escaping 
considerations to think through here. Also, how should the user override 
fallback/default values -- perhaps {{*}} as a special resource?

2. Templatized configs -- so we can define {{$resource.some.property}} with the 
ConfigDef API, and have getter variants that take the resource argument. The 
default value is more naturally overridable here, by the user setting 
{{some.property}}.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Work started] (KAFKA-3859) Consumer group is stuck in rebalancing status

2016-07-13 Thread Vahid Hashemian (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3859?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Work on KAFKA-3859 started by Vahid Hashemian.
--
> Consumer group is stuck in rebalancing status
> -
>
> Key: KAFKA-3859
> URL: https://issues.apache.org/jira/browse/KAFKA-3859
> Project: Kafka
>  Issue Type: Bug
>Reporter: Vahid Hashemian
>Assignee: Vahid Hashemian
>
> * I have a topic (1 partition) and a producer and a new consumer that produce 
> to and consume from the topic.
> * The consumer belongs to group {{A}}.
> * I kill the consumer (whether it has consumed any messages or not does not 
> seem to be relevant).
> * After a short period when group status is processed and finalized, I run 
> the consumer-group describe command ({{kafka-consumer-groups.sh 
> --bootstrap-server localhost:9092 --new-consumer --describe --group A}}).
> * The response I receive is {{Consumer group `A` is rebalancing.}}
> * I keep trying the command but the response does not change.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Assigned] (KAFKA-3859) Consumer group is stuck in rebalancing status

2016-07-13 Thread Vahid Hashemian (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3859?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vahid Hashemian reassigned KAFKA-3859:
--

Assignee: Vahid Hashemian

> Consumer group is stuck in rebalancing status
> -
>
> Key: KAFKA-3859
> URL: https://issues.apache.org/jira/browse/KAFKA-3859
> Project: Kafka
>  Issue Type: Bug
>Reporter: Vahid Hashemian
>Assignee: Vahid Hashemian
>
> * I have a topic (1 partition) and a producer and a new consumer that produce 
> to and consume from the topic.
> * The consumer belongs to group {{A}}.
> * I kill the consumer (whether it has consumed any messages or not does not 
> seem to be relevant).
> * After a short period when group status is processed and finalized, I run 
> the consumer-group describe command ({{kafka-consumer-groups.sh 
> --bootstrap-server localhost:9092 --new-consumer --describe --group A}}).
> * The response I receive is {{Consumer group `A` is rebalancing.}}
> * I keep trying the command but the response does not change.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] kafka pull request #1621: MINOR: Added simple streams benchmark to system te...

2016-07-13 Thread enothereska
GitHub user enothereska opened a pull request:

https://github.com/apache/kafka/pull/1621

MINOR: Added simple streams benchmark to system tests



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/enothereska/kafka 
simple-benchmark-streams-system-tests

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/1621.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1621


commit 86c73bcebb3117a57a645bf42d710186c7eb1cab
Author: Eno Thereska 
Date:   2016-07-13T22:33:37Z

Added simple streams benchmark to system tests




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


Re: [VOTE] KIP-67: Queryable state for Kafka Streams

2016-07-13 Thread Jay Kreps
But to avoid the cast you introduce a bunch of magic that doesn't really
bring type safety, right? Or possibly I'm misunderstanding, how do I plug
in a new store type and get access to it? Can you give the steps for that?

-Jay

On Wed, Jul 13, 2016 at 10:47 AM, Guozhang Wang  wrote:

> Personally I think the additional complexity of the introduced "
> QueryableStoreType" interface is still acceptable from a user's point of
> view: this is the only interface we are exposing to users, and other
> wrappers are all internal classes.
>
> Regarding "QueryableStoreTypes", maybe we can consider declaring its
> "QueryableStoreTypeMatcher" as private instead of public, since
> "QueryableStoreTypes" is just used as a convenient manner for using
> library-provided types, like serialization/Serdes.java.
>
> With this the only additional interface the library is exposing is "
> QueryableStoreType", and users optionally can just use
> "QueryableStoreTypes"
> to conveniently create library-provided store types.
>
>
> Guozhang
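A sketch of what that could look like (hypothetical shape -- the real class
was still being designed at this point):

public final class QueryableStoreTypes {

    private QueryableStoreTypes() {}

    public static <K, V> QueryableStoreType<ReadOnlyKeyValueStore<K, V>> keyValueStore() {
        return new KeyValueStoreType<>();
    }

    // The matcher stays private, so users only ever see QueryableStoreType.
    private static final class KeyValueStoreType<K, V>
            implements QueryableStoreType<ReadOnlyKeyValueStore<K, V>> {

        @Override
        public boolean accepts(final StateStore stateStore) {
            return stateStore instanceof ReadOnlyKeyValueStore;
        }

        @Override
        public ReadOnlyKeyValueStore<K, V> create(final StateStoreProvider provider,
                                                  final String storeName) {
            // CompositeReadOnlyKeyValueStore is a hypothetical per-partition wrapper
            return new CompositeReadOnlyKeyValueStore<>(provider, storeName);
        }
    }
}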
>
>
> On Wed, Jul 13, 2016 at 7:58 AM, Neha Narkhede  wrote:
>
> > Damian -- appreciate the example code and you convinced me. Agree that
> the
> > class approach is better and renaming to KafkaStreamsMetadata along with
> > renaming the API methods will address the issues I was referring to.
> >
> > One other thing I wanted to get people's thoughts on was the way we are
> > proposing to handle different store types. I am sure you guys have
> thought
> > about the tradeoffs of using the store wrappers and matchers (
> > QueryableStoreType) vs just making users cast the returned store to the
> > type they would expect to use. That is simple but the obvious downside is
> > that it is likely to result in exceptions for users that don't know what
> > they are doing.
> >
> > In my experience of dealing with apps that would use queryable state, it
> > appears to me that a majority would just use the key value store. Partly
> > because that will suffice and partly because people might just follow the
> > simpler examples we provide that use key-value store. For advanced users,
> > they will be aware of the reason they want to use the windowed store and
> > will know how to cast it. The advantage of the current approach is that
> it
> > is likely more robust and general but introduces more interfaces
> > and wrapper code.
> >
> > I tend to prefer simplicity to optimize for the general case, but curious
> > to get people's thoughts on this as well.
> >
> > On Wed, Jul 13, 2016 at 8:13 AM, Jim Jagielski  wrote:
> >
> > > IMO, that makes the most sense.
> > >
> > > > On Jul 12, 2016, at 5:11 PM, Ismael Juma  wrote:
> > > >
> > > > Hi Damian,
> > > >
> > > > How about StreamsMetadata instead? The general naming pattern seems
> to
> > > > avoid the `Kafka` prefix for everything outside of `KafkaStreams`
> > itself.
> > > >
> > > > Ismael
> > > >
> > > > On Tue, Jul 12, 2016 at 7:14 PM, Damian Guy 
> > > wrote:
> > > >
> > > >> Hi,
> > > >>
> > > >> I agree with point 1. application.server is a better name for the
> > config
> > > >> (we'll change this). However, on point 2 I think we should stick
> > mostly
> > > >> with what we already have. I've tried both ways of doing this when
> > > working
> > > >> on the JIRA and building examples and I find the current approach
> more
> > > >> intuitive and easier to use than the Map based approach.
> > > >> However, there is probably a naming issue. We should rename
> > > >> KafkaStreamsInstance to KafkaStreamsMetadata. This Class is very
> > simple,
> > > >> but provides all the information a developer needs to be able to
> find
> > > the
> > > >> instance(s) of a Streams application that a particular store is
> > running
> > > on,
> > > >> i.e.,
> > > >>
> > > >> public class KafkaStreamsMetadata {
> > > >>     private final HostInfo hostInfo;
> > > >>     private final Set<String> stateStoreNames;
> > > >>     private final Set<TopicPartition> topicPartitions;
> > > >>
> > > >>
> > > >> So using the API to route to a new host is fairly simple, particularly
> > > >> in the case when you want to find the host for a particular key, i.e.,
> > > >>
> > > >> final KafkaStreams kafkaStreams = createKafkaStreams();
> > > >> final KafkaStreamsMetadata streamsMetadata =
> > > >>     kafkaStreams.instanceWithKey("word-count", "hello",
> > > >>         Serdes.String().serializer());
> > > >> http.get("http://" + streamsMetadata.host() + ":" +
> > > >>     streamsMetadata.port() + "/get/word-count/hello");
> > > >>
> > > >>
> > > >> And if you want to do a scatter-gather approach:
> > > >>
> > > >> final KafkaStreams kafkaStreams = createKafkaStreams();
> > > >> final Collection<KafkaStreamsMetadata> kafkaStreamsMetadatas =
> > > >>     kafkaStreams.allInstancesWithStore("word-count");
> > > >> for (KafkaStreamsMetadata streamsMetadata : kafkaStreamsMetadatas) {
> > > >>     http.get("http://" + streamsMetadata.host() + ":" +
> > > >> 
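A completed version of that scatter-gather loop, reconstructed as a hedged
sketch (http.get stands in for any HTTP client helper):

for (KafkaStreamsMetadata streamsMetadata : kafkaStreamsMetadatas) {
    http.get("http://" + streamsMetadata.host() + ":" +
        streamsMetadata.port() + "/get/word-count/hello");
}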

[jira] [Commented] (KAFKA-2729) Cached zkVersion not equal to that in zookeeper, broker not recovering.

2016-07-13 Thread Tyler Bischel (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2729?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15375878#comment-15375878
 ] 

Tyler Bischel commented on KAFKA-2729:
--

We are also seeing this issue in 0.10.0.0 pretty much daily right now.
{code}
[2016-07-13 21:30:50,170]  1292384 [kafka-scheduler-0] INFO  
kafka.cluster.Partition  - Partition [events,580] on broker 10432234: Cached 
zkVersion [1267] not equal to that in zookeeper, skip updating ISR
{code}

> Cached zkVersion not equal to that in zookeeper, broker not recovering.
> ---
>
> Key: KAFKA-2729
> URL: https://issues.apache.org/jira/browse/KAFKA-2729
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.8.2.1
>Reporter: Danil Serdyuchenko
>
> After a small network wobble where zookeeper nodes couldn't reach each other, 
> we started seeing a large number of undereplicated partitions. The zookeeper 
> cluster recovered, however we continued to see a large number of 
> undereplicated partitions. Two brokers in the kafka cluster were showing this 
> in the logs:
> {code}
> [2015-10-27 11:36:00,888] INFO Partition 
> [__samza_checkpoint_event-creation_1,3] on broker 5: Shrinking ISR for 
> partition [__samza_checkpoint_event-creation_1,3] from 6,5 to 5 
> (kafka.cluster.Partition)
> [2015-10-27 11:36:00,891] INFO Partition 
> [__samza_checkpoint_event-creation_1,3] on broker 5: Cached zkVersion [66] 
> not equal to that in zookeeper, skip updating ISR (kafka.cluster.Partition)
> {code}
> For all of the topics on the effected brokers. Both brokers only recovered 
> after a restart. Our own investigation yielded nothing, I was hoping you 
> could shed some light on this issue. Possibly if it's related to: 
> https://issues.apache.org/jira/browse/KAFKA-1382 , however we're using 
> 0.8.2.1.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [VOTE] KIP-67: Queryable state for Kafka Streams

2016-07-13 Thread Ismael Juma
I agree too. Casting also makes it harder to rely on static analysis tools
to detect unsafe code usage.

Ismael

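A small illustration of that point; the single-argument store() call below is
the hypothetical cast-based alternative, not an actual API:

// The cast-based API forces an unchecked cast that static analysis flags:
@SuppressWarnings("unchecked")
final ReadOnlyKeyValueStore<String, Long> viaCast =
    (ReadOnlyKeyValueStore<String, Long>) kafkaStreams.store("word-count");

// The typed lookup needs no cast, so there is nothing for such tools to flag:
final ReadOnlyKeyValueStore<String, Long> typed =
    kafkaStreams.store("word-count", QueryableStoreTypes.keyValueStore());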
On Wed, Jul 13, 2016 at 7:37 PM, Damian Guy  wrote:

> I'm in agreement with Guozhang. Further, from a "Developer Experience" PoV,
> IMO we should avoid forcing developers to cast - it doesn't make for a nice
> and easy-to-use API, it introduces possible runtime errors due to invalid
> casts, and developers need to know what they are casting to.
>
> On Wed, 13 Jul 2016 at 10:47 Guozhang Wang  wrote:
>
> > Personally I think the additional complexity of the introduced "
> > QueryableStoreType" interface is still acceptable from a user's point of
> > view: this is the only interface we are exposing to users, and other
> > wrappers are all internal classes.
> >
> > Regarding "QueryableStoreTypes", maybe we can consider declaring its
> > "QueryableStoreTypeMatcher" as private instead of public, since
> > "QueryableStoreTypes" is just used as a convenient manner for using
> > library-provided types, like serialization/Serdes.java.
> >
> > With this the only additional interface the library is exposing is "
> > QueryableStoreType", and users optionally can just use
> > "QueryableStoreTypes"
> > to conveniently create library-provided store types.
> >
> >
> > Guozhang
> >
> >
> > On Wed, Jul 13, 2016 at 7:58 AM, Neha Narkhede 
> wrote:
> >
> > > Damian -- appreciate the example code and you convinced me. Agree that
> > the
> > > class approach is better and renaming to KafkaStreamsMetadata along
> with
> > > renaming the API methods will address the issues I was referring to.
> > >
> > > One other thing I wanted to get people's thoughts on was the way we are
> > > proposing to handle different store types. I am sure you guys have
> > thought
> > > about the tradeoffs of using the store wrappers and matchers (
> > > QueryableStoreType) vs just making users cast the returned store to the
> > > type they would expect to use. That is simple but the obvious downside
> is
> > > that it is likely to result in exceptions for users that don't know
> what
> > > they are doing.
> > >
> > > In my experience of dealing with apps that would use queryable state,
> it
> > > appears to me that a majority would just use the key value store.
> Partly
> > > because that will suffice and partly because people might just follow
> the
> > > simpler examples we provide that use key-value store. For advanced
> users,
> > > they will be aware of the reason they want to use the windowed store
> and
> > > will know how to cast it. The advantage of the current approach is that
> > it
> > > is likely more robust and general but introduces more
> interfaces
> > > and wrapper code.
> > >
> > > I tend to prefer simplicity to optimize for the general case, but
> curious
> > > to get people's thoughts on this as well.
> > >
> > > On Wed, Jul 13, 2016 at 8:13 AM, Jim Jagielski 
> wrote:
> > >
> > > > IMO, that makes the most sense.
> > > >
> > > > > On Jul 12, 2016, at 5:11 PM, Ismael Juma 
> wrote:
> > > > >
> > > > > Hi Damian,
> > > > >
> > > > > How about StreamsMetadata instead? The general naming pattern seems
> > to
> > > > > avoid the `Kafka` prefix for everything outside of `KafkaStreams`
> > > itself.
> > > > >
> > > > > Ismael
> > > > >
> > > > > On Tue, Jul 12, 2016 at 7:14 PM, Damian Guy 
> > > > wrote:
> > > > >
> > > > >> Hi,
> > > > >>
> > > > >> I agree with point 1. application.server is a better name for the
> > > config
> > > > >> (we'll change this). However, on point 2 I think we should stick
> > > mostly
> > > > >> with what we already have. I've tried both ways of doing this when
> > > > working
> > > > >> on the JIRA and building examples and I find the current approach
> > more
> > > > >> intuitive and easier to use than the Map based approach.
> > > > >> However, there is probably a naming issue. We should rename
> > > > >> KafkaStreamsInstance to KafkaStreamsMetadata. This Class is very
> > > simple,
> > > > >> but provides all the information a developer needs to be able to
> > find
> > > > the
> > > > >> instance(s) of a Streams application that a particular store is
> > > running
> > > > on,
> > > > >> i.e.,
> > > > >>
> > > > >> public class KafkaStreamsMetadata {
> > > > >>     private final HostInfo hostInfo;
> > > > >>     private final Set<String> stateStoreNames;
> > > > >>     private final Set<TopicPartition> topicPartitions;
> > > > >>
> > > > >>
> > > > >> So using the API to route to a new host is fairly simple, particularly
> > > > >> in the case when you want to find the host for a particular key, i.e.,
> > > > >>
> > > > >> final KafkaStreams kafkaStreams = createKafkaStreams();
> > > > >> final KafkaStreamsMetadata streamsMetadata =
> > > > >>     kafkaStreams.instanceWithKey("word-count", "hello",
> > > > >>         Serdes.String().serializer());
> > > > >> http.get("http://" + 

[jira] [Commented] (KAFKA-3940) Log should check the return value of dir.mkdirs()

2016-07-13 Thread Ismael Juma (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3940?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15375774#comment-15375774
 ] 

Ismael Juma commented on KAFKA-3940:


Yes, I'd prefer if we handled this across the codebase instead of the single 
example. Also, if someone assigns a ticket to themselves, we generally 
recommend that other people don't submit PRs before discussing in the JIRA (to 
avoid duplicated work).

> Log should check the return value of dir.mkdirs()
> -
>
> Key: KAFKA-3940
> URL: https://issues.apache.org/jira/browse/KAFKA-3940
> Project: Kafka
>  Issue Type: Bug
>  Components: log
>Affects Versions: 0.10.0.0
>Reporter: Jun Rao
>Assignee: Ishita Mandhan
>  Labels: newbie
>
> In Log.loadSegments(), we call dir.mkdirs() w/o checking the return value and 
> just assume the directory will exist after the call. However, if the 
> directory can't be created (e.g. due to no space), we will hit 
> NullPointerException in the next statement, which will be confusing.
>for(file <- dir.listFiles if file.isFile) {
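A minimal sketch of the kind of fail-fast check being discussed (Java-style 
for illustration; Log.loadSegments itself is Scala, and the exception type is 
a placeholder):

{code}
java.io.File dir = new java.io.File(logDirPath); // logDirPath: hypothetical example
if (!dir.mkdirs() && !dir.isDirectory()) {
    // fail fast with a clear message instead of hitting a confusing
    // NullPointerException on dir.listFiles() later
    throw new RuntimeException("Failed to create log directory " + dir.getAbsolutePath());
}
{code}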



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3940) Log should check the return value of dir.mkdirs()

2016-07-13 Thread Ishita Mandhan (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3940?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15375692#comment-15375692
 ] 

Ishita Mandhan commented on KAFKA-3940:
---

Hi Jim, I got a notification of your PR because I had this bug assigned to me 
and was working on it myself. I reviewed your PR, and I think the fix should be 
more involved: more occurrences of dir.mkdirs() and File.delete could be 
addressed, as discussed above. What are your thoughts? Do you mind collaborating 
on this patch set?

> Log should check the return value of dir.mkdirs()
> -
>
> Key: KAFKA-3940
> URL: https://issues.apache.org/jira/browse/KAFKA-3940
> Project: Kafka
>  Issue Type: Bug
>  Components: log
>Affects Versions: 0.10.0.0
>Reporter: Jun Rao
>Assignee: Ishita Mandhan
>  Labels: newbie
>
> In Log.loadSegments(), we call dir.mkdirs() w/o checking the return value and 
> just assume the directory will exist after the call. However, if the 
> directory can't be created (e.g. due to no space), we will hit 
> NullPointerException in the next statement, which will be confusing.
>for(file <- dir.listFiles if file.isFile) {



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Closed] (KAFKA-3957) consumer timeout not being respected when kafka broker is not available

2016-07-13 Thread Vincent Fumo (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3957?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vincent Fumo closed KAFKA-3957.
---

> consumer timeout not being respected when kafka broker is not available
> ---
>
> Key: KAFKA-3957
> URL: https://issues.apache.org/jira/browse/KAFKA-3957
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 0.9.0.1
>Reporter: Vincent Fumo
>Priority: Minor
>
> KafkaConsumer v0.9::
> I have a consumer set up with session.timeout.ms set to 30s. I make a call 
> like
> consumer.poll(1)
> but if the kafka broker is down, that call will hang indefinitely.
> Digging into the code, it seems that the timeout isn't respected.
> KafkaConsumer calls out to pollOnce() as seen below::
>     private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollOnce(long timeout) {
>         // TODO: Sub-requests should take into account the poll timeout (KAFKA-1894)
>         coordinator.ensureCoordinatorKnown();
>         // ensure we have partitions assigned if we expect to
>         if (subscriptions.partitionsAutoAssigned())
>             coordinator.ensurePartitionAssignment();
>         // fetch positions if we have partitions we're subscribed to that we
>         // don't know the offset for
>         if (!subscriptions.hasAllFetchPositions())
>             updateFetchPositions(this.subscriptions.missingFetchPositions());
>         // init any new fetches (won't resend pending fetches)
>         Cluster cluster = this.metadata.fetch();
>         Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords();
>         // if data is available already, e.g. from a previous network client poll() call to commit,
>         // then just return it immediately
>         if (!records.isEmpty()) {
>             return records;
>         }
>         fetcher.initFetches(cluster);
>         client.poll(timeout);
>         return fetcher.fetchedRecords();
>     }
> and we see that we stick on the call to coordinator.ensureCoordinatorKnown();
> AbstractCoordinator::
>     public void ensureCoordinatorKnown() {
>         while (coordinatorUnknown()) {
>             RequestFuture<Void> future = sendGroupMetadataRequest();
>             client.poll(future);
>             if (future.failed()) {
>                 if (future.isRetriable())
>                     client.awaitMetadataUpdate();
>                 else
>                     throw future.exception();
>             }
>         }
>     }
> in this case the Future fails (since the broker is down), and then a call to 
> client.awaitMetadataUpdate() is made, which in the case of the 
> ConsumerNetworkClient will block forever:
>     public void awaitMetadataUpdate() {
>         int version = this.metadata.requestUpdate();
>         do {
>             poll(Long.MAX_VALUE);
>         } while (this.metadata.version() == version);
>     }
> I feel that this is a bug. When you set a timeout on a call to a blocking 
> method, that timeout should be respected and an exception should be thrown.
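A bounded-wait variant illustrating the behavior the reporter expects 
(hypothetical sketch, not the actual fix; TimeoutException stands in for 
whatever exception the client would choose to throw):

{code}
public void awaitMetadataUpdate(long timeoutMs) {
    int version = this.metadata.requestUpdate();
    long deadline = System.currentTimeMillis() + timeoutMs;
    do {
        long remaining = deadline - System.currentTimeMillis();
        if (remaining <= 0)
            throw new org.apache.kafka.common.errors.TimeoutException(
                    "Metadata update did not complete within " + timeoutMs + " ms");
        poll(remaining);
    } while (this.metadata.version() == version);
}
{code}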



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (KAFKA-3957) consumer timeout not being respected when kafka broker is not available

2016-07-13 Thread Vincent Fumo (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3957?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vincent Fumo resolved KAFKA-3957.
-
Resolution: Duplicate

> consumer timeout not being respected when kafka broker is not available
> ---
>
> Key: KAFKA-3957
> URL: https://issues.apache.org/jira/browse/KAFKA-3957
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 0.9.0.1
>Reporter: Vincent Fumo
>Priority: Minor
>
> KafkaConsumer v0.9::
> I have a consumer set up with session.timeout.ms set to 30s. I make a call 
> like
> consumer.poll(1)
> but if the kafka broker is down, that call will hang indefinitely.
> Digging into the code, it seems that the timeout isn't respected.
> KafkaConsumer calls out to pollOnce() as seen below::
>     private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollOnce(long timeout) {
>         // TODO: Sub-requests should take into account the poll timeout (KAFKA-1894)
>         coordinator.ensureCoordinatorKnown();
>         // ensure we have partitions assigned if we expect to
>         if (subscriptions.partitionsAutoAssigned())
>             coordinator.ensurePartitionAssignment();
>         // fetch positions if we have partitions we're subscribed to that we
>         // don't know the offset for
>         if (!subscriptions.hasAllFetchPositions())
>             updateFetchPositions(this.subscriptions.missingFetchPositions());
>         // init any new fetches (won't resend pending fetches)
>         Cluster cluster = this.metadata.fetch();
>         Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords();
>         // if data is available already, e.g. from a previous network client poll() call to commit,
>         // then just return it immediately
>         if (!records.isEmpty()) {
>             return records;
>         }
>         fetcher.initFetches(cluster);
>         client.poll(timeout);
>         return fetcher.fetchedRecords();
>     }
> and we see that we stick on the call to coordinator.ensureCoordinatorKnown();
> AbstractCoordinator::
>     public void ensureCoordinatorKnown() {
>         while (coordinatorUnknown()) {
>             RequestFuture<Void> future = sendGroupMetadataRequest();
>             client.poll(future);
>             if (future.failed()) {
>                 if (future.isRetriable())
>                     client.awaitMetadataUpdate();
>                 else
>                     throw future.exception();
>             }
>         }
>     }
> in this case the Future fails (since the broker is down), and then a call to 
> client.awaitMetadataUpdate() is made, which in the case of the 
> ConsumerNetworkClient will block forever:
>     public void awaitMetadataUpdate() {
>         int version = this.metadata.requestUpdate();
>         do {
>             poll(Long.MAX_VALUE);
>         } while (this.metadata.version() == version);
>     }
> I feel that this is a bug. When you set a timeout on a call to a blocking 
> method, that timeout should be respected and an exception should be thrown.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3101) Optimize Aggregation Outputs

2016-07-13 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3101?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15375673#comment-15375673
 ] 

Guozhang Wang commented on KAFKA-3101:
--

This LGTM. Also, we want to get some information on how accurately the jamm 
library (or any similar utility library compatible with the Apache 2.0 license, 
so it can be included as a dependency) estimates object size, i.e. whether it is 
safe to use for limiting the cache size in bytes when flushing for memory 
management.

Thanks!
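For reference, a hedged sketch of how jamm is typically used for such an 
estimate (this assumes jamm's MemoryMeter API and a JVM started with 
-javaagent:jamm.jar; cacheEntry is a hypothetical object):

{code}
import org.github.jamm.MemoryMeter;

MemoryMeter meter = new MemoryMeter();
long shallowBytes = meter.measure(cacheEntry);   // object header + fields only
long deepBytes = meter.measureDeep(cacheEntry);  // follows references
{code}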

> Optimize Aggregation Outputs
> 
>
> Key: KAFKA-3101
> URL: https://issues.apache.org/jira/browse/KAFKA-3101
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>  Labels: architecture
> Fix For: 0.10.1.0
>
>
> Today we emit one output record for each incoming message for Table / 
> Windowed Stream Aggregations. For example, say we have a sequence of 
> aggregate outputs computed from the input stream (assuming there is no agg 
> value for this key before):
> V1, V2, V3, V4, V5
> Then the aggregator will output the following sequence of Change<newValue, 
> oldValue>:
> <V1, null>, <V2, V1>, <V3, V2>, <V4, V3>, <V5, V4>
> which could cost a lot of CPU overhead computing the intermediate results. 
> Instead, if we can let the underlying state store "remember" the last 
> emitted old value, we can reduce the number of emits based on some configs. 
> More specifically, we can add one more field in the KV store engine storing 
> the last emitted old value, which only gets updated when we emit to the 
> downstream processor. For example:
> At Beginning: 
> Store: key => empty (no agg values yet)
> V1 computed: 
> Update Both in Store: key => (V1, V1), Emit <V1, null>
> V2 computed: 
> Update NewValue in Store: key => (V2, V1), No Emit
> V3 computed: 
> Update NewValue in Store: key => (V3, V1), No Emit
> V4 computed: 
> Update Both in Store: key => (V4, V4), Emit <V4, V1>
> V5 computed: 
> Update NewValue in Store: key => (V5, V4), No Emit
> One more thing to consider is that we need a "closing" time control on the 
> not-yet-emitted keys; when some time has elapsed (or the window is to be 
> closed), we need to check for any key whether their current materialized pairs 
> have not been emitted (for example <V5, V4> in the above example). 
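A self-contained sketch of the proposed bookkeeping (hypothetical code, not 
from Kafka; the Change class and the emit policy are stand-ins):

{code}
import java.util.HashMap;
import java.util.Map;

final class EmitSuppressingAggStore<K, V> {

    static final class Change<V> {
        final V newValue;
        final V oldValue;
        Change(V newValue, V oldValue) { this.newValue = newValue; this.oldValue = oldValue; }
    }

    // Per key: (latest aggregate, aggregate as of the last emit).
    private static final class Entry<V> {
        V newValue;
        V lastEmitted;
    }

    private final Map<K, Entry<V>> store = new HashMap<>();

    // Record a newly computed aggregate; return a Change to forward
    // downstream, or null if the emit is suppressed.
    Change<V> update(K key, V computed, boolean emitNow) {
        Entry<V> e = store.computeIfAbsent(key, k -> new Entry<>());
        V old = e.lastEmitted;
        e.newValue = computed;          // always track the latest value
        if (!emitNow) {
            return null;                // e.g. V2, V3, V5 above: no emit
        }
        e.lastEmitted = computed;       // e.g. V1, V4 above: update both
        return new Change<>(computed, old);
    }
}
{code}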



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3957) consumer timeout not being respected when kafka broker is not available

2016-07-13 Thread Vahid Hashemian (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3957?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15375644#comment-15375644
 ] 

Vahid Hashemian commented on KAFKA-3957:


Thanks for confirming. You can close this one.

> consumer timeout not being respected when kafka broker is not available
> ---
>
> Key: KAFKA-3957
> URL: https://issues.apache.org/jira/browse/KAFKA-3957
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 0.9.0.1
>Reporter: Vincent Fumo
>Priority: Minor
>
> KafkaConsumer v0.9::
> I have a consumer set up with session.timeout.ms set to 30s. I make a call 
> like
> consumer.poll(1)
> but if the kafka broker is down, that call will hang indefinitely.
> Digging into the code, it seems that the timeout isn't respected.
> KafkaConsumer calls out to pollOnce() as seen below::
>     private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollOnce(long timeout) {
>         // TODO: Sub-requests should take into account the poll timeout (KAFKA-1894)
>         coordinator.ensureCoordinatorKnown();
>         // ensure we have partitions assigned if we expect to
>         if (subscriptions.partitionsAutoAssigned())
>             coordinator.ensurePartitionAssignment();
>         // fetch positions if we have partitions we're subscribed to that we
>         // don't know the offset for
>         if (!subscriptions.hasAllFetchPositions())
>             updateFetchPositions(this.subscriptions.missingFetchPositions());
>         // init any new fetches (won't resend pending fetches)
>         Cluster cluster = this.metadata.fetch();
>         Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords();
>         // if data is available already, e.g. from a previous network client poll() call to commit,
>         // then just return it immediately
>         if (!records.isEmpty()) {
>             return records;
>         }
>         fetcher.initFetches(cluster);
>         client.poll(timeout);
>         return fetcher.fetchedRecords();
>     }
> and we see that we stick on the call to coordinator.ensureCoordinatorKnown();
> AbstractCoordinator::
>     public void ensureCoordinatorKnown() {
>         while (coordinatorUnknown()) {
>             RequestFuture<Void> future = sendGroupMetadataRequest();
>             client.poll(future);
>             if (future.failed()) {
>                 if (future.isRetriable())
>                     client.awaitMetadataUpdate();
>                 else
>                     throw future.exception();
>             }
>         }
>     }
> in this case the Future fails (since the broker is down), and then a call to 
> client.awaitMetadataUpdate() is made, which in the case of the 
> ConsumerNetworkClient will block forever:
>     public void awaitMetadataUpdate() {
>         int version = this.metadata.requestUpdate();
>         do {
>             poll(Long.MAX_VALUE);
>         } while (this.metadata.version() == version);
>     }
> I feel that this is a bug. When you set a timeout on a call to a blocking 
> method, that timeout should be respected and an exception should be thrown.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3957) consumer timeout not being respected when kafka broker is not available

2016-07-13 Thread Vincent Fumo (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3957?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15375569#comment-15375569
 ] 

Vincent Fumo commented on KAFKA-3957:
-

Yes, I see it. Apparently my use case is just one example of the possible 
timeout issues. Should I close this?

> consumer timeout not being respected when kafka broker is not available
> ---
>
> Key: KAFKA-3957
> URL: https://issues.apache.org/jira/browse/KAFKA-3957
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 0.9.0.1
>Reporter: Vincent Fumo
>Priority: Minor
>
> KafkaConsumer v0.9::
> I have a consumer set up with session.timeout.ms set to 30s. I make a call 
> like
> consumer.poll(1)
> but if the kafka broker is down, that call will hang indefinitely.
> Digging into the code, it seems that the timeout isn't respected.
> KafkaConsumer calls out to pollOnce() as seen below::
>     private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollOnce(long timeout) {
>         // TODO: Sub-requests should take into account the poll timeout (KAFKA-1894)
>         coordinator.ensureCoordinatorKnown();
>         // ensure we have partitions assigned if we expect to
>         if (subscriptions.partitionsAutoAssigned())
>             coordinator.ensurePartitionAssignment();
>         // fetch positions if we have partitions we're subscribed to that we
>         // don't know the offset for
>         if (!subscriptions.hasAllFetchPositions())
>             updateFetchPositions(this.subscriptions.missingFetchPositions());
>         // init any new fetches (won't resend pending fetches)
>         Cluster cluster = this.metadata.fetch();
>         Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords();
>         // if data is available already, e.g. from a previous network client poll() call to commit,
>         // then just return it immediately
>         if (!records.isEmpty()) {
>             return records;
>         }
>         fetcher.initFetches(cluster);
>         client.poll(timeout);
>         return fetcher.fetchedRecords();
>     }
> and we see that we stick on the call to coordinator.ensureCoordinatorKnown();
> AbstractCoordinator::
>     public void ensureCoordinatorKnown() {
>         while (coordinatorUnknown()) {
>             RequestFuture<Void> future = sendGroupMetadataRequest();
>             client.poll(future);
>             if (future.failed()) {
>                 if (future.isRetriable())
>                     client.awaitMetadataUpdate();
>                 else
>                     throw future.exception();
>             }
>         }
>     }
> in this case the Future fails (since the broker is down), and then a call to 
> client.awaitMetadataUpdate() is made, which in the case of the 
> ConsumerNetworkClient will block forever:
>     public void awaitMetadataUpdate() {
>         int version = this.metadata.requestUpdate();
>         do {
>             poll(Long.MAX_VALUE);
>         } while (this.metadata.version() == version);
>     }
> I feel that this is a bug. When you set a timeout on a call to a blocking 
> method, that timeout should be respected and an exception should be thrown.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [VOTE] KIP-67: Queryable state for Kafka Streams

2016-07-13 Thread Jim Jagielski
+1

> On Jul 13, 2016, at 2:37 PM, Damian Guy  wrote:
> 
> I'm in agreement with Guozhang. Further, from a "Developer Experience" PoV,
> IMO we should avoid forcing developers to cast - it doesn't make for a nice
> and easy-to-use API, it introduces possible runtime errors due to invalid
> casts, and developers need to know what they are casting to.
> 
> On Wed, 13 Jul 2016 at 10:47 Guozhang Wang  wrote:
> 
>> Personally I think the additional complexity of the introduced "
>> QueryableStoreType" interface is still acceptable from a user's point of
>> view: this is the only interface we are exposing to users, and other
>> wrappers are all internal classes.
>> 
>> Regarding "QueryableStoreTypes", maybe we can consider declaring its
>> "QueryableStoreTypeMatcher" as private instead of public, since
>> "QueryableStoreTypes" is just used as a convenient manner for using
>> library-provided types, like serialization/Serdes.java.
>> 
>> With this the only additional interface the library is exposing is "
>> QueryableStoreType", and users optionally can just use
>> "QueryableStoreTypes"
>> to conveniently create library-provided store types.
>> 
>> 
>> Guozhang
>> 
>> 
>> On Wed, Jul 13, 2016 at 7:58 AM, Neha Narkhede  wrote:
>> 
>>> Damian -- appreciate the example code and you convinced me. Agree that
>> the
>>> class approach is better and renaming to KafkaStreamsMetadata along with
>>> renaming the API methods will address the issues I was referring to.
>>> 
>>> One other thing I wanted to get people's thoughts on was the way we are
>>> proposing to handle different store types. I am sure you guys have
>> thought
>>> about the tradeoffs of using the store wrappers and matchers (
>>> QueryableStoreType) vs just making users cast the returned store to the
>>> type they would expect to use. That is simple but the obvious downside is
>>> that it is likely to result in exceptions for users that don't know what
>>> they are doing.
>>> 
>>> In my experience of dealing with apps that would use queryable state, it
>>> appears to me that a majority would just use the key value store. Partly
>>> because that will suffice and partly because people might just follow the
>>> simpler examples we provide that use key-value store. For advanced users,
>>> they will be aware of the reason they want to use the windowed store and
>>> will know how to cast it. The advantage of the current approach is that
>> it
>>> is likely more robust and general but introduces more interfaces
>>> and wrapper code.
>>> 
>>> I tend to prefer simplicity to optimize for the general case, but curious
>>> to get people's thoughts on this as well.
>>> 
>>> On Wed, Jul 13, 2016 at 8:13 AM, Jim Jagielski  wrote:
>>> 
 IMO, that makes the most sense.
 
> On Jul 12, 2016, at 5:11 PM, Ismael Juma  wrote:
> 
> Hi Damian,
> 
> How about StreamsMetadata instead? The general naming pattern seems
>> to
> avoid the `Kafka` prefix for everything outside of `KafkaStreams`
>>> itself.
> 
> Ismael
> 
> On Tue, Jul 12, 2016 at 7:14 PM, Damian Guy 
 wrote:
> 
>> Hi,
>> 
>> I agree with point 1. application.server is a better name for the
>>> config
>> (we'll change this). However, on point 2 I think we should stick
>>> mostly
>> with what we already have. I've tried both ways of doing this when
 working
>> on the JIRA and building examples and I find the current approach
>> more
>> intuitive and easier to use than the Map based approach.
>> However, there is probably a naming issue. We should rename
>> KafkaStreamsInstance to KafkaStreamsMetadata. This Class is very
>>> simple,
>> but provides all the information a developer needs to be able to
>> find
 the
>> instance(s) of a Streams application that a particular store is
>>> running
 on,
>> i.e.,
>> 
>> public class KafkaStreamsMetadata {
>>     private final HostInfo hostInfo;
>>     private final Set<String> stateStoreNames;
>>     private final Set<TopicPartition> topicPartitions;
>> 
>> 
>> So using the API to route to a new host is fairly simple, particularly
>> in the case when you want to find the host for a particular key, i.e.,
>> 
>> final KafkaStreams kafkaStreams = createKafkaStreams();
>> final KafkaStreamsMetadata streamsMetadata =
>>     kafkaStreams.instanceWithKey("word-count", "hello",
>>         Serdes.String().serializer());
>> http.get("http://" + streamsMetadata.host() + ":" +
>>     streamsMetadata.port() + "/get/word-count/hello");
>> 
>> 
>> And if you want to do a scatter-gather approach:
>> 
>> final KafkaStreams kafkaStreams = createKafkaStreams();
>> final Collection<KafkaStreamsMetadata> kafkaStreamsMetadatas =
>>     kafkaStreams.allInstancesWithStore("word-count");
>> for (KafkaStreamsMetadata streamsMetadata : 

[jira] [Commented] (KAFKA-2205) Generalize TopicConfigManager to handle multiple entity configs

2016-07-13 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2205?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15375527#comment-15375527
 ] 

Jun Rao commented on KAFKA-2205:


[~thesquelched], thanks for reporting that. Do you want to file a separate jira 
to fix that?

> Generalize TopicConfigManager to handle multiple entity configs
> ---
>
> Key: KAFKA-2205
> URL: https://issues.apache.org/jira/browse/KAFKA-2205
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Aditya Auradkar
>Assignee: Aditya Auradkar
>  Labels: quotas
> Fix For: 0.9.0.0
>
> Attachments: KAFKA-2205.patch, KAFKA-2205_2015-07-01_18:38:18.patch, 
> KAFKA-2205_2015-07-07_19:12:15.patch, KAFKA-2205_2015-07-14_10:33:47.patch, 
> KAFKA-2205_2015-07-14_10:36:36.patch, KAFKA-2205_2015-07-17_11:14:26.patch, 
> KAFKA-2205_2015-07-17_11:18:31.patch, KAFKA-2205_2015-07-24_18:11:34.patch
>
>
> Acceptance Criteria:
> - TopicConfigManager should be generalized to handle Topic and Client configs 
> (and any type of config in the future). As described in KIP-21
> - Add a ConfigCommand tool to change topic and client configuration



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [VOTE] KIP-67: Queryable state for Kafka Streams

2016-07-13 Thread Damian Guy
I'm in agreement with Guozhang. Further, from a "Developer Experience" PoV,
IMO we should avoid forcing developers to cast - it doesn't make for a nice
and easy-to-use API, it introduces possible runtime errors due to invalid
casts, and developers need to know what they are casting to.

On Wed, 13 Jul 2016 at 10:47 Guozhang Wang  wrote:

> Personally I think the additional complexity of the introduced "
> QueryableStoreType" interface is still acceptable from a user's point of
> view: this is the only interface we are exposing to users, and other
> wrappers are all internal classes.
>
> Regarding "QueryableStoreTypes", maybe we can consider declaring its
> "QueryableStoreTypeMatcher" as private instead of public, since
> "QueryableStoreTypes" is just used as a convenient manner for using
> library-provided types, like serialization/Serdes.java.
>
> With this the only additional interface the library is exposing is "
> QueryableStoreType", and users optionally can just use
> "QueryableStoreTypes"
> to conveniently create library-provided store types.
>
>
> Guozhang
>
>
> On Wed, Jul 13, 2016 at 7:58 AM, Neha Narkhede  wrote:
>
> > Damian -- appreciate the example code and you convinced me. Agree that
> the
> > class approach is better and renaming to KafkaStreamsMetadata along with
> > renaming the API methods will address the issues I was referring to.
> >
> > One other thing I wanted to get people's thoughts on was the way we are
> > proposing to handle different store types. I am sure you guys have
> thought
> > about the tradeoffs of using the store wrappers and matchers (
> > QueryableStoreType) vs just making users cast the returned store to the
> > type they would expect to use. That is simple but the obvious downside is
> > that it is likely to result in exceptions for users that don't know what
> > they are doing.
> >
> > In my experience of dealing with apps that would use queryable state, it
> > appears to me that a majority would just use the key value store. Partly
> > because that will suffice and partly because people might just follow the
> > simpler examples we provide that use key-value store. For advanced users,
> > they will be aware of the reason they want to use the windowed store and
> > will know how to cast it. The advantage of the current approach is that
> it
> > is likely more robust and general but involves introducing more interfaces
> > and wrapper code.
> >
> > I tend to prefer simplicity to optimize for the general case, but curious
> > to get people's thoughts on this as well.
> >
> > On Wed, Jul 13, 2016 at 8:13 AM, Jim Jagielski  wrote:
> >
> > > IMO, that makes the most sense.
> > >
> > > > On Jul 12, 2016, at 5:11 PM, Ismael Juma  wrote:
> > > >
> > > > Hi Damian,
> > > >
> > > > How about StreamsMetadata instead? The general naming pattern seems
> to
> > > > avoid the `Kafka` prefix for everything outside of `KafkaStreams`
> > itself.
> > > >
> > > > Ismael
> > > >
> > > > On Tue, Jul 12, 2016 at 7:14 PM, Damian Guy 
> > > wrote:
> > > >
> > > >> Hi,
> > > >>
> > > >> I agree with point 1. application.server is a better name for the
> > config
> > > >> (we'll change this). However, on point 2 I think we should stick
> > mostly
> > > >> with what we already have. I've tried both ways of doing this when
> > > working
> > > >> on the JIRA and building examples and I find the current approach
> more
> > > >> intuitive and easier to use than the Map based approach.
> > > >> However, there is probably a naming issue. We should rename
> > > >> KafkaStreamsInstance to KafkaStreamsMetadata. This Class is very
> > simple,
> > > >> but provides all the information a developer needs to be able to
> find
> > > the
> > > >> instance(s) of a Streams application that a particular store is
> > running
> > > on,
> > > >> i.e.,
> > > >>
> > > >> public class KafkaStreamsMetadata {
> > > >>    private final HostInfo hostInfo;
> > > >>    private final Set<String> stateStoreNames;
> > > >>    private final Set<TopicPartition> topicPartitions;
> > > >>
> > > >>
> > > >> So using the API to route to a new host is fairly simple,
> particularly
> > > in
> > > >> the case when you want to find the host for a particular key, i.e.,
> > > >>
> > > >> final KafkaStreams kafkaStreams = createKafkaStreams();
> > > >> final KafkaStreamsMetadata streamsMetadata =
> > > >> kafkaStreams.instanceWithKey("word-count", "hello",
> > > >> Serdes.String().serializer());
> > > >> http.get("http://" + streamsMetadata.host() + ":" +
> > > >> streamsMetadata.port() + "/get/word-count/hello");
> > > >>
> > > >>
> > > >> And if you want to do a scatter gather approach:
> > > >>
> > > >> final KafkaStreams kafkaStreams = createKafkaStreams();
> > > >> final Collection<KafkaStreamsMetadata> kafkaStreamsMetadatas =
> > > >> kafkaStreams.allInstancesWithStore("word-count");
> > > >> for (KafkaStreamsMetadata streamsMetadata : kafkaStreamsMetadatas) {
> > > >>

[jira] [Updated] (KAFKA-3941) Avoid applying eviction listener in InMemoryKeyValueLoggedStore

2016-07-13 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3941?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-3941:
-
Description: 
This is reported by [~norwood].

In {{InMemoryKeyValueLoggedStore}} we set the eviction listener while creating 
the store. The listener records evicted records as "deleted" in the change 
logger, which then sends a tombstone record to the corresponding changelog 
topic partition. However, when restoring the store, although we use the inner 
store's putInternal call and hence bypass the logging since it is not needed, 
the eviction listener still calls the outer store's delete call and hence 
still sends the tombstone record. This causes the restoration process to fail, 
as it does not expect the changelog log-end-offset to increase (i.e. more 
messages to be appended) while restoration is in progress.

We should defer the listener initialization until the end of the {{init}} call, 
after the restoration is completed, and also make sure the "register" call is 
made on the inner stores only.

  was:
This is reported by [~norwood].

In {{InMemoryKeyValueLoggedStore}} we set the eviction listener which records 
the evicted records as deletes in the changelog. However, when restoring the 
store this listener will then double-write the delete record, causing the 
restoration process to fail.

We should defer the listener initialization until the end of the {{init}} call, 
instead of inside the {{supplier.get}}.


> Avoid applying eviction listener in InMemoryKeyValueLoggedStore
> ---
>
> Key: KAFKA-3941
> URL: https://issues.apache.org/jira/browse/KAFKA-3941
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Guozhang Wang
>
> This is reported by [~norwood].
> In {{InMemoryKeyValueLoggedStore}} we set the eviction listener while 
> creating the store. The listener records evicted records as "deleted" in the 
> change logger, which then sends a tombstone record to the corresponding 
> changelog topic partition. However, when restoring the store, although we 
> use the inner store's putInternal call and hence bypass the logging since it 
> is not needed, the eviction listener still calls the outer store's delete 
> call and hence still sends the tombstone record. This causes the restoration 
> process to fail, as it does not expect the changelog log-end-offset to 
> increase (i.e. more messages to be appended) while restoration is in progress.
> We should defer the listener initialization until the end of the {{init}} 
> call, after the restoration is completed, and also make sure the "register" 
> call is made on the inner stores only.
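
A minimal sketch of the proposed ordering - the names below ({{inner}}, 
{{setEvictionListener}}, {{changeLogger}}) are assumptions, not the actual patch:

{code}
@Override
public void init(ProcessorContext context, StateStore root) {
    // register and restore through the inner store only, so restoration
    // bypasses the logging layer entirely
    inner.init(context, root);

    // attach the eviction listener only after restoration has completed,
    // so evictions triggered while replaying the changelog cannot write
    // tombstones back into it
    inner.setEvictionListener(key -> changeLogger.logDelete(key));
}
{code}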



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [VOTE] KIP-67: Queryable state for Kafka Streams

2016-07-13 Thread Guozhang Wang
Personally I think the additional complexity of the introduced
"QueryableStoreType" interface is still acceptable from a user's point of
view: it is the only interface we are exposing to users, and the other
wrappers are all internal classes.

Regarding "QueryableStoreTypes", maybe we can consider declaring its
"QueryableStoreTypeMatcher" as private instead of public, since
"QueryableStoreTypes" is just a convenience for creating library-provided
store types, like serialization/Serdes.java.

With this, the only additional interface the library exposes is
"QueryableStoreType", and users can optionally use "QueryableStoreTypes"
to conveniently create library-provided store types.


Guozhang


On Wed, Jul 13, 2016 at 7:58 AM, Neha Narkhede  wrote:

> Damian -- appreciate the example code and you convinced me. Agree that the
> class approach is better and renaming to KafkaStreamsMetadata along with
> renaming the API methods will address the issues I was referring to.
>
> One other thing I wanted to get people's thoughts on was the way we are
> proposing to handle different store types. I am sure you guys have thought
> about the tradeoffs of using the store wrappers and matchers (
> QueryableStoreType) vs just making users cast the returned store to the
> type they would expect to use. That is simple but the obvious downside is
> that it is likely to result in exceptions for users that don't know what
> they are doing.
>
> In my experience of dealing with apps that would use queryable state, it
> appears to me that a majority would just use the key value store. Partly
> because that will suffice and partly because people might just follow the
> simpler examples we provide that use key-value store. For advanced users,
> they will be aware of the reason they want to use the windowed store and
> will know how to cast it. The advantage of the current approach is that it
> is likely more robust and general but involves introducing more interfaces
> and wrapper code.
>
> I tend to prefer simplicity to optimize for the general case, but curious
> to get people's thoughts on this as well.
>
> On Wed, Jul 13, 2016 at 8:13 AM, Jim Jagielski  wrote:
>
> > IMO, that makes the most sense.
> >
> > > On Jul 12, 2016, at 5:11 PM, Ismael Juma  wrote:
> > >
> > > Hi Damian,
> > >
> > > How about StreamsMetadata instead? The general naming pattern seems to
> > > avoid the `Kafka` prefix for everything outside of `KafkaStreams`
> itself.
> > >
> > > Ismael
> > >
> > > On Tue, Jul 12, 2016 at 7:14 PM, Damian Guy 
> > wrote:
> > >
> > >> Hi,
> > >>
> > >> I agree with point 1. application.server is a better name for the
> config
> > >> (we'll change this). However, on point 2 I think we should stick
> mostly
> > >> with what we already have. I've tried both ways of doing this when
> > working
> > >> on the JIRA and building examples and I find the current approach more
> > >> intuitive and easier to use than the Map based approach.
> > >> However, there is probably a naming issue. We should rename
> > >> KafkaStreamsInstance to KafkaStreamsMetadata. This Class is very
> simple,
> > >> but provides all the information a developer needs to be able to find
> > the
> > >> instance(s) of a Streams application that a particular store is
> running
> > on,
> > >> i.e.,
> > >>
> > >> public class KafkaStreamsMetadata {
> > >>    private final HostInfo hostInfo;
> > >>    private final Set<String> stateStoreNames;
> > >>    private final Set<TopicPartition> topicPartitions;
> > >>
> > >>
> > >> So using the API to route to a new host is fairly simple, particularly
> > in
> > >> the case when you want to find the host for a particular key, i.e.,
> > >>
> > >> final KafkaStreams kafkaStreams = createKafkaStreams();
> > >> final KafkaStreamsMetadata streamsMetadata =
> > >> kafkaStreams.instanceWithKey("word-count", "hello",
> > >> Serdes.String().serializer());
> > >> http.get("http://" + streamsMetadata.host() + ":" +
> > >> streamsMetadata.port() + "/get/word-count/hello");
> > >>
> > >>
> > >> And if you want to do a scatter gather approach:
> > >>
> > >> final KafkaStreams kafkaStreams = createKafkaStreams();
> > >> final Collection<KafkaStreamsMetadata> kafkaStreamsMetadatas =
> > >> kafkaStreams.allInstancesWithStore("word-count");
> > >> for (KafkaStreamsMetadata streamsMetadata : kafkaStreamsMetadatas) {
> > >>http.get("http://" + streamsMetadata.host() + ":" +
> > >> streamsMetadata.port() + "/get/word-count/hello");
> > >>...
> > >> }
> > >>
> > >>
> > >> And if you iterated over all instances:
> > >>
> > >> final KafkaStreams kafkaStreams = createKafkaStreams();
> > >> final Collection<KafkaStreamsMetadata> kafkaStreamsMetadatas =
> > >> kafkaStreams.allInstances();
> > >> for (KafkaStreamsMetadata streamsMetadata : kafkaStreamsMetadatas) {
> > >>if (streamsMetadata.stateStoreNames().contains("word-count")) {
> > >>http.get("http://" + streamsMetadata.host() + ":" +
> > >> 

[jira] [Commented] (KAFKA-3929) Add prefix for underlying clients configs in StreamConfig

2016-07-13 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15375381#comment-15375381
 ] 

Guozhang Wang commented on KAFKA-3929:
--

I am sorry for the late reply, I missed your response in my email notifier.

Yeah, that is the purpose. In the first step of KAFKA-3740, 
[~h...@pinterest.com] already added a function to expose all config properties 
as key-value pairs with something like 
{{ProcessorContext.appConfigsWithPrefix}}. In this ticket we just want to 
define a standard set of prefixes that the library is aware of, and that users 
should follow when specifying the configs before constructing the instance.

> Add prefix for underlying clients configs in StreamConfig
> -
>
> Key: KAFKA-3929
> URL: https://issues.apache.org/jira/browse/KAFKA-3929
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Ishita Mandhan
>  Labels: api
>
> There are a couple of configs that have the same name for producer / consumer 
> configs, e.g. take a look at {{CommonClientConfigs}}, and also for producer / 
> consumer interceptors there are commonly named configs as well.
> This is semi-related to KAFKA-3740 since we need to add "sub-class" configs 
> for RocksDB as well, and we'd better have some prefix mechanism for such 
> hierarchical configs in general.
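
For illustration, a hypothetical sketch of such a prefix mechanism - the 
{{consumer.}} / {{producer.}} prefix strings are assumptions, not the final 
naming:

{code}
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class PrefixedConfigs {
    // return only the configs carrying the given prefix, with the prefix stripped
    static Map<String, Object> withPrefixStripped(Properties props, String prefix) {
        Map<String, Object> out = new HashMap<>();
        for (String name : props.stringPropertyNames()) {
            if (name.startsWith(prefix))
                out.put(name.substring(prefix.length()), props.getProperty(name));
        }
        return out;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("application.id", "word-count");
        props.put("consumer.session.timeout.ms", "30000"); // consumer-only setting
        props.put("producer.linger.ms", "5");              // producer-only setting
        System.out.println(withPrefixStripped(props, "consumer."));
        // prints {session.timeout.ms=30000}
    }
}
{code}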



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3961) broker sends malformed response when switching from no compression to snappy/gzip

2016-07-13 Thread Manikumar Reddy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3961?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15375342#comment-15375342
 ] 

Manikumar Reddy commented on KAFKA-3961:


You can try the DumpLogSegments tool to verify the messages in the data files. 
It will show the compression type for each message.
https://cwiki.apache.org/confluence/display/KAFKA/System+Tools#SystemTools-DumpLogSegment
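
For example, a hypothetical invocation (adjust the segment path to your 
broker's log.dirs):

$ bin/kafka-run-class.sh kafka.tools.DumpLogSegments \
    --files /var/kafka-logs/mytopic-0/00000000000000000000.log \
    --print-data-log --deep-iteration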

> broker sends malformed response when switching from no compression to 
> snappy/gzip
> -
>
> Key: KAFKA-3961
> URL: https://issues.apache.org/jira/browse/KAFKA-3961
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.0.0
> Environment: docker container java:openjdk-8-jre on arch linux 
> 4.5.4-1-ARCH
>Reporter: Dieter Plaetinck
>
> Hi, this is my first time using this tracker, so please bear with me 
> (priority seems to be major by default?)
> I should be allowed to switch back and forth between none/gzip/snappy 
> compression on the same topic/partition, right?
> (I couldn't find this explicitly anywhere, but it seems implied by the docs 
> and also by https://issues.apache.org/jira/browse/KAFKA-1499)
> When I try this - first using no compression, then killing my producer, 
> restarting it with snappy or gzip compression, and sending data to the same 
> topic/partition again - the broker seems to send a malformed response to my 
> consumer. At least that's what was suggested when I reported this problem in 
> the tracker for the client library I use 
> (https://github.com/Shopify/sarama/issues/698). Also noteworthy is that the 
> broker doesn't log anything when this happens.
> thanks!



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3940) Log should check the return value of dir.mkdirs()

2016-07-13 Thread Jim Jagielski (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3940?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15375225#comment-15375225
 ] 

Jim Jagielski commented on KAFKA-3940:
--

Pull request created

> Log should check the return value of dir.mkdirs()
> -
>
> Key: KAFKA-3940
> URL: https://issues.apache.org/jira/browse/KAFKA-3940
> Project: Kafka
>  Issue Type: Bug
>  Components: log
>Affects Versions: 0.10.0.0
>Reporter: Jun Rao
>Assignee: Ishita Mandhan
>  Labels: newbie
>
> In Log.loadSegments(), we call dir.mkdirs() w/o checking the return value and 
> just assume the directory will exist after the call. However, if the 
> directory can't be created (e.g. due to no space), we will hit a 
> NullPointerException in the next statement, which will be confusing.
>for(file <- dir.listFiles if file.isFile) {
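
A minimal sketch of the kind of guard being proposed, in Java for illustration 
(the actual code is Scala, and the exception type here is an assumption):

{code}
import java.io.File;

public class LogDirCheck {
    static void ensureDir(File dir) {
        // check the return value rather than assuming the directory exists afterwards
        if (!dir.exists() && !dir.mkdirs())
            throw new IllegalStateException(
                "Failed to create log directory " + dir.getAbsolutePath());
    }
}
{code}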



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3940) Log should check the return value of dir.mkdirs()

2016-07-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3940?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15375222#comment-15375222
 ] 

ASF GitHub Bot commented on KAFKA-3940:
---

GitHub user jimjag opened a pull request:

https://github.com/apache/kafka/pull/1620

KAFKA-3940: Log should check the return value of dir.mkdirs()



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/jimjag/kafka KAFKA-3940

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/1620.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1620


commit b9416916c8dfbcfd623b625b5835fc140c904ac5
Author: Jim Jagielski 
Date:   2016-07-13T15:36:46Z

KAFKA-3940: Log should check the return value of dir.mkdirs()




> Log should check the return value of dir.mkdirs()
> -
>
> Key: KAFKA-3940
> URL: https://issues.apache.org/jira/browse/KAFKA-3940
> Project: Kafka
>  Issue Type: Bug
>  Components: log
>Affects Versions: 0.10.0.0
>Reporter: Jun Rao
>Assignee: Ishita Mandhan
>  Labels: newbie
>
> In Log.loadSegments(), we call dir.mkdirs() w/o checking the return value and 
> just assume the directory will exist after the call. However, if the 
> directory can't be created (e.g. due to no space), we will hit a 
> NullPointerException in the next statement, which will be confusing.
>for(file <- dir.listFiles if file.isFile) {



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] kafka pull request #1620: KAFKA-3940: Log should check the return value of d...

2016-07-13 Thread jimjag
GitHub user jimjag opened a pull request:

https://github.com/apache/kafka/pull/1620

KAFKA-3940: Log should check the return value of dir.mkdirs()



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/jimjag/kafka KAFKA-3940

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/1620.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1620


commit b9416916c8dfbcfd623b625b5835fc140c904ac5
Author: Jim Jagielski 
Date:   2016-07-13T15:36:46Z

KAFKA-3940: Log should check the return value of dir.mkdirs()




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Created] (KAFKA-3961) broker sends malformed response when switching from no compression to snappy/gzip

2016-07-13 Thread Dieter Plaetinck (JIRA)
Dieter Plaetinck created KAFKA-3961:
---

 Summary: broker sends malformed response when switching from no 
compression to snappy/gzip
 Key: KAFKA-3961
 URL: https://issues.apache.org/jira/browse/KAFKA-3961
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.10.0.0
 Environment: docker container java:openjdk-8-jre on arch linux 
4.5.4-1-ARCH

Reporter: Dieter Plaetinck


Hi, this is my first time using this tracker, so please bear with me (priority 
seems to be major by default?)

I should be allowed to switch back and forth between none/gzip/snappy 
compression on the same topic/partition, right?
(I couldn't find this explicitly anywhere, but it seems implied by the docs 
and also by https://issues.apache.org/jira/browse/KAFKA-1499)

When I try this - first using no compression, then killing my producer, 
restarting it with snappy or gzip compression, and sending data to the same 
topic/partition again - the broker seems to send a malformed response to my 
consumer. At least that's what was suggested when I reported this problem in 
the tracker for the client library I use 
(https://github.com/Shopify/sarama/issues/698). Also noteworthy is that the 
broker doesn't log anything when this happens.

thanks!



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Work started] (KAFKA-3858) Add functions to print stream topologies

2016-07-13 Thread Eno Thereska (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3858?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Work on KAFKA-3858 started by Eno Thereska.
---
> Add functions to print stream topologies
> 
>
> Key: KAFKA-3858
> URL: https://issues.apache.org/jira/browse/KAFKA-3858
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Affects Versions: 0.10.1.0
>Reporter: Roger Hoover
>Assignee: Eno Thereska
>
> For debugging and development, it would be very useful to be able to print 
> Kafka streams topologies.  At a minimum, it would be great to be able to see 
> the logical topology, including the Kafka topics linked by sub-topologies.  I 
> think that this information does not depend on partitioning.  For more 
> detail, it would be great to be able to print the same logical topology but 
> also showing the number of tasks (and perhaps task ids?).  Finally, it would 
> be great to show the physical topology after the tasks have been mapped to 
> JVMs + threads.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [VOTE] KIP-67: Queryable state for Kafka Streams

2016-07-13 Thread Neha Narkhede
Damian -- appreciate the example code and you convinced me. Agree that the
class approach is better and renaming to KafkaStreamsMetadata along with
renaming the API methods will address the issues I was referring to.

One other thing I wanted to get people's thoughts on was the way we are
proposing to handle different store types. I am sure you guys have thought
about the tradeoffs of using the store wrappers and matchers (
QueryableStoreType) vs just making users cast the returned store to the
type they would expect to use. That is simple but the obvious downside is
that it is likely to result in exceptions for users that don't know what
they are doing.

In my experience of dealing with apps that would use queryable state, it
appears to me that a majority would just use the key value store. Partly
because that will suffice and partly because people might just follow the
simpler examples we provide that use key-value store. For advanced users,
they will be aware of the reason they want to use the windowed store and
will know how to cast it. The advantage of the current approach is that it
is likely more robust and general but involves introducing more interfaces
and wrapper code.

I tend to prefer simplicity to optimize for the general case, but curious
to get people's thoughts on this as well.
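
For concreteness, a sketch of the cast-based alternative - the single-argument
store(name) accessor shown here is hypothetical, not an agreed API:

// hypothetical accessor returning the raw store; the caller must cast,
// and a wrong cast only fails at runtime with a ClassCastException
final ReadOnlyKeyValueStore<String, Long> wordCounts =
    (ReadOnlyKeyValueStore<String, Long>) kafkaStreams.store("word-count");
final Long count = wordCounts.get("hello");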

On Wed, Jul 13, 2016 at 8:13 AM, Jim Jagielski  wrote:

> IMO, that makes the most sense.
>
> > On Jul 12, 2016, at 5:11 PM, Ismael Juma  wrote:
> >
> > Hi Damian,
> >
> > How about StreamsMetadata instead? The general naming pattern seems to
> > avoid the `Kafka` prefix for everything outside of `KafkaStreams` itself.
> >
> > Ismael
> >
> > On Tue, Jul 12, 2016 at 7:14 PM, Damian Guy 
> wrote:
> >
> >> Hi,
> >>
> >> I agree with point 1. application.server is a better name for the config
> >> (we'll change this). However, on point 2 I think we should stick mostly
> >> with what we already have. I've tried both ways of doing this when
> working
> >> on the JIRA and building examples and I find the current approach more
> >> intuitive and easier to use than the Map based approach.
> >> However, there is probably a naming issue. We should rename
> >> KafkaStreamsInstance to KafkaStreamsMetadata. This Class is very simple,
> >> but provides all the information a developer needs to be able to find
> the
> >> instance(s) of a Streams application that a particular store is running
> on,
> >> i.e.,
> >>
> >> public class KafkaStreamsMetadata {
> >>    private final HostInfo hostInfo;
> >>    private final Set<String> stateStoreNames;
> >>    private final Set<TopicPartition> topicPartitions;
> >>
> >>
> >> So using the API to route to a new host is fairly simple, particularly
> in
> >> the case when you want to find the host for a particular key, i.e.,
> >>
> >> final KafkaStreams kafkaStreams = createKafkaStreams();
> >> final KafkaStreamsMetadata streamsMetadata =
> >> kafkaStreams.instanceWithKey("word-count", "hello",
> >> Serdes.String().serializer());
> >> http.get("http://" + streamsMetadata.host() + ":" +
> >> streamsMetadata.port() + "/get/word-count/hello");
> >>
> >>
> >> And if you want to do a scatter gather approach:
> >>
> >> final KafkaStreams kafkaStreams = createKafkaStreams();
> >> final Collection<KafkaStreamsMetadata> kafkaStreamsMetadatas =
> >> kafkaStreams.allInstancesWithStore("word-count");
> >> for (KafkaStreamsMetadata streamsMetadata : kafkaStreamsMetadatas) {
> >>http.get("http://" + streamsMetadata.host() + ":" +
> >> streamsMetadata.port() + "/get/word-count/hello");
> >>...
> >> }
> >>
> >>
> >> And if you iterated over all instances:
> >>
> >> final KafkaStreams kafkaStreams = createKafkaStreams();
> >> final Collection<KafkaStreamsMetadata> kafkaStreamsMetadatas =
> >> kafkaStreams.allInstances();
> >> for (KafkaStreamsMetadata streamsMetadata : kafkaStreamsMetadatas) {
> >>if (streamsMetadata.stateStoreNames().contains("word-count")) {
> >>http.get("http://" + streamsMetadata.host() + ":" +
> >> streamsMetadata.port() + "/get/word-count/hello");
> >>...
> >>}
> >> }
> >>
> >>
> >> If we were to change this to use Map<HostInfo, Set<TaskMetadata>> for
> the
> >> most part users would need to iterate over the entry or key set.
> Examples:
> >>
> >> The finding an instance by key is a little odd:
> >>
> >> final KafkaStreams kafkaStreams = createKafkaStreams();
> >> final Map<HostInfo, Set<TaskMetadata>> streamsMetadata =
> >> kafkaStreams.instanceWithKey("word-count","hello",
> >> Serdes.String().serializer());
> >> // this is a bit odd as i only expect one:
> >> for (HostInfo hostInfo : streamsMetadata.keySet()) {
> >>http.get("http://" + hostInfo.host() + ":" +
> >> hostInfo.port() + "/get/word-count/hello");
> >> }
> >>
> >>
> >> The scatter/gather by store is fairly similar to the previous example:
> >>
> >> final KafkaStreams kafkaStreams = createKafkaStreams();
> >> final Map<HostInfo, Set<TaskMetadata>> streamsMetadata =
> >> kafkaStreams.allInstancesWithStore("word-count");
> >> 

Re: [VOTE] KIP-67: Queryable state for Kafka Streams

2016-07-13 Thread Jim Jagielski
IMO, that makes the most sense.

> On Jul 12, 2016, at 5:11 PM, Ismael Juma  wrote:
> 
> Hi Damian,
> 
> How about StreamsMetadata instead? The general naming pattern seems to
> avoid the `Kafka` prefix for everything outside of `KafkaStreams` itself.
> 
> Ismael
> 
> On Tue, Jul 12, 2016 at 7:14 PM, Damian Guy  wrote:
> 
>> Hi,
>> 
>> I agree with point 1. application.server is a better name for the config
>> (we'll change this). However, on point 2 I think we should stick mostly
>> with what we already have. I've tried both ways of doing this when working
>> on the JIRA and building examples and I find the current approach more
>> intuitive and easier to use than the Map based approach.
>> However, there is probably a naming issue. We should rename
>> KafkaStreamsInstance to KafkaStreamsMetadata. This Class is very simple,
>> but provides all the information a developer needs to be able to find the
>> instance(s) of a Streams application that a particular store is running on,
>> i.e.,
>> 
>> public class KafkaStreamsMetadata {
>>    private final HostInfo hostInfo;
>>    private final Set<String> stateStoreNames;
>>    private final Set<TopicPartition> topicPartitions;
>> 
>> 
>> So using the API to route to a new host is fairly simple, particularly in
>> the case when you want to find the host for a particular key, i.e.,
>> 
>> final KafkaStreams kafkaStreams = createKafkaStreams();
>> final KafkaStreamsMetadata streamsMetadata =
>> kafkaStreams.instanceWithKey("word-count", "hello",
>> Serdes.String().serializer());
>> http.get("http://" + streamsMetadata.host() + ":" +
>> streamsMetadata.port() + "/get/word-count/hello");
>> 
>> 
>> And if you want to do a scatter gather approach:
>> 
>> final KafkaStreams kafkaStreams = createKafkaStreams();
>> final Collection<KafkaStreamsMetadata> kafkaStreamsMetadatas =
>> kafkaStreams.allInstancesWithStore("word-count");
>> for (KafkaStreamsMetadata streamsMetadata : kafkaStreamsMetadatas) {
>>http.get("http://" + streamsMetadata.host() + ":" +
>> streamsMetadata.port() + "/get/word-count/hello");
>>...
>> }
>> 
>> 
>> And if you iterated over all instances:
>> 
>> final KafkaStreams kafkaStreams = createKafkaStreams();
>> final Collection<KafkaStreamsMetadata> kafkaStreamsMetadatas =
>> kafkaStreams.allInstances();
>> for (KafkaStreamsMetadata streamsMetadata : kafkaStreamsMetadatas) {
>>if (streamsMetadata.stateStoreNames().contains("word-count")) {
>>http.get("http://" + streamsMetadata.host() + ":" +
>> streamsMetadata.port() + "/get/word-count/hello");
>>...
>>}
>> }
>> 
>> 
>> If we were to change this to use Map<HostInfo, Set<TaskMetadata>> for the
>> most part users would need to iterate over the entry or key set. Examples:
>> 
>> The finding an instance by key is a little odd:
>> 
>> final KafkaStreams kafkaStreams = createKafkaStreams();
>> final Map<HostInfo, Set<TaskMetadata>> streamsMetadata =
>> kafkaStreams.instanceWithKey("word-count","hello",
>> Serdes.String().serializer());
>> // this is a bit odd as i only expect one:
>> for (HostInfo hostInfo : streamsMetadata.keySet()) {
>>http.get("http://" + hostInfo.host() + ":" +
>> hostInfo.port() + "/get/word-count/hello");
>> }
>> 
>> 
>> The scatter/gather by store is fairly similar to the previous example:
>> 
>> final KafkaStreams kafkaStreams = createKafkaStreams();
>> final Map<HostInfo, Set<TaskMetadata>> streamsMetadata =
>> kafkaStreams.allInstancesWithStore("word-count");
>> for(HostInfo hostInfo : streamsMetadata.keySet()) {
>>http.get("http://" + hostInfo.host() + ":" + hostInfo.port() +
>> "/get/word-count/hello");
>>...
>> }
>> 
>> And iterating over all instances:
>> 
>> final Map<HostInfo, Set<TaskMetadata>> streamsMetadata =
>> kafkaStreams.allInstances();
>> for (Map.Entry<HostInfo, Set<TaskMetadata>> entry :
>> streamsMetadata.entrySet()) {
>>for (TaskMetadata taskMetadata : entry.getValue()) {
>>if (taskMetadata.stateStoreNames().contains("word-count")) {
>>http.get("http://" + entry.getKey().host() + ":" +
>> entry.getKey().port() + "/get/word-count/hello");
>>...
>>}
>>}
>> }
>> 
>> 
>> IMO - having a class we return is the better approach as it nicely wraps
>> the related things, i.e, host:port, store names, topic partitions into an
>> Object that is easy to use. Further we could add some behaviour to this
>> class if we felt it necessary, i.e, hasStore(storeName) etc.
>> 
>> Anyway, i'm interested in your thoughts.
>> 
>> Thanks,
>> Damian
>> 
>> On Mon, 11 Jul 2016 at 13:47 Guozhang Wang  wrote:
>> 
>>> 1. Re StreamsConfig.USER_ENDPOINT_CONFIG:
>>> 
>>> I agree with Neha that Kafka Streams can provide the bare minimum APIs
>> just
>>> for host/port, and user's implemented layer can provide URL / proxy
>> address
>>> they want to build on top of it.
>>> 
>>> 
>>> 2. Re Improving KafkaStreamsInstance interface:
>>> 
>>> Users are indeed aware of "TaskId" class which is not part of internal
>>> packages and is exposed in PartitionGrouper interface 

[jira] [Commented] (KAFKA-3858) Add functions to print stream topologies

2016-07-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3858?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15375140#comment-15375140
 ] 

ASF GitHub Bot commented on KAFKA-3858:
---

GitHub user enothereska opened a pull request:

https://github.com/apache/kafka/pull/1619

KAFKA-3858: Basic printing of everything



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/enothereska/kafka KAFKA-3858-print-topology

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/1619.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1619


commit 4152616717118ca585d8f0707fb91b9795dd1efd
Author: Eno Thereska 
Date:   2016-07-13T14:45:58Z

Basic printing of everything




> Add functions to print stream topologies
> 
>
> Key: KAFKA-3858
> URL: https://issues.apache.org/jira/browse/KAFKA-3858
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Affects Versions: 0.10.1.0
>Reporter: Roger Hoover
>Assignee: Eno Thereska
>
> For debugging and development, it would be very useful to be able to print 
> Kafka streams topologies.  At a minimum, it would be great to be able to see 
> the logical topology, including the Kafka topics linked by sub-topologies.  I 
> think that this information does not depend on partitioning.  For more 
> detail, it would be great to be able to print the same logical topology but 
> also showing the number of tasks (and perhaps task ids?).  Finally, it would 
> be great to show the physical topology after the tasks have been mapped to 
> JVMs + threads.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] kafka pull request #1619: KAFKA-3858: Basic printing of everything

2016-07-13 Thread enothereska
GitHub user enothereska opened a pull request:

https://github.com/apache/kafka/pull/1619

KAFKA-3858: Basic printing of everything



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/enothereska/kafka KAFKA-3858-print-topology

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/1619.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1619


commit 4152616717118ca585d8f0707fb91b9795dd1efd
Author: Eno Thereska 
Date:   2016-07-13T14:45:58Z

Basic printing of everything




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (KAFKA-2205) Generalize TopicConfigManager to handle multiple entity configs

2016-07-13 Thread Scott Kruger (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2205?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15375073#comment-15375073
 ] 

Scott Kruger commented on KAFKA-2205:
-

Looks like this isn't *quite* fixed all the way.  
{{DynamicConfigManager.processNotification}} still reports that {{entity_type}} 
should be {{topic}} or {{client}}, not the correct pluralized forms.  I ran 
into this when manually updating configs directly in zookeeper.

> Generalize TopicConfigManager to handle multiple entity configs
> ---
>
> Key: KAFKA-2205
> URL: https://issues.apache.org/jira/browse/KAFKA-2205
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Aditya Auradkar
>Assignee: Aditya Auradkar
>  Labels: quotas
> Fix For: 0.9.0.0
>
> Attachments: KAFKA-2205.patch, KAFKA-2205_2015-07-01_18:38:18.patch, 
> KAFKA-2205_2015-07-07_19:12:15.patch, KAFKA-2205_2015-07-14_10:33:47.patch, 
> KAFKA-2205_2015-07-14_10:36:36.patch, KAFKA-2205_2015-07-17_11:14:26.patch, 
> KAFKA-2205_2015-07-17_11:18:31.patch, KAFKA-2205_2015-07-24_18:11:34.patch
>
>
> Acceptance Criteria:
> - TopicConfigManager should be generalized to handle Topic and Client configs 
> (and any type of config in the future). As described in KIP-21
> - Add a ConfigCommand tool to change topic and client configuration



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (KAFKA-3887) StreamBounceTest.test_bounce and StreamSmokeTest.test_streams failing

2016-07-13 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3887?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-3887.
--
Resolution: Fixed

Marking as fixed per 1611.

> StreamBounceTest.test_bounce and StreamSmokeTest.test_streams failing
> -
>
> Key: KAFKA-3887
> URL: https://issues.apache.org/jira/browse/KAFKA-3887
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, system tests
>Reporter: Ismael Juma
>Assignee: Guozhang Wang
>  Labels: transient-system-test-failure
> Fix For: 0.10.0.1
>
>
> StreamBounceTest.test_bounce and StreamSmokeTest.test_streams have been 
> failing semi-regularly. Output from the latest failure:
> {code}
> 
> test_id:
> 2016-06-20--001.kafkatest.tests.streams.streams_bounce_test.StreamsBounceTest.test_bounce
> status: FAIL
> run time:   4 minutes 13.916 seconds
> Streams Smoke Test process on ubuntu@worker5 took too long to exit
> Traceback (most recent call last):
>   File 
> "/var/lib/jenkins/workspace/system-test-kafka/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.5.1-py2.7.egg/ducktape/tests/runner.py",
>  line 106, in run_all_tests
> data = self.run_single_test()
>   File 
> "/var/lib/jenkins/workspace/system-test-kafka/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.5.1-py2.7.egg/ducktape/tests/runner.py",
>  line 162, in run_single_test
> return self.current_test_context.function(self.current_test)
>   File 
> "/var/lib/jenkins/workspace/system-test-kafka/kafka/tests/kafkatest/tests/streams/streams_bounce_test.py",
>  line 67, in test_bounce
> self.driver.wait()
>   File 
> "/var/lib/jenkins/workspace/system-test-kafka/kafka/tests/kafkatest/services/streams.py",
>  line 94, in wait
> wait_until(lambda: not node.account.alive(pid), timeout_sec=120, 
> err_msg="Streams Smoke Test process on " + str(node.account) + " took too 
> long to exit")
>   File 
> "/var/lib/jenkins/workspace/system-test-kafka/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.5.1-py2.7.egg/ducktape/utils/util.py",
>  line 36, in wait_until
> raise TimeoutError(err_msg)
> TimeoutError: Streams Smoke Test process on ubuntu@worker5 took too long to 
> exit
> 
> test_id:
> 2016-06-20--001.kafkatest.tests.streams.streams_smoke_test.StreamsSmokeTest.test_streams
> status: FAIL
> run time:   4 minutes 36.426 seconds
> Streams Smoke Test process on ubuntu@worker9 took too long to exit
> Traceback (most recent call last):
>   File 
> "/var/lib/jenkins/workspace/system-test-kafka/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.5.1-py2.7.egg/ducktape/tests/runner.py",
>  line 106, in run_all_tests
> data = self.run_single_test()
>   File 
> "/var/lib/jenkins/workspace/system-test-kafka/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.5.1-py2.7.egg/ducktape/tests/runner.py",
>  line 162, in run_single_test
> return self.current_test_context.function(self.current_test)
>   File 
> "/var/lib/jenkins/workspace/system-test-kafka/kafka/tests/kafkatest/tests/streams/streams_smoke_test.py",
>  line 67, in test_streams
> self.driver.wait()
>   File 
> "/var/lib/jenkins/workspace/system-test-kafka/kafka/tests/kafkatest/services/streams.py",
>  line 94, in wait
> wait_until(lambda: not node.account.alive(pid), timeout_sec=120, 
> err_msg="Streams Smoke Test process on " + str(node.account) + " took too 
> long to exit")
>   File 
> "/var/lib/jenkins/workspace/system-test-kafka/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.5.1-py2.7.egg/ducktape/utils/util.py",
>  line 36, in wait_until
> raise TimeoutError(err_msg)
> TimeoutError: Streams Smoke Test process on ubuntu@worker9 took too long to 
> exit
> {code}
> https://jenkins.confluent.io/job/system-test-kafka/255/console



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3960) Committed offset not set after first assign

2016-07-13 Thread Alexey Romanchuk (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3960?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15374813#comment-15374813
 ] 

Alexey Romanchuk commented on KAFKA-3960:
-

I cannot find any workaround for this bug. If you know of one, please share it 
in the comments.

> Committed offset not set after first assign
> ---
>
> Key: KAFKA-3960
> URL: https://issues.apache.org/jira/browse/KAFKA-3960
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.10.0.0
>Reporter: Alexey Romanchuk
>Priority: Blocker
>
> The committed offset is not set after the first assign. Here is a minimal 
> example (Scala):
> {code}
>   val props = new Properties()
>   props.put("bootstrap.servers", "localhost:9092")
>   props.put("client.id", "client1")
>   props.put("group.id", "client1")
>   props.put("enable.auto.commit", "false")
>   props.put("key.deserializer", 
> "org.apache.kafka.common.serialization.ByteArrayDeserializer")
>   props.put("value.deserializer", 
> "org.apache.kafka.common.serialization.ByteArrayDeserializer")
>   val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](props)
>   import scala.collection.JavaConversions._
>   def dumpPositionAndCommitted() = {
> consumer.assignment().foreach { tp =>
>   println(tp)
>   println(s"Position - ${consumer.position(tp)}")
>   println(s"Committed - ${consumer.committed(tp)}")
> }
> println("---")
>   }
>   consumer.assign(Collections.singleton(new TopicPartition("topic", 0)))
>   dumpPositionAndCommitted()
>   Thread.sleep(3000)
>   val ps = Collections.singleton(new TopicPartition("topic", 1)) ++ 
> consumer.assignment()
>   consumer.assign(ps)
>   dumpPositionAndCommitted()
>   Thread.sleep(3000)
>   dumpPositionAndCommitted()
> {code}
> and the result is
> {noformat}
> Position - 1211046445
> Committed - OffsetAndMetadata{offset=1211046445, metadata=''}
> ---
> proto7_fraud-1
> Position - 1262864347
> Committed - null
> topic-0
> Position - 1211046445
> Committed - OffsetAndMetadata{offset=1211046445, metadata=''}
> ---
> topic-1
> Position - 1262864347
> Committed - null
> topic-0
> Position - 1211046445
> Committed - OffsetAndMetadata{offset=1211046445, metadata=''}
> ---
> {noformat}
> Pay attention to 
> {noformat}
> topic-1
> Position - 1262864347
> Committed - null
> {noformat}
> No committed offset is fetched from the broker, even though one exists. 
> Looks like we should set {{needsFetchCommittedOffsets}} to {{true}} when 
> assigning the partition.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-3960) Committed offset not set after first assign

2016-07-13 Thread Alexey Romanchuk (JIRA)
Alexey Romanchuk created KAFKA-3960:
---

 Summary: Committed offset not set after first assign
 Key: KAFKA-3960
 URL: https://issues.apache.org/jira/browse/KAFKA-3960
 Project: Kafka
  Issue Type: Bug
  Components: clients
Affects Versions: 0.10.0.0
Reporter: Alexey Romanchuk
Priority: Blocker


The committed offset is not set after the first assign. Here is a minimal 
example (Scala):

{code}
  val props = new Properties()
  props.put("bootstrap.servers", "localhost:9092")
  props.put("client.id", "client1")
  props.put("group.id", "client1")
  props.put("enable.auto.commit", "false")
  props.put("key.deserializer", 
"org.apache.kafka.common.serialization.ByteArrayDeserializer")
  props.put("value.deserializer", 
"org.apache.kafka.common.serialization.ByteArrayDeserializer")

  val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](props)

  import scala.collection.JavaConversions._
  def dumpPositionAndCommitted() = {
consumer.assignment().foreach { tp =>
  println(tp)
  println(s"Position - ${consumer.position(tp)}")
  println(s"Committed - ${consumer.committed(tp)}")
}
println("---")
  }

  consumer.assign(Collections.singleton(new TopicPartition("topic", 0)))
  dumpPositionAndCommitted()
  Thread.sleep(3000)
  val ps = Collections.singleton(new TopicPartition("topic", 1)) ++ 
consumer.assignment()
  consumer.assign(ps)
  dumpPositionAndCommitted()
  Thread.sleep(3000)
  dumpPositionAndCommitted()
{code}

and the result is

{noformat}
Position - 1211046445
Committed - OffsetAndMetadata{offset=1211046445, metadata=''}
---
proto7_fraud-1
Position - 1262864347
Committed - null
topic-0
Position - 1211046445
Committed - OffsetAndMetadata{offset=1211046445, metadata=''}
---
topic-1
Position - 1262864347
Committed - null
topic-0
Position - 1211046445
Committed - OffsetAndMetadata{offset=1211046445, metadata=''}
---
{noformat}

Pay attention to 
{noformat}
topic-1
Position - 1262864347
Committed - null
{noformat}

No committed offset is fetched from the broker, even though one exists. Looks 
like we should set {{needsFetchCommittedOffsets}} to {{true}} when assigning 
the partition.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-3959) __consumer_offsets wrong number of replicas at startup

2016-07-13 Thread Alban Hurtaud (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3959?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alban Hurtaud updated KAFKA-3959:
-
Description: 
When creating a stack of 3 kafka brokers, the consumer starts faster than the 
kafka nodes, and when it tries to read a topic only one kafka node is available.
So __consumer_offsets is created with a replication factor of 1 (instead of 
the configured 3):

offsets.topic.replication.factor=3
default.replication.factor=3
min.insync.replicas=2

Then the other kafka nodes come up and exceptions are thrown, because the 
replica count for __consumer_offsets is 1 while min.insync.replicas is 2.

What I don't understand is: why is __consumer_offsets created with replication 
factor 1 (when 1 broker is running) when server.properties sets it to 3?

To reproduce:
- Prepare 3 kafka nodes with the 3 lines above added to server.properties.
- Run one kafka node.
- Run one consumer (the __consumer_offsets topic is created with replicas=1).
- Run 2 more kafka nodes.

  was:
When creating a stack of 3 kafka brokers, the consumer is starting faster than 
kafka nodes and when trying to read a topic, only one kafka node is available.
So the __consumer_offsets is created with a replication factor set to 1 
(instead of configured 3) :

offsets.topic.replication.factor=3
default.replication.factor=3
min.insync.replicas=2

Then, other kafka nodes go up and we have exceptions because the replicas # for 
__consumer_offsets is 1 and min insync is 2. So exceptions are thrown.

What I missed is : Why the __consumer_offsets topic is created with 
replication to 1 (when 1 broker is running) whereas in server.properties it is 
set to 3 ?

To reproduce : 
- Prepare 3 kafka nodes with the 3 lines above added to servers.properties.
- Run one kafka,
- Run one consumer (the __consumer_offsets is created with replicas =1)
- Run 2 more kafka nodes


> __consumer_offsets wrong number of replicas at startup
> --
>
> Key: KAFKA-3959
> URL: https://issues.apache.org/jira/browse/KAFKA-3959
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer, offset manager, replication
>Affects Versions: 0.9.0.1, 0.10.0.0
> Environment: Brokers of 3 kafka nodes running Red Hat Enterprise 
> Linux Server release 7.2 (Maipo)
>Reporter: Alban Hurtaud
>
> When creating a stack of 3 kafka brokers, the consumer starts faster than 
> the kafka nodes, and when it tries to read a topic only one kafka node is 
> available.
> So __consumer_offsets is created with a replication factor of 1 (instead of 
> the configured 3):
> offsets.topic.replication.factor=3
> default.replication.factor=3
> min.insync.replicas=2
> Then the other kafka nodes come up and exceptions are thrown, because the 
> replica count for __consumer_offsets is 1 while min.insync.replicas is 2.
> What I don't understand is: why is __consumer_offsets created with 
> replication factor 1 (when 1 broker is running) when server.properties sets 
> it to 3?
> To reproduce:
> - Prepare 3 kafka nodes with the 3 lines above added to server.properties.
> - Run one kafka node.
> - Run one consumer (the __consumer_offsets topic is created with replicas=1).
> - Run 2 more kafka nodes.
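
To confirm the replication factor the topic actually got, a hypothetical check 
(ZooKeeper address assumed):

$ bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic __consumer_offsets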



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-3959) __consumer_offsets wrong number of replicas at startup

2016-07-13 Thread Alban Hurtaud (JIRA)
Alban Hurtaud created KAFKA-3959:


 Summary: __consumer_offsets wrong number of replicas at startup
 Key: KAFKA-3959
 URL: https://issues.apache.org/jira/browse/KAFKA-3959
 Project: Kafka
  Issue Type: Bug
  Components: consumer, offset manager, replication
Affects Versions: 0.10.0.0, 0.9.0.1
 Environment: Brokers of 3 kafka nodes running Red Hat Enterprise Linux 
Server release 7.2 (Maipo)
Reporter: Alban Hurtaud


When creating a stack of 3 kafka brokers, the consumer starts faster than the 
kafka nodes, and when it tries to read a topic only one kafka node is available.
So __consumer_offsets is created with a replication factor of 1 (instead of 
the configured 3):

offsets.topic.replication.factor=3
default.replication.factor=3
min.insync.replicas=2

Then the other kafka nodes come up and exceptions are thrown, because the 
replica count for __consumer_offsets is 1 while min.insync.replicas is 2.

What I don't understand is: why is the __consumer_offsets topic created with 
replication factor 1 (when 1 broker is running) when server.properties sets 
it to 3?

To reproduce:
- Prepare 3 kafka nodes with the 3 lines above added to server.properties.
- Run one kafka node.
- Run one consumer (the __consumer_offsets topic is created with replicas=1).
- Run 2 more kafka nodes.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)