[jira] [Commented] (KAFKA-5608) System test failure due to timeout starting Jmx tool

2017-07-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5608?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16092578#comment-16092578
 ] 

ASF GitHub Bot commented on KAFKA-5608:
---

GitHub user ewencp opened a pull request:

https://github.com/apache/kafka/pull/3547

KAFKA-5608: Add --wait option for JmxTool and use in system tests to avoid 
race between JmxTool and monitored services



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ewencp/kafka wait-jmx-metrics

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/3547.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3547


commit b33306053eda3ece107965b0db6111f36cc3232e
Author: Ewen Cheslack-Postava 
Date:   2017-07-19T04:31:23Z

KAFKA-5608: Add --wait option for JmxTool and use in system tests to avoid 
race between JmxTool and monitored services




> System test failure due to timeout starting Jmx tool
> 
>
> Key: KAFKA-5608
> URL: https://issues.apache.org/jira/browse/KAFKA-5608
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: Ewen Cheslack-Postava
>
> Began seeing this in some failing system tests:
> {code}
> [INFO  - 2017-07-18 14:25:55,375 - background_thread - _protected_worker - 
> lineno:39]: Traceback (most recent call last):
>   File 
> "/home/jenkins/workspace/system-test-kafka-0.11.0/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.6.0-py2.7.egg/ducktape/services/background_thread.py",
>  line 35, in _protected_worker
> self._worker(idx, node)
>   File 
> "/home/jenkins/workspace/system-test-kafka-0.11.0/kafka/tests/kafkatest/services/console_consumer.py",
>  line 261, in _worker
> self.start_jmx_tool(idx, node)
>   File 
> "/home/jenkins/workspace/system-test-kafka-0.11.0/kafka/tests/kafkatest/services/monitor/jmx.py",
>  line 73, in start_jmx_tool
> wait_until(lambda: self._jmx_has_output(node), timeout_sec=10, 
> backoff_sec=.5, err_msg="%s: Jmx tool took too long to start" % node.account)
>   File 
> "/home/jenkins/workspace/system-test-kafka-0.11.0/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.6.0-py2.7.egg/ducktape/utils/util.py",
>  line 36, in wait_until
> raise TimeoutError(err_msg)
> TimeoutError: ubuntu@worker7: Jmx tool took too long to start
> {code}
> This is immediately followed by a consumer timeout in the failing cases:
> {code}
> [INFO  - 2017-07-18 14:26:46,907 - runner_client - log - lineno:221]: 
> RunnerClient: 
> kafkatest.tests.core.security_rolling_upgrade_test.TestSecurityRollingUpgrade.test_rolling_upgrade_phase_two.broker_protocol=SASL_SSL.client_protocol=SASL_SSL:
>  FAIL: Consumer failed to consume messages for 60s.
> Traceback (most recent call last):
>   File 
> "/home/jenkins/workspace/system-test-kafka-0.11.0/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.6.0-py2.7.egg/ducktape/tests/runner_client.py",
>  line 123, in run
> data = self.run_test()
>   File 
> "/home/jenkins/workspace/system-test-kafka-0.11.0/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.6.0-py2.7.egg/ducktape/tests/runner_client.py",
>  line 176, in run_test
> return self.test_context.function(self.test)
>   File 
> "/home/jenkins/workspace/system-test-kafka-0.11.0/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.6.0-py2.7.egg/ducktape/mark/_mark.py",
>  line 321, in wrapper
> return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
>   File 
> "/home/jenkins/workspace/system-test-kafka-0.11.0/kafka/tests/kafkatest/tests/core/security_rolling_upgrade_test.py",
>  line 148, in test_rolling_upgrade_phase_two
> self.run_produce_consume_validate(self.roll_in_secured_settings, 
> client_protocol, broker_protocol)
>   File 
> "/home/jenkins/workspace/system-test-kafka-0.11.0/kafka/tests/kafkatest/tests/produce_consume_validate.py",
>  line 106, in run_produce_consume_validate
> self.start_producer_and_consumer()
>   File 
> "/home/jenkins/workspace/system-test-kafka-0.11.0/kafka/tests/kafkatest/tests/produce_consume_validate.py",
>  line 79, in start_producer_and_consumer
> self.consumer_start_timeout_sec)
>   File 
> "/home/jenkins/workspace/system-test-kafka-0.11.0/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.6.0-py2.7.egg/ducktape/utils/util.py",
>  line 36, in wait_until
> raise TimeoutError(err_msg)
> TimeoutError: Consumer failed to consume messages for 60s.
> {code}
> There does not appear to be anything wrong with the consumer in the logs, so 
> the timeout seems to be caused by the Jmx tool timeout.
> Possibly due to https://github.com/apache/kafka/pull/3447/files?



--

[jira] [Commented] (KAFKA-5608) System test failure due to timeout starting Jmx tool

2017-07-18 Thread Jason Gustafson (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5608?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16092497#comment-16092497
 ] 

Jason Gustafson commented on KAFKA-5608:


[~ewencp] I think there is a race condition when starting up the jmx tool at 
the same time as the console consumer. If the metrics haven't been registered 
at initialization time, we may end up querying for nothing. We should probably 
modify the tool to wait until at least one expected name has been found before 
beginning polling. I did a quick and dirty fix locally and it seemed to work. Does 
that sound plausible?
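
A minimal sketch of that wait, using only standard JMX APIs (the service URL, port, and MBean pattern below are illustrative placeholders, not JmxTool's actual arguments): block until at least one expected MBean name is registered before starting to poll attribute values.

{code}
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

public class JmxWaitExample {
    public static void main(String[] args) throws Exception {
        // Placeholder URL; JmxTool takes the real one via its arguments.
        JMXServiceURL url = new JMXServiceURL(
            "service:jmx:rmi:///jndi/rmi://localhost:9999/jmxrmi");
        try (JMXConnector connector = JMXConnectorFactory.connect(url)) {
            MBeanServerConnection conn = connector.getMBeanServerConnection();
            // Placeholder pattern standing in for "at least one expected name".
            ObjectName expected = new ObjectName(
                "kafka.consumer:type=consumer-fetch-manager-metrics,*");
            long deadline = System.currentTimeMillis() + 10_000L;
            while (conn.queryNames(expected, null).isEmpty()) {
                if (System.currentTimeMillis() > deadline)
                    throw new IllegalStateException("MBean never registered");
                Thread.sleep(500); // back off, then re-check registration
            }
            System.out.println("Metric registered; safe to start polling values.");
        }
    }
}
{code}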

> System test failure due to timeout starting Jmx tool
> 
>
> Key: KAFKA-5608
> URL: https://issues.apache.org/jira/browse/KAFKA-5608
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: Ewen Cheslack-Postava
>
> Began seeing this in some failing system tests:
> {code}
> [INFO  - 2017-07-18 14:25:55,375 - background_thread - _protected_worker - 
> lineno:39]: Traceback (most recent call last):
>   File 
> "/home/jenkins/workspace/system-test-kafka-0.11.0/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.6.0-py2.7.egg/ducktape/services/background_thread.py",
>  line 35, in _protected_worker
> self._worker(idx, node)
>   File 
> "/home/jenkins/workspace/system-test-kafka-0.11.0/kafka/tests/kafkatest/services/console_consumer.py",
>  line 261, in _worker
> self.start_jmx_tool(idx, node)
>   File 
> "/home/jenkins/workspace/system-test-kafka-0.11.0/kafka/tests/kafkatest/services/monitor/jmx.py",
>  line 73, in start_jmx_tool
> wait_until(lambda: self._jmx_has_output(node), timeout_sec=10, 
> backoff_sec=.5, err_msg="%s: Jmx tool took too long to start" % node.account)
>   File 
> "/home/jenkins/workspace/system-test-kafka-0.11.0/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.6.0-py2.7.egg/ducktape/utils/util.py",
>  line 36, in wait_until
> raise TimeoutError(err_msg)
> TimeoutError: ubuntu@worker7: Jmx tool took too long to start
> {code}
> This is immediately followed by a consumer timeout in the failing cases:
> {code}
> [INFO  - 2017-07-18 14:26:46,907 - runner_client - log - lineno:221]: 
> RunnerClient: 
> kafkatest.tests.core.security_rolling_upgrade_test.TestSecurityRollingUpgrade.test_rolling_upgrade_phase_two.broker_protocol=SASL_SSL.client_protocol=SASL_SSL:
>  FAIL: Consumer failed to consume messages for 60s.
> Traceback (most recent call last):
>   File 
> "/home/jenkins/workspace/system-test-kafka-0.11.0/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.6.0-py2.7.egg/ducktape/tests/runner_client.py",
>  line 123, in run
> data = self.run_test()
>   File 
> "/home/jenkins/workspace/system-test-kafka-0.11.0/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.6.0-py2.7.egg/ducktape/tests/runner_client.py",
>  line 176, in run_test
> return self.test_context.function(self.test)
>   File 
> "/home/jenkins/workspace/system-test-kafka-0.11.0/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.6.0-py2.7.egg/ducktape/mark/_mark.py",
>  line 321, in wrapper
> return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
>   File 
> "/home/jenkins/workspace/system-test-kafka-0.11.0/kafka/tests/kafkatest/tests/core/security_rolling_upgrade_test.py",
>  line 148, in test_rolling_upgrade_phase_two
> self.run_produce_consume_validate(self.roll_in_secured_settings, 
> client_protocol, broker_protocol)
>   File 
> "/home/jenkins/workspace/system-test-kafka-0.11.0/kafka/tests/kafkatest/tests/produce_consume_validate.py",
>  line 106, in run_produce_consume_validate
> self.start_producer_and_consumer()
>   File 
> "/home/jenkins/workspace/system-test-kafka-0.11.0/kafka/tests/kafkatest/tests/produce_consume_validate.py",
>  line 79, in start_producer_and_consumer
> self.consumer_start_timeout_sec)
>   File 
> "/home/jenkins/workspace/system-test-kafka-0.11.0/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.6.0-py2.7.egg/ducktape/utils/util.py",
>  line 36, in wait_until
> raise TimeoutError(err_msg)
> TimeoutError: Consumer failed to consume messages for 60s.
> {code}
> There does not appear to be anything wrong with the consumer in the logs, so 
> the timeout seems to be caused by the Jmx tool timeout.
> Possibly due to https://github.com/apache/kafka/pull/3447/files?



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5608) System test failure due to timeout starting Jmx tool

2017-07-18 Thread Ewen Cheslack-Postava (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5608?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16092467#comment-16092467
 ] 

Ewen Cheslack-Postava commented on KAFKA-5608:
--

Another interesting point by [~hachikuji] is that all the tests that fail with 
this error seem to use SASL.

> System test failure due to timeout starting Jmx tool
> 
>
> Key: KAFKA-5608
> URL: https://issues.apache.org/jira/browse/KAFKA-5608
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: Ewen Cheslack-Postava
>
> Began seeing this in some failing system tests:
> {code}
> [INFO  - 2017-07-18 14:25:55,375 - background_thread - _protected_worker - 
> lineno:39]: Traceback (most recent call last):
>   File 
> "/home/jenkins/workspace/system-test-kafka-0.11.0/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.6.0-py2.7.egg/ducktape/services/background_thread.py",
>  line 35, in _protected_worker
> self._worker(idx, node)
>   File 
> "/home/jenkins/workspace/system-test-kafka-0.11.0/kafka/tests/kafkatest/services/console_consumer.py",
>  line 261, in _worker
> self.start_jmx_tool(idx, node)
>   File 
> "/home/jenkins/workspace/system-test-kafka-0.11.0/kafka/tests/kafkatest/services/monitor/jmx.py",
>  line 73, in start_jmx_tool
> wait_until(lambda: self._jmx_has_output(node), timeout_sec=10, 
> backoff_sec=.5, err_msg="%s: Jmx tool took too long to start" % node.account)
>   File 
> "/home/jenkins/workspace/system-test-kafka-0.11.0/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.6.0-py2.7.egg/ducktape/utils/util.py",
>  line 36, in wait_until
> raise TimeoutError(err_msg)
> TimeoutError: ubuntu@worker7: Jmx tool took too long to start
> {code}
> This is immediately followed by a consumer timeout in the failing cases:
> {code}
> [INFO  - 2017-07-18 14:26:46,907 - runner_client - log - lineno:221]: 
> RunnerClient: 
> kafkatest.tests.core.security_rolling_upgrade_test.TestSecurityRollingUpgrade.test_rolling_upgrade_phase_two.broker_protocol=SASL_SSL.client_protocol=SASL_SSL:
>  FAIL: Consumer failed to consume messages for 60s.
> Traceback (most recent call last):
>   File 
> "/home/jenkins/workspace/system-test-kafka-0.11.0/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.6.0-py2.7.egg/ducktape/tests/runner_client.py",
>  line 123, in run
> data = self.run_test()
>   File 
> "/home/jenkins/workspace/system-test-kafka-0.11.0/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.6.0-py2.7.egg/ducktape/tests/runner_client.py",
>  line 176, in run_test
> return self.test_context.function(self.test)
>   File 
> "/home/jenkins/workspace/system-test-kafka-0.11.0/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.6.0-py2.7.egg/ducktape/mark/_mark.py",
>  line 321, in wrapper
> return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
>   File 
> "/home/jenkins/workspace/system-test-kafka-0.11.0/kafka/tests/kafkatest/tests/core/security_rolling_upgrade_test.py",
>  line 148, in test_rolling_upgrade_phase_two
> self.run_produce_consume_validate(self.roll_in_secured_settings, 
> client_protocol, broker_protocol)
>   File 
> "/home/jenkins/workspace/system-test-kafka-0.11.0/kafka/tests/kafkatest/tests/produce_consume_validate.py",
>  line 106, in run_produce_consume_validate
> self.start_producer_and_consumer()
>   File 
> "/home/jenkins/workspace/system-test-kafka-0.11.0/kafka/tests/kafkatest/tests/produce_consume_validate.py",
>  line 79, in start_producer_and_consumer
> self.consumer_start_timeout_sec)
>   File 
> "/home/jenkins/workspace/system-test-kafka-0.11.0/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.6.0-py2.7.egg/ducktape/utils/util.py",
>  line 36, in wait_until
> raise TimeoutError(err_msg)
> TimeoutError: Consumer failed to consume messages for 60s.
> {code}
> There does not appear to be anything wrong with the consumer in the logs, so 
> the timeout seems to be caused by the Jmx tool timeout.
> Possibly due to https://github.com/apache/kafka/pull/3447/files?



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Reopened] (KAFKA-5217) Improve Streams internal exception handling

2017-07-18 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5217?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang reopened KAFKA-5217:
--

[~enothereska] The parent JIRA is an umbrella story for improving error handling 
in Streams, while this JIRA itself has a defined sub-task scope on the streams 
internal exception handling. Have you made a thorough pass over all the call 
traces to make sure all the issues are already resolved? One example I can point 
to is that the {{KeyValueStore.put(k, v)}} function may throw KafkaException or 
StreamsException from the {{StoreChangeLogger}}, although we do not document it 
in the javadocs. Though it is debatable whether we should encourage users to 
try-catch all exceptions in their calls, I'm afraid there may be other cases 
that we have not thought about yet.
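
For illustration, a hedged sketch of the defensive user code this implies (the rethrow shown is a placeholder, not a recommendation; note that {{StreamsException}} extends {{KafkaException}}, so the broader catch below covers everything else):

{code}
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.state.KeyValueStore;

public class DefensivePut {
    static void put(KeyValueStore<String, String> store, String key, String value) {
        try {
            store.put(key, value);
        } catch (StreamsException e) {
            // may originate from the StoreChangeLogger; undocumented in the javadocs today
            throw e;
        } catch (KafkaException e) {
            // catch-all for other client exceptions surfacing through the store
            throw e;
        }
    }
}
{code}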

> Improve Streams internal exception handling
> ---
>
> Key: KAFKA-5217
> URL: https://issues.apache.org/jira/browse/KAFKA-5217
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Affects Versions: 0.10.2.1
>Reporter: Matthias J. Sax
>Assignee: Matthias J. Sax
> Fix For: 0.11.1.0
>
>
> Streams does not handle all exceptions gracefully atm, but tends to throw 
> exceptions to the user, even if we could handle them internally and recover 
> automatically. We want to revisit this exception handling to be more 
> resilient.
> For example, for any kind of rebalance exception, we should just log it, and 
> rejoin the consumer group.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5302) Improve exception handling on streams client (communication with brokers)

2017-07-18 Thread Eno Thereska (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5302?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16091788#comment-16091788
 ] 

Eno Thereska commented on KAFKA-5302:
-

Sounds good.

> Improve exception handling on streams client (communication with brokers)
> -
>
> Key: KAFKA-5302
> URL: https://issues.apache.org/jira/browse/KAFKA-5302
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Affects Versions: 0.11.0.0
>Reporter: Eno Thereska
>Assignee: Eno Thereska
> Fix For: 0.11.1.0
>
>
> These are exceptions in StreamsKafkaClient.java.
> Currently throws either StreamsException or BrokerNotFoundException.
> Used by InternalTopicManager to create topics and get their metadata.
> Used by StreamPartitionAssignor. 
> Currently InternalTopicManager retries a few times after catching an 
> exception. 
> A failure here is sent all the way up to the stream thread and will stop the 
> streams pipeline. 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5301) Improve exception handling on consumer path

2017-07-18 Thread Eno Thereska (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5301?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16091787#comment-16091787
 ] 

Eno Thereska commented on KAFKA-5301:
-

Sure, we can leave open.

> Improve exception handling on consumer path
> ---
>
> Key: KAFKA-5301
> URL: https://issues.apache.org/jira/browse/KAFKA-5301
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Affects Versions: 0.11.0.0
>Reporter: Eno Thereska
>Assignee: Eno Thereska
> Fix For: 0.11.1.0
>
>
> Used in StreamsThread.java, mostly to .poll() but also to restore data.
> Used in StreamsTask.java, mostly to .pause(), .resume()
> All exceptions here are currently caught all the way up to the main running 
> loop in a broad catch(Exception e) statement in StreamThread.run().
> One main concern on the consumer path is handling deserialization errors that 
> happen before streams has even had a chance to look at the data: 
> https://issues.apache.org/jira/browse/KAFKA-5157  



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Reopened] (KAFKA-5302) Improve exception handling on streams client (communication with brokers)

2017-07-18 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5302?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang reopened KAFKA-5302:
--

[~enothereska] Let's leave this ticket open until we have done the migration; 
also we can use this JIRA as a reference for the current known issues with the 
streams client and make sure these are appropriately handled when we replace 
it with the admin client.

> Improve exception handling on streams client (communication with brokers)
> -
>
> Key: KAFKA-5302
> URL: https://issues.apache.org/jira/browse/KAFKA-5302
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Affects Versions: 0.11.0.0
>Reporter: Eno Thereska
>Assignee: Eno Thereska
> Fix For: 0.11.1.0
>
>
> These are exceptions in StreamsKafkaClient.java.
> Currently throws either StreamsException or BrokerNotFoundException.
> Used by InternalTopicManager to create topics and get their metadata.
> Used by StreamPartitionAssignor. 
> Currently InternalTopicManager retries a few times after catching an 
> exception. 
> A failure here is sent all the way up to the stream thread and will stop the 
> streams pipeline. 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Reopened] (KAFKA-5301) Improve exception handling on consumer path

2017-07-18 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5301?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang reopened KAFKA-5301:
--

[~enothereska] I think we should not resolve this JIRA in a hurry. Have you 
made a thorough pass over the consumer code path and confirmed that {{the rest 
are OK}}?

For example, one obvious pitfall I can observe is the {{rebalanceException}} 
used in the {{StreamThread}}: we throw the exception in 
{{onPartitionsRevoked}} and {{onPartitionsAssigned}} and at the same time 
remember that exception in this variable. The exception thrown from the 
callback will be swallowed by the {{ConsumerCoordinator}} and logged as an 
error, while we will later rethrow it again. I can see two issues here:

1) We throw the exception twice, and the first throw only produces an error 
log4j entry, which is redundant. If we are going to rethrow the exception 
after the rebalance anyway, we may consider not throwing it inside the 
callbacks at all.

2) When we throw the exception in the {{..Revoked}} callback, we effectively 
leave the assignor in an unstable state, such that the suspended tasks / prev 
tasks etc. are not set correctly; however, we will still call {{..Assigned}} 
later, which may be problematic. Should we consider skipping the later 
callback if an exception has already been thrown, or should we clean up the 
cached maps while throwing the exception?
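
A self-contained sketch of the direction suggested in 1) and 2) (the listener body and the {{maybeRethrow()}} hook are hypothetical; the real logic lives in {{StreamThread}}'s rebalance listener): record the callback failure instead of throwing, skip the assignment callback once revocation has failed, and rethrow once after the rebalance.

{code}
import java.util.Collection;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;

public class RecordingRebalanceListener implements ConsumerRebalanceListener {
    private final AtomicReference<RuntimeException> rebalanceException = new AtomicReference<>();

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        try {
            // suspend tasks, flush state, etc. (omitted)
        } catch (RuntimeException e) {
            rebalanceException.compareAndSet(null, e); // remember instead of throwing
        }
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        if (rebalanceException.get() != null)
            return; // revocation already failed; skip, per issue 2) above
        try {
            // create/resume tasks (omitted)
        } catch (RuntimeException e) {
            rebalanceException.compareAndSet(null, e);
        }
    }

    /** To be called by the polling thread after poll() returns. */
    public void maybeRethrow() {
        RuntimeException e = rebalanceException.getAndSet(null);
        if (e != null)
            throw e;
    }
}
{code}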

> Improve exception handling on consumer path
> ---
>
> Key: KAFKA-5301
> URL: https://issues.apache.org/jira/browse/KAFKA-5301
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Affects Versions: 0.11.0.0
>Reporter: Eno Thereska
>Assignee: Eno Thereska
> Fix For: 0.11.1.0
>
>
> Used in StreamsThread.java, mostly to .poll() but also to restore data.
> Used in StreamsTask.java, mostly to .pause(), .resume()
> All exceptions here are currently caught all the way up to the main running 
> loop in a broad catch(Exception e) statement in StreamThread.run().
> One main concern on the consumer path is handling deserialization errors that 
> happen before streams has even had a chance to look at the data: 
> https://issues.apache.org/jira/browse/KAFKA-5157  



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5386) [Kafka Streams] - custom name for state-store change-log topic

2017-07-18 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5386?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16091762#comment-16091762
 ] 

Matthias J. Sax commented on KAFKA-5386:


For internal repartitioning topics, you can always avoid them by doing manual 
repartitioning via {{through("my-custom-topic-name")}} (see the sketch below). 
For changelog topics no such workaround exists atm. Hope this helps.
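
A hedged sketch of that workaround on the 0.10.2 DSL (topic, store, and stream names are made up; {{through()}} expects the target topic to already exist with the desired partitioning):

{code}
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;

public class ThroughExample {
    public static void main(String[] args) {
        KStreamBuilder builder = new KStreamBuilder(); // 0.10.2.x DSL entry point
        KStream<String, String> source = builder.stream("input-topic");
        source.selectKey((key, value) -> value)   // re-keying would otherwise trigger an internal -repartition topic
              .through("my-custom-topic-name")    // explicitly named, pre-created topic takes its place
              .groupByKey()
              .count("my-store");                 // the changelog name is still derived from the store name
    }
}
{code}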

> [Kafka Streams] - custom name for state-store change-log topic
> --
>
> Key: KAFKA-5386
> URL: https://issues.apache.org/jira/browse/KAFKA-5386
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.2.1
>Reporter: Bart Vercammen
>
> Currently, when working with Kafka backed state stores in Kafka Streams, 
> these log compacted topics are given a hardcoded name :  
> _my.app.id-storename-changelog_
> {noformat}public static String storeChangelogTopic(String applicationId, 
> String storeName) {
> return applicationId + "-" + storeName + STATE_CHANGELOG_TOPIC_SUFFIX;
> }{noformat}
> It would be nice if somehow I would be able to override this functionality 
> and provide the topic-name myself when creating the state-store.
> Any comments?
> Would it be OK to submit a PR for this?



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Resolved] (KAFKA-5384) KIP-162: Enable topic deletion by default

2017-07-18 Thread Gwen Shapira (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5384?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gwen Shapira resolved KAFKA-5384.
-
   Resolution: Fixed
Fix Version/s: (was: 0.12.0.0)
   0.11.1.0

Issue resolved by pull request 3241
[https://github.com/apache/kafka/pull/3241]

> KIP-162: Enable topic deletion by default
> -
>
> Key: KAFKA-5384
> URL: https://issues.apache.org/jira/browse/KAFKA-5384
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Reporter: Gwen Shapira
> Fix For: 0.11.1.0
>
>
> Change default of delete.topic.enable to true
> Remove delete.topic.enable config from config/server.properties.
> See KIP for details: 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-162+-+Enable+topic+deletion+by+default



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (KAFKA-5158) Options for handling exceptions during processing

2017-07-18 Thread Eno Thereska (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5158?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Eno Thereska updated KAFKA-5158:

Issue Type: New Feature  (was: Sub-task)
Parent: (was: KAFKA-5156)

> Options for handling exceptions during processing
> -
>
> Key: KAFKA-5158
> URL: https://issues.apache.org/jira/browse/KAFKA-5158
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Eno Thereska
> Fix For: 0.11.1.0
>
>
> Imagine the app-level processing of a (non-corrupted) record fails (e.g. the 
> user attempted to do a RPC to an external system, and this call failed). How 
> can you process such failed records in a scalable way? For example, imagine 
> you need to implement a retry policy such as "retry with exponential 
> backoff". Here, you have the problem that 1. you can't really pause 
> processing a single record because this will pause the processing of the full 
> stream (bottleneck!) and 2. there is no straight-forward way to "sort" failed 
> records based on their "next retry time".
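
To make the second point concrete, a small self-contained sketch of what ordering failed records by their "next retry time" could look like (invented names; a concept sketch outside any Streams API, not a proposed interface):

{code}
import java.util.PriorityQueue;

public class RetryQueueSketch {
    static class FailedRecord implements Comparable<FailedRecord> {
        final String value;
        int attempts = 1;
        long nextRetryTimeMs;

        FailedRecord(String value, long now) {
            this.value = value;
            this.nextRetryTimeMs = now + backoffMs(1);
        }

        static long backoffMs(int attempts) {
            return 100L << Math.min(attempts, 10); // exponential backoff, capped
        }

        @Override
        public int compareTo(FailedRecord other) {
            return Long.compare(nextRetryTimeMs, other.nextRetryTimeMs);
        }
    }

    private final PriorityQueue<FailedRecord> retries = new PriorityQueue<>();

    void recordFailure(String value, long now) {
        retries.add(new FailedRecord(value, now));
    }

    /** Retry everything whose backoff has elapsed; re-queue on repeated failure. */
    void drainDue(long now) {
        while (!retries.isEmpty() && retries.peek().nextRetryTimeMs <= now) {
            FailedRecord r = retries.poll();
            if (!tryProcess(r.value)) {
                r.attempts++;
                r.nextRetryTimeMs = now + FailedRecord.backoffMs(r.attempts);
                retries.add(r); // other records keep flowing in the meantime
            }
        }
    }

    boolean tryProcess(String value) {
        return true; // stand-in for the external RPC from the description
    }
}
{code}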



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5301) Improve exception handling on consumer path

2017-07-18 Thread Eno Thereska (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5301?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16091556#comment-16091556
 ] 

Eno Thereska commented on KAFKA-5301:
-

KAFKA-5157 covers the interesting case; the rest is OK. Closing for now.

> Improve exception handling on consumer path
> ---
>
> Key: KAFKA-5301
> URL: https://issues.apache.org/jira/browse/KAFKA-5301
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Affects Versions: 0.11.0.0
>Reporter: Eno Thereska
>Assignee: Eno Thereska
> Fix For: 0.11.1.0
>
>
> Used in StreamsThread.java, mostly to .poll() but also to restore data.
> Used in StreamsTask.java, mostly to .pause(), .resume()
> All exceptions here are currently caught all the way up to the main running 
> loop in a broad catch(Exception e) statement in StreamThread.run().
> One main concern on the consumer path is handling deserialization errors that 
> happen before streams has even had a chance to look at the data: 
> https://issues.apache.org/jira/browse/KAFKA-5157  



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Resolved] (KAFKA-5301) Improve exception handling on consumer path

2017-07-18 Thread Eno Thereska (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5301?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Eno Thereska resolved KAFKA-5301.
-
Resolution: Won't Fix

> Improve exception handling on consumer path
> ---
>
> Key: KAFKA-5301
> URL: https://issues.apache.org/jira/browse/KAFKA-5301
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Affects Versions: 0.11.0.0
>Reporter: Eno Thereska
>Assignee: Eno Thereska
> Fix For: 0.11.1.0
>
>
> Used in StreamsThread.java, mostly to .poll() but also to restore data.
> Used in StreamsTask.java, mostly to .pause(), .resume()
> All exceptions here are currently caught all the way up to the main running 
> loop in a broad catch(Exception e) statement in StreamThread.run().
> One main concern on the consumer path is handling deserialization errors that 
> happen before streams has even had a chance to look at the data: 
> https://issues.apache.org/jira/browse/KAFKA-5157  



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Assigned] (KAFKA-5499) Double check how we handle exceptions when commits fail

2017-07-18 Thread Eno Thereska (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5499?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Eno Thereska reassigned KAFKA-5499:
---

Assignee: Eno Thereska

> Double check how we handle exceptions when commits fail
> ---
>
> Key: KAFKA-5499
> URL: https://issues.apache.org/jira/browse/KAFKA-5499
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Eno Thereska
>Assignee: Eno Thereska
> Fix For: 0.11.1.0
>
>
> When a task does a lot of processing in between calls to poll(), it can 
> miss a rebalance. It finds that out once it tries to commit(), since it 
> will get an exception. Double check what is supposed to happen on such an 
> exception, e.g., should the thread fail, or should it continue? 
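
For reference, a hedged plain-consumer sketch of the scenario (config values are placeholders; the handling shown is one possible answer to the question above, not necessarily what Streams should do): a long gap between {{poll()}} calls lets the group rebalance, the next {{commitSync()}} throws {{CommitFailedException}}, and the consumer simply continues so the next {{poll()}} rejoins the group.

{code}
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class CommitFailureSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "testgroup");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("test"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(100);
                process(records); // long processing here can outlast the poll interval
                try {
                    consumer.commitSync();
                } catch (CommitFailedException e) {
                    // The group rebalanced in the meantime; this commit is impossible.
                    // Keep polling so this member rejoins the group.
                }
            }
        }
    }

    static void process(ConsumerRecords<String, String> records) { }
}
{code}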



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (KAFKA-5600) Group loading regression causing stale metadata/offsets cache

2017-07-18 Thread Ismael Juma (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5600?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-5600:
---
Summary: Group loading regression causing stale metadata/offsets cache  
(was: GroupMetadataManager doesn't read offsets of segmented logs correctly)

> Group loading regression causing stale metadata/offsets cache
> -
>
> Key: KAFKA-5600
> URL: https://issues.apache.org/jira/browse/KAFKA-5600
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.10.2.1, 0.11.0.0
> Environment: any
>Reporter: Jan Burkhardt
>Priority: Critical
>  Labels: regression, reliability
> Fix For: 0.11.0.1, 0.11.1.0
>
> Attachments: KafkaErrorConsumer.java, KafkaErrorProducer.java
>
>
> After long investigation we found a problem in Kafka.
> When a __consumer_offsets partition gets segmented and Kafka is restarted and 
> needs to reload offsets, consumers will start at a wrong position when 
> metadata and offset events are in both segments.
> Reproduction:
> 1.) Start zookeeper and kafka as is from the archive
> {code}
> KAFKA_HEAP_OPTS="-Xmx256M -Xms256M" bin/zookeeper-server-start.sh 
> config/zookeeper.properties
> KAFKA_HEAP_OPTS="-Xmx256M -Xms256M" bin/kafka-server-start.sh 
> config/server.properties
> {code}
> 2.) Start [^KafkaErrorProducer.java] which adds 1M log entries to the topic 
> test
> 3.) Start [^KafkaErrorConsumer.java] which starts a consumer, reads 100 
> entries one by one and then closes the consumer. This leads to a 2nd segment 
> in /tmp/kafka-logs/__consumer_offsets-27. This step takes some time (around 
> 5mins). The close of the consumer is needed to have metadata events in the 
> segments too.
> 4.) Stop and restart the Kafka broker
> 5.) Start any consumer on topic test and group testgroup
> {code}
> bin/kafka-console-consumer.sh --from-beginning --bootstrap-server 
> localhost:9092 --topic test --consumer-property group.id=testgroup
> {code}
> Actual:
> the consumer starts at the segmentation boundary
> Expected:
> the consumer starts at the end
> The reason for this behavior is that the closing brace of the while loop in 
> GroupMetadataManager#loadGroupsAndOffsets is at the wrong position, introduced 
> with commit 
> https://github.com/apache/kafka/commit/5bd06f1d542e6b588a1d402d059bc24690017d32
> I will prepare a pull request.
> *Edit*: The issue can happen if there are multiple reads from the same 
> segment, see https://github.com/apache/kafka/pull/3538#discussion_r127759694
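
A schematic Java illustration of the bug class described (the real code is Scala in GroupMetadataManager#loadGroupsAndOffsets; this only shows how a misplaced closing brace changes load semantics):

{code}
import java.util.HashMap;
import java.util.Map;

public class BraceBugSketch {
    // Correct shape: read every batch from every segment first, then apply
    // the accumulated result once.
    static Map<String, Long> loadOffsets(Iterable<Map<String, Long>> batches) {
        Map<String, Long> loaded = new HashMap<>();
        for (Map<String, Long> batch : batches) {
            loaded.putAll(batch);
            // BUG pattern: if the loop's closing brace sat one level too high,
            // the apply step below would run per-batch on partial state, and
            // entries read from a later batch/segment could be lost.
        }
        return new HashMap<>(loaded); // apply once, after the whole log is read
    }
}
{code}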



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (KAFKA-5600) GroupMetadataManager doesn't read offsets of segmented logs correctly

2017-07-18 Thread Ismael Juma (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5600?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-5600:
---
Description: 
After long investigation we found a problem in Kafka.
When a __consumer_offsets partition gets segmented and Kafka is restarted and 
needs to reload offsets, consumers will start at a wrong position when metadata 
and offset events are in both segments.

Reproduction:
1.) Start zookeeper and kafka as is from the archive
{code}
KAFKA_HEAP_OPTS="-Xmx256M -Xms256M" bin/zookeeper-server-start.sh 
config/zookeeper.properties
KAFKA_HEAP_OPTS="-Xmx256M -Xms256M" bin/kafka-server-start.sh 
config/server.properties
{code}
2.) Start [^KafkaErrorProducer.java] which adds 1M log entries to the topic test
3.) Start [^KafkaErrorConsumer.java] which starts a consumer, reads 100 entries 
one by one and then closes the consumer. This leads to a 2nd segment in 
/tmp/kafka-logs/__consumer_offsets-27. This step takes some time (around 
5mins). The close of the consumer is needed to have metadata events in the 
segments too.
4.) Stop and restart the Kafka broker
5.) Start any consumer on topic test and group testgroup
{code}
bin/kafka-console-consumer.sh --from-beginning --bootstrap-server 
localhost:9092 --topic test --consumer-property group.id=testgroup
{code}

Actual:
the consumer starts at the segmentation boundary
Expected:
the consumer starts at the end

The reason for this behavior is that the closing brace of the while loop in 
GroupMetadataManager#loadGroupsAndOffsets is at the wrong position, introduced 
with commit 
https://github.com/apache/kafka/commit/5bd06f1d542e6b588a1d402d059bc24690017d32
I will prepare a pull request.

*Edit*: The issue can happen if there are multiple reads from the same segment, 
see https://github.com/apache/kafka/pull/3538#discussion_r127759694

  was:
After long investigation we found a problem in Kafka.
When a __consumer_offsets partition gets segmented and Kafka is restarted and 
needs to reload offsets, consumers will start at a wrong position when metadata 
and offset events are in both segments.

Reproduction:
1.) Start zookeeper and kafka as is from the archive
{code}
KAFKA_HEAP_OPTS="-Xmx256M -Xms256M" bin/zookeeper-server-start.sh 
config/zookeeper.properties
KAFKA_HEAP_OPTS="-Xmx256M -Xms256M" bin/kafka-server-start.sh 
config/server.properties
{code}
2.) Start [^KafkaErrorProducer.java] which adds 1M log entries to the topic test
3.) Start [^KafkaErrorConsumer.java] which starts a consumer, reads 100 entries 
one by one and then closes the consumer. This leads to a 2nd segment in 
/tmp/kafka-logs/__consumer_offsets-27. This step takes some time (around 
5mins). The close of the consumer is needed to have metadata events in the 
segments too.
4.) Stop and restart the Kafka broker
5.) Start any consumer on topic test and group testgroup
{code}
bin/kafka-console-consumer.sh --from-beginning --bootstrap-server 
localhost:9092 --topic test --consumer-property group.id=testgroup
{code}

Actual:
the consumer starts at the segmentation boundary
Expected:
the consumer starts at the end

The reason for this behavior is that the closing brace of the while loop in 
GroupMetadataManager#loadGroupsAndOffsets is at the wrong position, introduced 
with commit 
https://github.com/apache/kafka/commit/5bd06f1d542e6b588a1d402d059bc24690017d32
I will prepare a pull request.

**Edit**: The issue can happen if there are multiple reads from the same 
segment, see https://github.com/apache/kafka/pull/3538#discussion_r127759694


> GroupMetadataManager doesn't read offsets of segmented logs correctly
> -
>
> Key: KAFKA-5600
> URL: https://issues.apache.org/jira/browse/KAFKA-5600
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.10.2.1, 0.11.0.0
> Environment: any
>Reporter: Jan Burkhardt
>Priority: Critical
>  Labels: regression, reliability
> Fix For: 0.11.0.1, 0.11.1.0
>
> Attachments: KafkaErrorConsumer.java, KafkaErrorProducer.java
>
>
> After long investigation we found a problem in Kafka.
> When a __consumer_offsets partition gets segmented and Kafka is restarted and 
> needs to reload offsets, consumers will start at a wrong position when 
> metadata and offset events are in both segments.
> Reproduction:
> 1.) Start zookeeper and kafka as is from the archive
> {code}
> KAFKA_HEAP_OPTS="-Xmx256M -Xms256M" bin/zookeeper-server-start.sh 
> config/zookeeper.properties
> KAFKA_HEAP_OPTS="-Xmx256M -Xms256M" bin/kafka-server-start.sh 
> config/server.properties
> {code}
> 2.) Start [^KafkaErrorProducer.java] which adds 1M log entries to the topic 
> test
> 3.) Start [^KafkaErrorConsumer.java] which starts a consumer, reads 100 
> entries one by one and then closes the consumer. This leads to a 2nd segment 

[jira] [Updated] (KAFKA-5600) Group loading regression causing stale metadata/offsets cache

2017-07-18 Thread Ismael Juma (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5600?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-5600:
---
Fix Version/s: 0.11.1.0

> Group loading regression causing stale metadata/offsets cache
> -
>
> Key: KAFKA-5600
> URL: https://issues.apache.org/jira/browse/KAFKA-5600
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.10.2.1, 0.11.0.0
> Environment: any
>Reporter: Jan Burkhardt
>Priority: Critical
>  Labels: regression, reliability
> Fix For: 0.11.0.1, 0.11.1.0
>
> Attachments: KafkaErrorConsumer.java, KafkaErrorProducer.java
>
>
> After long investigation we found a problem in Kafka.
> When a __consumer_offsets partition gets segmented and Kafka is restarted and 
> needs to reload offsets, consumers will start at a wrong position when 
> metadata and offset events are in both segments.
> Reproduction:
> 1.) Start zookeeper and kafka as is from the archive
> {code}
> KAFKA_HEAP_OPTS="-Xmx256M -Xms256M" bin/zookeeper-server-start.sh 
> config/zookeeper.properties
> KAFKA_HEAP_OPTS="-Xmx256M -Xms256M" bin/kafka-server-start.sh 
> config/server.properties
> {code}
> 2.) Start [^KafkaErrorProducer.java] which adds 1M log entries to the topic 
> test
> 3.) Start [^KafkaErrorConsumer.java] which starts a consumer, reads 100 
> entries one by one and then closes the consumer. This leads to a 2nd segment 
> in /tmp/kafka-logs/__consumer_offsets-27. This step takes some time (around 
> 5mins). The close of the consumer is needed to have metadata events in the 
> segments too.
> 4.) Stop and restart the Kafka broker
> 5.) Start any consumer on topic test and group testgroup
> {code}
> bin/kafka-console-consumer.sh --from-beginning --bootstrap-server 
> localhost:9092 --topic test --consumer-property group.id=testgroup
> {code}
> Actual:
> the consumer starts at the segmentation boundary
> Expected:
> the consumer starts at the end
> The reason for this behavior is that the closing brace of the while loop in 
> GroupMetadataManager#loadGroupsAndOffsets is at the wrong position, introduced 
> with commit 
> https://github.com/apache/kafka/commit/5bd06f1d542e6b588a1d402d059bc24690017d32
> I will prepare a pull request.
> *Edit*: The issue can happen if there are multiple reads from the same 
> segment, see https://github.com/apache/kafka/pull/3538#discussion_r127759694



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (KAFKA-5556) KafkaConsumer.commitSync throws IllegalStateException: Attempt to retrieve exception from future which hasn't failed

2017-07-18 Thread Ismael Juma (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5556?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-5556:
---
Fix Version/s: 0.11.1.0

> KafkaConsumer.commitSync throws IllegalStateException: Attempt to retrieve 
> exception from future which hasn't failed
> 
>
> Key: KAFKA-5556
> URL: https://issues.apache.org/jira/browse/KAFKA-5556
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.10.2.1, 0.11.0.0
>Reporter: Damian Guy
>Assignee: Umesh Chaudhary
>Priority: Critical
> Fix For: 0.10.2.2, 0.11.0.1, 0.11.1.0
>
>
> From the user list:
> I have been running a streaming application on some data set. Things
> usually run ok. Today I was trying to run the same application on Kafka
> (ver 0.10.2.1 on Scala 2.12) installed in a Mesos DC/OS cluster. After
> running for quite some time, I got the following exception ..
> {code}
> Exception in thread "StreamThread-1" java.lang.IllegalStateException:
> > Attempt to retrieve exception from future which hasn't failed
> > at
> > org.apache.kafka.clients.consumer.internals.RequestFuture.exception(RequestFuture.java:99)
> > at
> > org.apache.kafka.clients.consumer.internals.RequestFuture.isRetriable(RequestFuture.java:89)
> > at
> > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:590)
> > at
> > org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1124)
> > at
> > org.apache.kafka.streams.processor.internals.StreamTask.commitOffsets(StreamTask.java:296)
> > at
> > org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:79)
> > at
> > org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
> > at
> > org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:280)
> > at
> > org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:807)
> > at
> > org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:794)
> > at
> > org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:769)
> > at
> > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:647)
> > at
> > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
> {code}
> Looks like we should check if the future is done, i.e., check the return 
> value from poll and retry if time is remaining and {{!future.isDone()}}
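
A minimal, self-contained sketch of the suggested shape (the {{RequestFuture}} interface below is a hypothetical stand-in for the client-internal class; the point is the {{!future.isDone()}} guard before inspecting the outcome):

{code}
public class CommitRetrySketch {
    interface RequestFuture {
        boolean isDone();
        boolean succeeded();
        boolean isRetriable();
        RuntimeException exception();
    }

    static boolean commitSync(java.util.function.Supplier<RequestFuture> send,
                              java.util.function.Consumer<RequestFuture> poll,
                              long timeoutMs) {
        long deadline = System.currentTimeMillis() + timeoutMs;
        RequestFuture future = send.get();
        while (System.currentTimeMillis() < deadline) {
            poll.accept(future);          // may return before the future completes
            if (!future.isDone())
                continue;                 // the fix: never inspect an incomplete future
            if (future.succeeded())
                return true;
            if (!future.isRetriable())
                throw future.exception(); // safe now: the future has actually failed
            future = send.get();          // retriable failure: send a new attempt
        }
        return false;                     // timed out without completing
    }
}
{code}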



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (KAFKA-5470) Replace -XX:+DisableExplicitGC with -XX:+ExplicitGCInvokesConcurrent in kafka-run-class

2017-07-18 Thread Ismael Juma (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5470?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-5470:
---
Fix Version/s: (was: 0.11.0.1)
   0.11.1.0

> Replace -XX:+DisableExplicitGC with -XX:+ExplicitGCInvokesConcurrent in 
> kafka-run-class
> ---
>
> Key: KAFKA-5470
> URL: https://issues.apache.org/jira/browse/KAFKA-5470
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Ismael Juma
>Assignee: Ismael Juma
>Priority: Critical
> Fix For: 0.11.1.0
>
>
> This is important because Bits.reserveMemory calls System.gc() hoping to free 
> native memory in order to avoid throwing an OutOfMemoryError. This call 
> is currently a no-op due to -XX:+DisableExplicitGC.
> It's worth mentioning that -XX:MaxDirectMemorySize can be used to increase 
> the amount of native memory available for allocation of direct byte buffers.
> Hongyuan Li pointed out the issue with the usage of -XX:+DisableExplicitGC 
> in: 
> https://issues.apache.org/jira/browse/KAFKA-5444?focusedCommentId=16054129&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-16054129
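
A hedged demonstration of the mechanism (the flag values are examples; the exact outcome depends on JVM version and background GC activity):

{code}
// Run with: java -XX:MaxDirectMemorySize=16m DirectMemoryDemo
//   - without -XX:+DisableExplicitGC, Bits.reserveMemory can trigger a GC
//     that reclaims the dropped buffers, so the loop may succeed;
//   - with -XX:+DisableExplicitGC, that System.gc() is a no-op and the loop
//     may fail with "java.lang.OutOfMemoryError: Direct buffer memory".
import java.nio.ByteBuffer;

public class DirectMemoryDemo {
    public static void main(String[] args) {
        for (int i = 0; i < 64; i++) {
            ByteBuffer buf = ByteBuffer.allocateDirect(1024 * 1024); // 1 MiB
            buf = null; // dropped immediately; reclaiming it still requires a GC
        }
        System.out.println("Allocated 64 MiB of direct buffers against a 16 MiB limit");
    }
}
{code}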



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (KAFKA-5276) Support derived and prefixed configs in DescribeConfigs (KIP-133)

2017-07-18 Thread Ismael Juma (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5276?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-5276:
---
Fix Version/s: (was: 0.11.0.1)
   0.11.1.0

> Support derived and prefixed configs in DescribeConfigs (KIP-133)
> -
>
> Key: KAFKA-5276
> URL: https://issues.apache.org/jira/browse/KAFKA-5276
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Ismael Juma
>Assignee: Ismael Juma
> Fix For: 0.11.1.0
>
>
> The broker supports config overrides per listener. The way we do that is by 
> prefixing the configs with the listener name. These configs are not defined 
> by ConfigDef and they don't appear in `values()`. They do appear in 
> `originals()`. We should change the code so that we return these configs. 
> Because these configs are read-only, nothing needs to be done for 
> AlterConfigs.
> With regards to derived configs, an example is advertised.listeners, which 
> falls back to listeners. This is currently done outside AbstractConfig. We 
> should look into including these into AbstractConfig so that the fallback 
> happens for the returned configs.
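
An illustrative sketch of the originals()-vs-values() split described above (the config key and listener name are examples):

{code}
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;

public class PrefixedConfigDemo {
    public static void main(String[] args) {
        ConfigDef def = new ConfigDef()
            .define("ssl.keystore.location", ConfigDef.Type.STRING, null,
                    ConfigDef.Importance.HIGH, "keystore path");
        Map<String, Object> props = new HashMap<>();
        props.put("ssl.keystore.location", "/etc/kafka/default.jks");
        // listener-prefixed override; not part of the ConfigDef:
        props.put("listener.name.internal.ssl.keystore.location", "/etc/kafka/internal.jks");
        AbstractConfig config = new AbstractConfig(def, props) { };
        System.out.println(config.values().keySet());    // defined key only
        System.out.println(config.originals().keySet()); // includes the prefixed key
    }
}
{code}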



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (KAFKA-4669) KafkaProducer.flush hangs when NetworkClient.handleCompletedReceives throws exception

2017-07-18 Thread Ismael Juma (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4669?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-4669:
---
Fix Version/s: 0.11.1.0

> KafkaProducer.flush hangs when NetworkClient.handleCompletedReceives throws 
> exception
> -
>
> Key: KAFKA-4669
> URL: https://issues.apache.org/jira/browse/KAFKA-4669
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.9.0.1
>Reporter: Cheng Ju
>Assignee: Rajini Sivaram
>Priority: Critical
>  Labels: reliability
> Fix For: 0.11.0.1, 0.11.1.0
>
>
> There is no try catch in NetworkClient.handleCompletedReceives.  If an 
> exception is thrown after inFlightRequests.completeNext(source), then the 
> corresponding RecordBatch's done will never get called, and 
> KafkaProducer.flush will hang on this RecordBatch.
> I've checked 0.10 code and think this bug does exist in 0.10 versions.
> A real case: first a correlation id mismatch exception happens:
> 13 Jan 2017 17:08:24,059 ERROR [kafka-producer-network-thread | producer-21] 
> (org.apache.kafka.clients.producer.internals.Sender.run:130)  - Uncaught 
> error in kafka producer I/O thread: 
> java.lang.IllegalStateException: Correlation id for response (703766) does 
> not match request (703764)
>   at 
> org.apache.kafka.clients.NetworkClient.correlate(NetworkClient.java:477)
>   at 
> org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:440)
>   at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:265)
>   at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:216)
>   at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:128)
>   at java.lang.Thread.run(Thread.java:745)
> Then jstack shows the thread is hanging on:
>   at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
>   at 
> org.apache.kafka.clients.producer.internals.ProduceRequestResult.await(ProduceRequestResult.java:57)
>   at 
> org.apache.kafka.clients.producer.internals.RecordAccumulator.awaitFlushCompletion(RecordAccumulator.java:425)
>   at 
> org.apache.kafka.clients.producer.KafkaProducer.flush(KafkaProducer.java:544)
>   at org.apache.flume.sink.kafka.KafkaSink.process(KafkaSink.java:224)
>   at 
> org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
>   at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
>   at java.lang.Thread.run(Thread.java:745)
> client code 
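
A self-contained sketch of the hang mechanism (names here are hypothetical stand-ins for the NetworkClient/RecordBatch internals): once a request leaves the in-flight set, an exception during response handling means the batch latch is never counted down and flush() blocks forever; guarding each response and failing its batch avoids that.

{code}
import java.util.List;
import java.util.concurrent.CountDownLatch;

public class FlushHangSketch {
    static class Batch {
        final CountDownLatch done = new CountDownLatch(1);
        void complete(Exception error) { done.countDown(); } // done() in the real code
        void awaitFlush() throws InterruptedException { done.await(); } // what flush() waits on
    }

    static void handleCompletedReceives(List<Batch> inFlight) {
        for (Batch batch : inFlight) {
            try {
                process(batch);        // may throw, e.g. on a correlation id mismatch
                batch.complete(null);
            } catch (RuntimeException e) {
                batch.complete(e);     // fail the batch instead of leaking it
            }
        }
    }

    static void process(Batch batch) { /* parse the response, match correlation id */ }
}
{code}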



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Resolved] (KAFKA-4669) KafkaProducer.flush hangs when NetworkClient.handleCompletedReceives throws exception

2017-07-18 Thread Ismael Juma (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4669?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma resolved KAFKA-4669.

Resolution: Fixed
  Assignee: Rajini Sivaram

Closing this for now. If it reoccurs, let's reopen.

> KafkaProducer.flush hangs when NetworkClient.handleCompletedReceives throws 
> exception
> -
>
> Key: KAFKA-4669
> URL: https://issues.apache.org/jira/browse/KAFKA-4669
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.9.0.1
>Reporter: Cheng Ju
>Assignee: Rajini Sivaram
>Priority: Critical
>  Labels: reliability
> Fix For: 0.11.0.1
>
>
> There is no try catch in NetworkClient.handleCompletedReceives.  If an 
> exception is thrown after inFlightRequests.completeNext(source), then the 
> corresponding RecordBatch's done will never get called, and 
> KafkaProducer.flush will hang on this RecordBatch.
> I've checked 0.10 code and think this bug does exist in 0.10 versions.
> A real case: first a correlation id mismatch exception happens:
> 13 Jan 2017 17:08:24,059 ERROR [kafka-producer-network-thread | producer-21] 
> (org.apache.kafka.clients.producer.internals.Sender.run:130)  - Uncaught 
> error in kafka producer I/O thread: 
> java.lang.IllegalStateException: Correlation id for response (703766) does 
> not match request (703764)
>   at 
> org.apache.kafka.clients.NetworkClient.correlate(NetworkClient.java:477)
>   at 
> org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:440)
>   at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:265)
>   at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:216)
>   at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:128)
>   at java.lang.Thread.run(Thread.java:745)
> Then jstack shows the thread is hanging on:
>   at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
>   at 
> org.apache.kafka.clients.producer.internals.ProduceRequestResult.await(ProduceRequestResult.java:57)
>   at 
> org.apache.kafka.clients.producer.internals.RecordAccumulator.awaitFlushCompletion(RecordAccumulator.java:425)
>   at 
> org.apache.kafka.clients.producer.KafkaProducer.flush(KafkaProducer.java:544)
>   at org.apache.flume.sink.kafka.KafkaSink.process(KafkaSink.java:224)
>   at 
> org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
>   at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
>   at java.lang.Thread.run(Thread.java:745)
> client code 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Comment Edited] (KAFKA-5386) [Kafka Streams] - custom name for state-store change-log topic

2017-07-18 Thread Bart Vercammen (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5386?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16091463#comment-16091463
 ] 

Bart Vercammen edited comment on KAFKA-5386 at 7/18/17 12:01 PM:
-

Actually this is going to be a little harder than initially expected, as the 
KStream API can create a bunch of _repartition_ topics under the hood that I 
also want to conform to a certain known naming convention.

Basically what I'm trying to achieve is that every kafka-topic that is going to 
be created is known up front, and that it complies with a certain naming 
strategy. It would be nice if this naming strategy could be configured somehow.

Any suggestions on this?



was (Author: cloutrix):
Actually this is going to be a little harder than initially expected, as the 
KStreams API can create a bunch of _repartition_ topics under the hood that I 
also want to conform to a certain known naming convention.

Basically what I'm trying to achieve is that every kafka-topic that is going to 
be created is known up front, and that it complies with a certain naming 
strategy. It would be nice if this naming strategy could be configured somehow.

Any suggestions on this?


> [Kafka Streams] - custom name for state-store change-log topic
> --
>
> Key: KAFKA-5386
> URL: https://issues.apache.org/jira/browse/KAFKA-5386
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.2.1
>Reporter: Bart Vercammen
>
> Currently, when working with Kafka backed state stores in Kafka Streams, 
> these log compacted topics are given a hardcoded name :  
> _my.app.id-storename-changelog_
> {noformat}public static String storeChangelogTopic(String applicationId, 
> String storeName) {
> return applicationId + "-" + storeName + STATE_CHANGELOG_TOPIC_SUFFIX;
> }{noformat}
> It would be nice if somehow I would be able to override this functionality 
> and provide the topic-name myself when creating the state-store.
> Any comments?
> Would it be OK to submit a PR for this?



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5386) [Kafka Streams] - custom name for state-store change-log topic

2017-07-18 Thread Bart Vercammen (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5386?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16091463#comment-16091463
 ] 

Bart Vercammen commented on KAFKA-5386:
---

Actually this is going to be a little harder than initially expected, as the 
KStreams API can create a bunch of _repartition_ topics under the hood that I 
also want to conform to a certain known naming convention.

Basically what I'm trying to achieve is that every kafka-topic that is going to 
be created is known up front, and that it complies with a certain naming 
strategy. It would be nice if this naming strategy could be configured somehow.

Any suggestions on this?


> [Kafka Streams] - custom name for state-store change-log topic
> --
>
> Key: KAFKA-5386
> URL: https://issues.apache.org/jira/browse/KAFKA-5386
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.2.1
>Reporter: Bart Vercammen
>
> Currently, when working with Kafka backed state stores in Kafka Streams, 
> these log compacted topics are given a hardcoded name :  
> _my.app.id-storename-changelog_
> {noformat}public static String storeChangelogTopic(String applicationId, 
> String storeName) {
> return applicationId + "-" + storeName + STATE_CHANGELOG_TOPIC_SUFFIX;
> }{noformat}
> It would be nice if somehow I would be able to override this functionality 
> and provide the topic-name myself when creating the state-store.
> Any comments?
> Would it be OK to submit a PR for this?



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5549) Explain that `client.id` is just used as a prefix within Streams

2017-07-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5549?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16091457#comment-16091457
 ] 

ASF GitHub Bot commented on KAFKA-5549:
---

GitHub user PranavManiar opened a pull request:

https://github.com/apache/kafka/pull/3544

KAFKA-5549 : Explain that 'client.id' is just used as a prefix within 
Streams

- Added new String CLIENT_ID_DOC in StreamsConfig for explanation

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/PranavManiar/kafka trunk

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/3544.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3544


commit e7e88110f68445ff6ebdbcf2fc94f8c7e3e25cc2
Author: Pranav Maniar 
Date:   2017-07-18T11:46:15Z

KAFKA-5549 : Explain that 'client.id' is just used as a prefix within 
Streams

- Added new String CLIENT_ID_DOC in StreamsConfig for explanation




> Explain that `client.id` is just used as a prefix within Streams
> 
>
> Key: KAFKA-5549
> URL: https://issues.apache.org/jira/browse/KAFKA-5549
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Priority: Trivial
>  Labels: beginner, newbie
>
> We should explain that {{client.id}} is used as a prefix for the internal 
> consumer, producer, and restore-consumer, and not reuse 
> {{CommonClientConfigs.CLIENT_ID_DOC}} within {{StreamsConfig}}
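
A hedged illustration of the behavior to document (the derived-id format mentioned in the comment below is an assumption for illustration only):

{code}
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class ClientIdExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.CLIENT_ID_CONFIG, "my-client"); // used only as a prefix
        // The internal consumer, producer, and restore-consumer derive their own
        // client.ids from this value (e.g. something like "my-client-...-consumer")
        // rather than using it verbatim.
        new StreamsConfig(props);
    }
}
{code}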



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Resolved] (KAFKA-5302) Improve exception handling on streams client (communication with brokers)

2017-07-18 Thread Eno Thereska (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5302?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Eno Thereska resolved KAFKA-5302.
-
Resolution: Won't Fix

> Improve exception handling on streams client (communication with brokers)
> -
>
> Key: KAFKA-5302
> URL: https://issues.apache.org/jira/browse/KAFKA-5302
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Affects Versions: 0.11.0.0
>Reporter: Eno Thereska
>Assignee: Eno Thereska
> Fix For: 0.11.1.0
>
>
> These are exceptions in StreamsKafkaClient.java.
> Currently throws either StreamsException or BrokerNotFoundException.
> Used by InternalTopicManager to create topics and get their metadata.
> Used by StreamPartitionAssignor. 
> Currently InternalTopicManager retries a few times after catching an 
> exception. 
> A failure here is sent all the way up to the stream thread and will stop the 
> streams pipeline. 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Resolved] (KAFKA-5217) Improve Streams internal exception handling

2017-07-18 Thread Eno Thereska (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5217?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Eno Thereska resolved KAFKA-5217.
-
Resolution: Duplicate

> Improve Streams internal exception handling
> ---
>
> Key: KAFKA-5217
> URL: https://issues.apache.org/jira/browse/KAFKA-5217
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Affects Versions: 0.10.2.1
>Reporter: Matthias J. Sax
>Assignee: Matthias J. Sax
> Fix For: 0.11.1.0
>
>
> Streams does not handle all exceptions gracefully atm, but tends to throw 
> exceptions to the user, even if we could handle them internally and recover 
> automatically. We want to revisit this exception handling to be more 
> resilient.
> For example, for any kind of rebalance exception, we should just log it, and 
> rejoin the consumer group.
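A minimal sketch of the desired behavior, using CommitFailedException as a representative rebalance-related exception; which exceptions Streams should actually swallow, and where, would be decided as part of this ticket:

{code}
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

// Hedged sketch: on a rebalance-related exception, log and keep polling
// (the next poll() rejoins the group) instead of throwing to the user.
public class ResilientLoop {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "testgroup");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("test"));
            while (true) {
                try {
                    ConsumerRecords<String, String> records = consumer.poll(100);
                    records.forEach(r -> process(r.value()));
                    consumer.commitSync();
                } catch (CommitFailedException e) {
                    // Group rebalanced under us: just log; the next poll() rejoins.
                    System.err.println("Rebalance during commit, rejoining: " + e);
                }
            }
        }
    }

    private static void process(String value) { /* application logic */ }
}
{code}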



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (KAFKA-5600) GroupMetadataManager doesn't read offsets of segmented logs correctly

2017-07-18 Thread Jan Burkhardt (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5600?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jan Burkhardt updated KAFKA-5600:
-
Affects Version/s: (was: 0.10.2.0)
   0.10.2.1

> GroupMetadataManager doesn't read offsets of segmented logs correctly
> -
>
> Key: KAFKA-5600
> URL: https://issues.apache.org/jira/browse/KAFKA-5600
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.10.2.1, 0.11.0.0
> Environment: any
>Reporter: Jan Burkhardt
>Priority: Critical
>  Labels: regression, reliability
> Fix For: 0.11.0.1
>
> Attachments: KafkaErrorConsumer.java, KafkaErrorProducer.java
>
>
> After a long investigation we found a problem in Kafka.
> When a __consumer_offsets partition gets segmented, and Kafka is restarted 
> and needs to reload offsets, consumers will start at the wrong position if 
> metadata and offset events are spread across both segments.
> Reproduction:
> 1.) Start zookeeper and kafka as is from the archive
> {code}
> KAFKA_HEAP_OPTS="-Xmx256M -Xms256M" bin/zookeeper-server-start.sh 
> config/zookeeper.properties
> KAFKA_HEAP_OPTS="-Xmx256M -Xms256M" bin/kafka-server-start.sh 
> config/server.properties
> {code}
> 2.) Start [^KafkaErrorProducer.java], which adds 1M log entries to the topic 
> test.
> 3.) Start [^KafkaErrorConsumer.java], which starts a consumer, reads 100 
> entries one by one, and then closes the consumer (a sketch of this pattern 
> follows this description). This leads to a second segment in 
> /tmp/kafka-logs/__consumer_offsets-27. This step takes some time (around 
> 5 minutes). The close of the consumer is needed so that metadata events end 
> up in the segments too.
> 4.) Stop and restart the Kafka broker
> 5.) Start any consumer on topic test and group testgroup
> {code}
> bin/kafka-console-consumer.sh --from-beginning --bootstrap-server 
> localhost:9092 --topic test --consumer-property group.id=testgroup
> {code}
> Actual:
> the consumer starts at the segmentation boundary.
> Expected:
> the consumer starts at the end.
> The reason for this behavior is that the closing brace of the while loop in 
> GroupMetadataManager#loadGroupsAndOffsets is at the wrong position, 
> introduced with commit 
> https://github.com/apache/kafka/commit/5bd06f1d542e6b588a1d402d059bc24690017d32
> I will prepare a pull request.
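The attachment itself is not reproduced here; the following is only a hedged sketch of the pattern step 3 describes (repeatedly open a consumer, read a few entries, close it), which interleaves offset and group-metadata events in __consumer_offsets:

{code}
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;

// Hedged sketch of the repro pattern from step 3, not the actual attachment:
// repeatedly create a consumer, poll a little, commit, and close. Each close
// writes group metadata, interleaving metadata and offset events until the
// __consumer_offsets partition rolls a second segment.
public class SegmentingConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "testgroup");
        props.put("max.poll.records", "1"); // read entries one by one
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        for (int round = 0; round < 10_000; round++) {
            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Collections.singletonList("test"));
                for (int i = 0; i < 100; i++) { // 100 entries per round, as in step 3
                    consumer.poll(1000);
                    consumer.commitSync();
                }
            } // close() triggers the group-metadata write
        }
    }
}
{code}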



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (KAFKA-5607) Improve error handling in SocketServer to reduce illegal states later

2017-07-18 Thread Rajini Sivaram (JIRA)
Rajini Sivaram created KAFKA-5607:
-

 Summary: Improve error handling in SocketServer to reduce illegal 
states later
 Key: KAFKA-5607
 URL: https://issues.apache.org/jira/browse/KAFKA-5607
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 0.11.0.0
Reporter: Rajini Sivaram
Assignee: Rajini Sivaram


From [~junrao] in https://github.com/apache/kafka/pull/3526:

{quote}
In SocketServer, it may be better to catch unexpected exceptions within the 
loop of socket key iteration in places like processCompletedReceives(). That 
way, if we hit an unexpected exception, it only affects a single key. 
{quote}
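A minimal sketch of the suggested structure (in Java for illustration, although SocketServer itself is Scala); all names except processCompletedReceives are stand-ins:

{code}
import java.util.List;
import org.apache.kafka.common.network.NetworkReceive;

// Illustrative shape only: the catch moves inside the per-key loop, so an
// unexpected exception affects a single receive/connection instead of
// aborting the whole iteration.
class ProcessorSketch {
    void processCompletedReceives(List<NetworkReceive> completedReceives) {
        for (NetworkReceive receive : completedReceives) {
            try {
                handleReceive(receive); // parse and dispatch one request (stub)
            } catch (Exception e) {
                // Only this connection is affected; close it and keep iterating.
                System.err.println("Error processing receive from " + receive.source() + ": " + e);
                closeConnection(receive.source());
            }
        }
    }

    private void handleReceive(NetworkReceive receive) { /* stub */ }

    private void closeConnection(String connectionId) { /* stub */ }
}
{code}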




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5595) Illegal state in SocketServer; attempt to send with another send in progress

2017-07-18 Thread Rajini Sivaram (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5595?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16091282#comment-16091282
 ] 

Rajini Sivaram commented on KAFKA-5595:
---

[~ijuma] [~hachikuji] I have to admit I didn't look much deeper into it 
either. The scenario I was thinking of was one where a slow broker closes a 
few connections because it thinks they are idle. Connections whose ports were 
not reused would show the log entry {{Attempting to send response via channel 
for which there is no open connection}}. A connection whose port did get 
reused, however, would have a new channel, so when the response to the older 
connection became ready, it would be sent to the newer connection, since the 
selector still holds a channel for that connection id. That would throw 
{{IllegalStateException}} if there were already a send outstanding on the 
newer connection.

But I hadn't realized that there were 13K warnings, so I agree this is not 
the scenario here. 

> Illegal state in SocketServer; attempt to send with another send in progress
> 
>
> Key: KAFKA-5595
> URL: https://issues.apache.org/jira/browse/KAFKA-5595
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>
> I have seen this a couple of times, but I'm not sure of the conditions 
> associated with it. 
> {code}
> java.lang.IllegalStateException: Attempt to begin a send operation with prior 
> send operation still in progress.
>   at 
> org.apache.kafka.common.network.KafkaChannel.setSend(KafkaChannel.java:138)
>   at org.apache.kafka.common.network.Selector.send(Selector.java:248)
>   at kafka.network.Processor.sendResponse(SocketServer.scala:488)
>   at kafka.network.Processor.processNewResponses(SocketServer.scala:466)
>   at kafka.network.Processor.run(SocketServer.scala:431)
>   at java.lang.Thread.run(Thread.java:748)
> {code}
> Prior to this event, I see a lot of this message in the logs (always for the 
> same connection id):
> {code}
> Attempting to send response via channel for which there is no open 
> connection, connection id 7
> {code}
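For context, the IllegalStateException in the stack trace comes from the one-in-flight-send guard in KafkaChannel.setSend; a simplified sketch (not the exact source):

{code}
import org.apache.kafka.common.network.Send;

// Simplified sketch (not the exact source) of the guard in
// KafkaChannel.setSend() that throws the exception seen in the trace above:
// a channel permits only one in-flight send at a time.
class ChannelSketch {
    private Send send;

    void setSend(Send newSend) {
        if (this.send != null)
            throw new IllegalStateException(
                    "Attempt to begin a send operation with prior send operation still in progress.");
        this.send = newSend;
        // The real method also registers write interest on the underlying
        // transport layer so the selector will attempt the send.
    }
}
{code}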



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (KAFKA-5606) Review consumer's RequestFuture usage pattern

2017-07-18 Thread Ismael Juma (JIRA)
Ismael Juma created KAFKA-5606:
--

 Summary: Review consumer's RequestFuture usage pattern
 Key: KAFKA-5606
 URL: https://issues.apache.org/jira/browse/KAFKA-5606
 Project: Kafka
  Issue Type: Bug
Reporter: Ismael Juma
 Fix For: 0.11.1.0


KAFKA-5556 shows that we can perhaps tighten the usage pattern of the 
consumer's RequestFuture to avoid similar bugs in the future.

Jason suggested:

{quote}
Another way to see this bug is as a failure to ensure completion of the 
future. Had we done so, we could have skipped the failed check. This is why it 
worked prior to the patch that added the timeout. The pattern should really be 
something like this:

{code}
if (future.isDone()) {
  if (future.succeeded()) {
// handle success
  } else {
// handle failure
  }
}
{code}

I guess one benefit of the enum approach is that it forces you to ensure 
completion prior to checking any of the possible results. That said, I'm a bit 
more inclined to remove the isRetriable method and leave it to the caller to 
determine what is and is not retriable. Then the request future only has two 
completion states.
{quote}

An alternative is to replace succeeded and failed with a status method that 
returns an enum with three states: SUCCEEDED, FAILED, RETRY (the enum approach 
mentioned above). This would make sense if we often have to handle these three 
states differently; a sketch follows.
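A minimal sketch of that three-state alternative; all names here are assumptions for illustration:

{code}
// Hypothetical illustration of the three-state alternative: callers switch on
// a single status() result instead of combining succeeded()/failed()/
// isRetriable() checks. All names are made up for this sketch.
enum FutureStatus { SUCCEEDED, FAILED, RETRY }

class RequestFutureSketch<T> {
    private FutureStatus status; // set exactly once on completion
    private T value;
    private RuntimeException exception;

    FutureStatus status() {
        if (status == null)
            throw new IllegalStateException("Future has not completed yet");
        return status;
    }

    T value() { return value; }

    RuntimeException exception() { return exception; }

    // Caller-side usage:
    //   switch (future.status()) {
    //       case SUCCEEDED: handle(future.value()); break;
    //       case FAILED:    throw future.exception();
    //       case RETRY:     resend(); break;
    //   }
}
{code}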



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)