[jira] [Commented] (KAFKA-6749) TopologyTestDriver fails when topology under test uses EXACTLY_ONCE

2018-05-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6749?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16472915#comment-16472915
 ] 

ASF GitHub Bot commented on KAFKA-6749:
---

jadireddi opened a new pull request #4912: KAFKA-6749: Fixed TopologyTestDriver 
to process stream processing guarantee as exactly once
URL: https://github.com/apache/kafka/pull/4912
 
 
   https://issues.apache.org/jira/browse/KAFKA-6749
   
Fixed `TopologyTestDriver` so that stream processing topologies configured with 
the EXACTLY_ONCE processing guarantee can be tested.
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> TopologyTestDriver fails when topology under test uses EXACTLY_ONCE
> --
>
> Key: KAFKA-6749
> URL: https://issues.apache.org/jira/browse/KAFKA-6749
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.1.0
>Reporter: Frederic Arno
>Assignee: Jagadesh Adireddi
>Priority: Minor
>  Labels: newbie
>
> Stream processing topologies which are configured to use {{EXACTLY_ONCE}} 
> processing guarantee cannot be tested with the {{TopologyTestDriver}}. Tests 
> usually crash with {{java.lang.IllegalStateException: MockProducer hasn't 
> been initialized for transactions}} within the second call to 
> {{TopologyTestDriver.pipeInput()}}; the first call works fine.
> Changing the processing guarantee to {{AT_LEAST_ONCE}} makes tests pass.
> This is a problem because it is expected that proper processor topologies can 
> be successfully tested using {{TopologyTestDriver}}, however 
> {{TopologyTestDriver}} can't handle {{EXACTLY_ONCE}} and crashes during 
> tests. To a developer, this usually means that there is something wrong with 
> their processor topologies.
> Kafka developers can reproduce this by adding:
> {code:java}
> put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, 
> StreamsConfig.EXACTLY_ONCE);{code}
> to line 88 of TopologyTestDriverTest: 
> streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
> Originally [reported on the 
> ML|http://mail-archives.apache.org/mod_mbox/kafka-users/201804.mbox/%3C54ab29ad-44e1-35bd-9c16-c1d8d68a88db%40gmail.com%3E].
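For reference, the configuration change above amounts to something like the following self-contained sketch (plain string keys stand in for the `StreamsConfig` constants, and the application id and bootstrap server are placeholder values):

```java
import java.util.Properties;

public class EosTestConfig {

    /** Builds a Streams test configuration with exactly-once enabled. */
    public static Properties streamsTestConfig() {
        Properties props = new Properties();
        // Basics required by any Streams configuration (placeholder values).
        props.put("application.id", "eos-test-app");
        props.put("bootstrap.servers", "dummy:1234"); // not contacted by TopologyTestDriver
        // The setting that triggers the bug: with exactly-once enabled, the
        // driver's MockProducer is never initialized for transactions, so the
        // second pipeInput() call fails with an IllegalStateException.
        props.put("processing.guarantee", "exactly_once");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(streamsTestConfig().getProperty("processing.guarantee"));
    }
}
```

Passing a configuration like this to `TopologyTestDriver` is what the test change above exercises.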



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6749) TopologyTestDriver fails when topology under test uses EXACTLY_ONCE

2018-05-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6749?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16472905#comment-16472905
 ] 

ASF GitHub Bot commented on KAFKA-6749:
---

jadireddi closed pull request #4912: KAFKA-6749: Fixed TopologyTestDriver to 
process stream processing guarantee as exactly once
URL: https://github.com/apache/kafka/pull/4912
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:




 




[jira] [Commented] (KAFKA-6738) Kafka Connect handling of bad data

2018-05-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6738?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16472754#comment-16472754
 ] 

ASF GitHub Bot commented on KAFKA-6738:
---

wicknicks opened a new pull request #5010: KAFKA-6738: Error Handling in Connect
URL: https://github.com/apache/kafka/pull/5010
 
 
   **_This PR is a WIP. It has been created to serve as a reference for 
discussions on the proposal at 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-298%3A+Error+Handling+in+Connect._**
   
   This feature aims to change the Connect framework to allow it to 
automatically deal with errors while processing records in a Connector. The 
following behavior changes are introduced here: 
   1. **Retry on Failure**: Retry the failed operation a configurable number of 
times, with backoff between each retry.
   2. **Task Tolerance Limits**: Tolerate up to a configurable number of 
failures in a task.
   
   We also add the following ways to report errors, along with sufficient 
context to simplify the  debugging process: 
   1. **Log Error Context**: The error information along with processing 
context is logged along with the standard application logs.
   2. **Dead Letter Queue**: Produce the error information and processing 
context into a Kafka topic.
   
   The logged information consists of the following bits:
   1. Descriptions of the different stages (source/sink tasks, transformations 
and converters) in the connector, and their configs.
   2. The record which caused the exception.
   3. The exception and stack trace, if available.
   4. Number of attempts (if applicable) made to execute the failed operation.
   5. The time of error.
   
   New metrics are added to monitor the number of failures and the behavior of 
the response handler.
   
   The changes proposed here are **backward compatible**. The current behavior 
in Connect is to kill the task on the first error in any stage. This will 
remain the default behavior if the connector does not override any of the new 
configurations which are provided as part of this feature.
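The retry and tolerance behavior described above can be sketched in plain Java as follows (an illustrative model only, not the actual Connect implementation; the class and method names are made up):

```java
import java.util.function.Supplier;

/**
 * Illustrative sketch of the two behaviors: retry a failed operation with
 * backoff, and tolerate up to a fixed number of failures before failing.
 */
public class RetryWithTolerance {
    private final int maxRetries;     // retries per operation
    private final long backoffMs;     // delay between retries
    private final int toleranceLimit; // total failures tolerated by the task
    private int failures = 0;

    public RetryWithTolerance(int maxRetries, long backoffMs, int toleranceLimit) {
        this.maxRetries = maxRetries;
        this.backoffMs = backoffMs;
        this.toleranceLimit = toleranceLimit;
    }

    /** Runs the operation; returns null if it failed but the failure is tolerated. */
    public <T> T execute(Supplier<T> operation) {
        for (int attempt = 0; ; attempt++) {
            try {
                return operation.get();
            } catch (RuntimeException e) {
                if (attempt < maxRetries) {
                    sleepQuietly(backoffMs);     // back off, then retry
                } else if (++failures <= toleranceLimit) {
                    return null;                 // tolerated: skip this record
                } else {
                    throw e;                     // tolerance exceeded: fail the task
                }
            }
        }
    }

    private static void sleepQuietly(long ms) {
        try { Thread.sleep(ms); } catch (InterruptedException ie) { Thread.currentThread().interrupt(); }
    }
}
```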
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   




> Kafka Connect handling of bad data
> --
>
> Key: KAFKA-6738
> URL: https://issues.apache.org/jira/browse/KAFKA-6738
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 1.1.0
>Reporter: Randall Hauch
>Assignee: Arjun Satish
>Priority: Critical
> Fix For: 2.0.0
>
>
> Kafka Connect connectors and tasks fail when they run into an unexpected 
> situation or error, but the framework should provide more general "bad data 
> handling" options, including (perhaps among others):
> # fail fast, which is what we do today (assuming the connector actually fails 
> and doesn't swallow errors)
> # retry (possibly with configs to limit)
> # drop data and move on
> # dead letter queue
> This needs to be addressed in a way that handles errors from:
> # The connector itself (e.g. connectivity issues to the other system)
> # Converters/serializers (bad data, unexpected format, etc)
> # SMTs
> # Ideally the framework as well, though we obviously want to fix known bugs 
> anyway
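The four handling options above could be modeled along these lines (a hypothetical sketch; none of these names exist in Connect):

```java
/** Sketch of the four "bad data handling" options (names are hypothetical). */
public class BadDataHandler {
    public enum ErrorPolicy { FAIL_FAST, RETRY, DROP, DEAD_LETTER_QUEUE }

    public static String handle(ErrorPolicy policy, String record) {
        switch (policy) {
            case FAIL_FAST:
                // current behavior: kill the task on the first bad record
                throw new IllegalStateException("bad record: " + record);
            case RETRY:
                return "retry:" + record; // re-attempt, ideally with limits/backoff
            case DROP:
                return null;              // skip the record and move on
            case DEAD_LETTER_QUEUE:
                return "dlq:" + record;   // route the record to an error topic
            default:
                throw new AssertionError("unreachable");
        }
    }
}
```

Whatever form the real configuration takes, the same policy would need to apply at each of the error sources listed (connector, converters, SMTs).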





[jira] [Updated] (KAFKA-6566) SourceTask#stop() not called after exception raised in poll()

2018-05-11 Thread Ted Yu (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6566?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu updated KAFKA-6566:
--
Attachment: (was: 6566.txt)

> SourceTask#stop() not called after exception raised in poll()
> -
>
> Key: KAFKA-6566
> URL: https://issues.apache.org/jira/browse/KAFKA-6566
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 1.0.0
>Reporter: Gunnar Morling
>Assignee: Robert Yokota
>Priority: Blocker
> Fix For: 2.0.0
>
>
> Having discussed this with [~rhauch], it has been my assumption that 
> {{SourceTask#stop()}} will be called by the Kafka Connect framework in case 
> an exception has been raised in {{poll()}}. That's not the case, though. As 
> an example see the connector and task below.
> Calling {{stop()}} after an exception in {{poll()}} seems like a very useful 
> action to take, as it'll allow the task to clean up any resources such as 
> releasing any database connections, right after that failure and not only 
> once the connector is stopped.
> {code}
> package com.example;
> import java.util.Collections;
> import java.util.List;
> import java.util.Map;
> import org.apache.kafka.common.config.ConfigDef;
> import org.apache.kafka.connect.connector.Task;
> import org.apache.kafka.connect.source.SourceConnector;
> import org.apache.kafka.connect.source.SourceRecord;
> import org.apache.kafka.connect.source.SourceTask;
> public class TestConnector extends SourceConnector {
>     @Override
>     public String version() {
>         return null;
>     }
>     @Override
>     public void start(Map<String, String> props) {
>     }
>     @Override
>     public Class<? extends Task> taskClass() {
>         return TestTask.class;
>     }
>     @Override
>     public List<Map<String, String>> taskConfigs(int maxTasks) {
>         return Collections.singletonList(Collections.singletonMap("foo", "bar"));
>     }
>     @Override
>     public void stop() {
>     }
>     @Override
>     public ConfigDef config() {
>         return new ConfigDef();
>     }
>     public static class TestTask extends SourceTask {
>         @Override
>         public String version() {
>             return null;
>         }
>         @Override
>         public void start(Map<String, String> props) {
>         }
>         @Override
>         public List<SourceRecord> poll() throws InterruptedException {
>             throw new RuntimeException();
>         }
>         @Override
>         public void stop() {
>             System.out.println("stop() called");
>         }
>     }
> }
> {code}





[jira] [Issue Comment Deleted] (KAFKA-6566) SourceTask#stop() not called after exception raised in poll()

2018-05-11 Thread Ted Yu (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6566?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu updated KAFKA-6566:
--
Comment: was deleted

(was: Attached 6566.txt, reflecting Robert's comment.

Waiting for Randall's response.)



[jira] [Assigned] (KAFKA-6566) SourceTask#stop() not called after exception raised in poll()

2018-05-11 Thread Randall Hauch (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6566?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Randall Hauch reassigned KAFKA-6566:


 Assignee: Robert Yokota
 Priority: Blocker  (was: Major)
Fix Version/s: 2.0.0

Raised to a blocker for 2.0 (the next release), though we should probably 
backport any fix at least to the 1.1 and 1.0 branches.



[jira] [Commented] (KAFKA-4217) KStream.transform equivalent of flatMap

2018-05-11 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4217?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16472553#comment-16472553
 ] 

Matthias J. Sax commented on KAFKA-4217:


It's just a question of the API: returning "zero or more records" was always 
possible. However, you can return records either via `context.forward()` or via 
the `return` statement. This part can be confusing because `return` only allows 
you to return zero (i.e., `null`) or one record; if you want to return multiple 
records you need to use `context.forward()`. In `flatMap` only the `return` 
statement is available, and it accepts an `Iterable`, so via `return` you can 
emit zero or more records.

Also note that in `transformValues` you are only allowed to use `return`; we 
don't allow `context.forward()` (because `context.forward()` would allow 
modifying the key). Thus, it might make sense to change the return type of both 
`Transformer#transform()` and `ValueTransformer#transform()` to an `Iterable` 
and disallow the usage of `context.forward()`. On the other hand, 
`context.forward()` allows setting metadata like the record timestamp, so it 
might be good to preserve `context.forward()`... Overall, it seems we need to 
redesign the API to address these different concerns...
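The distinction can be illustrated with a minimal plain-Java mock (these are stand-in types, not the actual Kafka Streams interfaces): `return` can emit at most one record, while the forwarding context can emit any number:

```java
import java.util.ArrayList;
import java.util.List;

public class ForwardVsReturn {

    /** Stand-in for ProcessorContext: collects forwarded records. */
    static class MockContext {
        final List<String> forwarded = new ArrayList<>();
        void forward(String record) { forwarded.add(record); } // zero or more records
    }

    /** Stand-in for Transformer: may return one record or null. */
    interface MockTransformer {
        String transform(MockContext ctx, String input);
    }

    public static void main(String[] args) {
        // A flatMap-like transformer: emits every part via forward(), nothing via return.
        MockTransformer splitter = (ctx, in) -> {
            for (String part : in.split(",")) {
                ctx.forward(part);
            }
            return null;
        };
        MockContext ctx = new MockContext();
        splitter.transform(ctx, "a,b,c");
        System.out.println(ctx.forwarded); // prints [a, b, c]
    }
}
```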

> KStream.transform equivalent of flatMap
> ---
>
> Key: KAFKA-4217
> URL: https://issues.apache.org/jira/browse/KAFKA-4217
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.0.1
>Reporter: Elias Levy
>Priority: Major
>  Labels: api, needs-kip, newbie
>
> {{KStream.transform}} gives you access to state stores while allowing you to 
> return zero or one transformed {{KeyValue}}.  Alas, it is unclear what method 
> you should use if you want to access state stores and return zero or multiple 
> {{KeyValue}}.  Presumably you can use {{transform}}, always return {{null}}, 
> and use {{ProcessorContext.forward}} to emit {{KeyValues}}.
> It may be good to introduce a {{transform}}-like {{flatMap}} equivalent, or 
> allow store access from other {{KStream}} methods, such as {{flatMap}} itself.





[jira] [Commented] (KAFKA-4217) KStream.transform equivalent of flatMap

2018-05-11 Thread Bruno Cadonna (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4217?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16472499#comment-16472499
 ] 

Bruno Cadonna commented on KAFKA-4217:
--

Is this issue still relevant? The docs of 
[{{transform}}|https://kafka.apache.org/11/javadoc/org/apache/kafka/streams/kstream/KStream.html#transform-org.apache.kafka.streams.kstream.TransformerSupplier-java.lang.String...-]
 say "Transform each record of the input stream into zero or more records in 
the output stream", which sounds to me as if this issue has already been 
resolved.



[jira] [Resolved] (KAFKA-6813) Remove deprecated APIs from KIP-120 and KIP-182 in Streams

2018-05-11 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6813?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-6813.
--
Resolution: Fixed

> Remove deprecated APIs from KIP-120 and KIP-182 in Streams
> --
>
> Key: KAFKA-6813
> URL: https://issues.apache.org/jira/browse/KAFKA-6813
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Guozhang Wang
>Priority: Major
> Fix For: 2.0.0
>
>
> As we move on to the next major release 2.0, we can consider removing the 
> deprecated APIs from KIP-120 and KIP-182.





[jira] [Updated] (KAFKA-6566) SourceTask#stop() not called after exception raised in poll()

2018-05-11 Thread Ted Yu (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6566?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu updated KAFKA-6566:
--
Attachment: 6566.txt



[jira] [Commented] (KAFKA-6566) SourceTask#stop() not called after exception raised in poll()

2018-05-11 Thread Ted Yu (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6566?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16472411#comment-16472411
 ] 

Ted Yu commented on KAFKA-6566:
---

Attached 6566.txt, reflecting Robert's comment.

Waiting for Randall's response.



[jira] [Comment Edited] (KAFKA-6566) SourceTask#stop() not called after exception raised in poll()

2018-05-11 Thread Robert Yokota (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6566?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16472374#comment-16472374
 ] 

Robert Yokota edited comment on KAFKA-6566 at 5/11/18 6:04 PM:
---

I've just started looking at this, but it looks like the correct place to put 
the {{task.stop()}} is in {{WorkerSourceTask.close()}}. This would mirror the 
call to {{task.stop()}} in {{WorkerSinkTask.close()}}. {{close()}} is called in 
a finally block in {{WorkerTask.doRun()}} here:

https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java#L176

Another possible change I am looking at is adding a call to {{task.stop()}} in 
{{WorkerSinkTask.stop()}}. This would mirror the call to {{task.stop()}} in 
{{WorkerSourceTask.stop()}}.

Ideally the source and sink would be symmetrical in order to make it easier to 
reason about, especially for Connect developers. The above changes assume that 
{{task.stop()}} is idempotent for both the source and sink.
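The shape of that fix can be sketched in plain Java (an illustrative model, not the actual {{WorkerTask}} code): the stop call runs in a finally block regardless of whether polling threw, and a flag keeps it idempotent:

```java
import java.util.concurrent.atomic.AtomicBoolean;

/** Illustrative model of a worker that guarantees the task's stop() runs once. */
public class WorkerModel {
    private final AtomicBoolean stopped = new AtomicBoolean(false);
    int stopCalls = 0;

    /** Idempotent: safe to call from both stop() and close(). */
    void taskStop() {
        if (stopped.compareAndSet(false, true)) {
            stopCalls++; // real code would release database connections, etc.
        }
    }

    /** Mirrors the doRun() pattern: stop runs even when poll() throws. */
    void doRun(Runnable poll) {
        try {
            poll.run(); // may throw, like SourceTask.poll()
        } catch (RuntimeException e) {
            // the real framework records and reports the failure here
        } finally {
            taskStop(); // the proposed close() -> task.stop() call
        }
    }
}
```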













[jira] [Commented] (KAFKA-6566) SourceTask#stop() not called after exception raised in poll()

2018-05-11 Thread Robert Yokota (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6566?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16472374#comment-16472374
 ] 

Robert Yokota commented on KAFKA-6566:
--

I've just started looking at this, but it looks like the correct place to put 
the the `task.stop()` is in `WorkerSourceTask.close()`.  This would mirror the 
call to `task.stop()` in `WorkerSinkTask.close()`.   `close()` is called in a 
finally block in `WorkerTask.doRun()` here:

https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java#L176

There is another possible change I am looking at and that is to put a call to 
`task.stop()` in `WorkerSinkTask.stop()`.  This would mirror the call to 
`task.stop()` in `WorkerSourceTask.stop()`.

Ideally the source and sink would be symmetrical in order to make them easier to 
reason about, especially for Connect developers. The above changes assume that 
`task.stop()` is idempotent for both the source and the sink.
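The idempotency assumption above can be sketched without the Connect runtime. This is a hypothetical task shell (not the actual `WorkerSourceTask`/`WorkerSinkTask` code): if the framework ends up invoking `stop()` from more than one place, a compare-and-set guard keeps the cleanup safe to call repeatedly.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch: a task whose stop() may be called from multiple
// call sites (e.g. both close() and stop() in the worker) but whose
// cleanup body runs at most once.
class GuardedTask {
    private final AtomicBoolean stopped = new AtomicBoolean(false);
    private int cleanupRuns = 0; // exposed only to illustrate the guarantee

    public void stop() {
        // compareAndSet ensures the cleanup runs exactly once, no matter
        // how many times or from how many places stop() is invoked.
        if (stopped.compareAndSet(false, true)) {
            cleanupRuns++; // e.g. release database connections here
        }
    }

    public int cleanupRuns() {
        return cleanupRuns;
    }
}
```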





> SourceTask#stop() not called after exception raised in poll()
> -
>
> Key: KAFKA-6566
> URL: https://issues.apache.org/jira/browse/KAFKA-6566
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 1.0.0
>Reporter: Gunnar Morling
>Priority: Major
>
> Having discussed this with [~rhauch], it has been my assumption that 
> {{SourceTask#stop()}} will be called by the Kafka Connect framework in case 
> an exception has been raised in {{poll()}}. That's not the case, though. As 
> an example see the connector and task below.
> Calling {{stop()}} after an exception in {{poll()}} seems like a very useful 
> action to take, as it'll allow the task to clean up any resources such as 
> releasing any database connections, right after that failure and not only 
> once the connector is stopped.
> {code}
> package com.example;
> import java.util.Collections;
> import java.util.List;
> import java.util.Map;
> import org.apache.kafka.common.config.ConfigDef;
> import org.apache.kafka.connect.connector.Task;
> import org.apache.kafka.connect.source.SourceConnector;
> import org.apache.kafka.connect.source.SourceRecord;
> import org.apache.kafka.connect.source.SourceTask;
> public class TestConnector extends SourceConnector {
> @Override
> public String version() {
> return null;
> }
> @Override
> public void start(Map<String, String> props) {
> }
> @Override
> public Class<? extends Task> taskClass() {
> return TestTask.class;
> }
> @Override
> public List<Map<String, String>> taskConfigs(int maxTasks) {
> return Collections.singletonList(Collections.singletonMap("foo", 
> "bar"));
> }
> @Override
> public void stop() {
> }
> @Override
> public ConfigDef config() {
> return new ConfigDef();
> }
> public static class TestTask extends SourceTask {
> @Override
> public String version() {
> return null;
> }
> @Override
> public void start(Map<String, String> props) {
> }
> @Override
> public List<SourceRecord> poll() throws InterruptedException {
> throw new RuntimeException();
> }
> @Override
> public void stop() {
> System.out.println("stop() called");
> }
> }
> }
> {code}





[jira] [Commented] (KAFKA-4849) Bug in KafkaStreams documentation

2018-05-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16472322#comment-16472322
 ] 

ASF GitHub Bot commented on KAFKA-4849:
---

mjsax closed pull request #2648: KAFKA-4849: Bug in KafkaStreams documentation
URL: https://github.com/apache/kafka/pull/2648
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java 
b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
index 3396f6369b5..91b7faa622a 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
@@ -128,14 +128,36 @@ public ConfigDef define(ConfigKey key) {
  * @param displayName   the name suitable for display
  * @param dependentsthe configurations that are dependents of this 
configuration
  * @param recommender   the recommender provides valid values given the 
parent configuration values
+ * @param deprecatedthe config is deprecated and should not be used 
any longer
  * @return This ConfigDef so you can chain calls
  */
 public ConfigDef define(String name, Type type, Object defaultValue, 
Validator validator, Importance importance, String documentation,
-String group, int orderInGroup, Width width, 
String displayName, List<String> dependents, Recommender recommender) {
-return define(new ConfigKey(name, type, defaultValue, validator, 
importance, documentation, group, orderInGroup, width, displayName, dependents, 
recommender));
+String group, int orderInGroup, Width width, 
String displayName, List<String> dependents, Recommender recommender, boolean 
deprecated) {
+return define(new ConfigKey(name, type, defaultValue, validator, 
importance, documentation, group, orderInGroup, width, displayName, dependents, 
recommender, deprecated));
 }
 
 /**
+ * Define a new configuration
+ * @param name  the name of the config parameter
+ * @param type  the type of the config
+ * @param defaultValue  the default value to use if this config isn't 
present
+ * @param validator the validator to use in checking the correctness 
of the config
+ * @param importancethe importance of this config
+ * @param documentation the documentation string for the config
+ * @param group the group this config belongs to
+ * @param orderInGroup  the order of this config in the group
+ * @param width the width of the config
+ * @param displayName   the name suitable for display
+ * @param dependentsthe configurations that are dependents of this 
configuration
+ * @param recommender   the recommender provides valid values given the 
parent configuration values
+ * @return This ConfigDef so you can chain calls
+ */
+public ConfigDef define(String name, Type type, Object defaultValue, 
Validator validator, Importance importance, String documentation,
+String group, int orderInGroup, Width width, 
String displayName, List<String> dependents, Recommender recommender) {
+return define(new ConfigKey(name, type, defaultValue, validator, 
importance, documentation, group, orderInGroup, width, displayName, dependents, 
recommender, false));
+}
+/**
+ *
  * Define a new configuration with no custom recommender
  * @param name  the name of the config parameter
  * @param type  the type of the config
@@ -900,11 +922,12 @@ public String toString() {
 public final String displayName;
public final List<String> dependents;
 public final Recommender recommender;
+public final boolean deprecated;
 
 public ConfigKey(String name, Type type, Object defaultValue, 
Validator validator,
  Importance importance, String documentation, String 
group,
  int orderInGroup, Width width, String displayName,
- List<String> dependents, Recommender recommender) {
+ List<String> dependents, Recommender recommender, 
boolean deprecated) {
 this.name = name;
 this.type = type;
 this.defaultValue = defaultValue == NO_DEFAULT_VALUE ? 
NO_DEFAULT_VALUE : parseType(name, defaultValue, type);
@@ -919,6 +942,7 @@ public ConfigKey(String name, Type type, Object 
defaultValue, Validator validato
 this.width = width;
 this.displayName = displayName;
 this.recommender = recommender;
+this.deprecated = deprecated;
 }
 
 public 

[jira] [Commented] (KAFKA-6892) Kafka Streams memory usage grows over the time till OOM

2018-05-11 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6892?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16472257#comment-16472257
 ] 

Matthias J. Sax commented on KAFKA-6892:


Thanks for the details. Hard to say. But as you use RocksDB, it should just 
spill to disk and keep only part of the state in memory – you do have many 
stores though (pipeline A: 2 aggregations plus 3 stream-stream joins --2 stores 
each-- plus another aggregation; pipeline B: 2 aggregations; thus overall you 
have 5 plus 6 stores – note that windowed stream-stream joins need to store all 
raw records).

But as you mention that the OS kills the app, it seems that there is no OOM 
exception. And RocksDB should also spill to disk... 

Configuring RocksDB is a "black art" – maybe it helps to run parts of the 
pipeline in isolation to see how much memory the individual parts need? One more 
follow-up: do you use Interactive Queries? If yes, you need to make sure to 
close all iterators – otherwise they leak memory.
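The iterator point can be illustrated without the kafka-streams dependency. The real type is org.apache.kafka.streams.state.KeyValueIterator, which implements Closeable; the stand-in class below is purely illustrative, showing why try-with-resources prevents the leak.

```java
import java.util.Iterator;
import java.util.List;

// Stand-in for KeyValueIterator: in the real API, close() releases the
// underlying RocksDB resources, so a forgotten close() leaks native memory.
class StoreIterator implements Iterator<Integer>, AutoCloseable {
    private final Iterator<Integer> delegate;
    private boolean closed = false;

    StoreIterator(List<Integer> values) {
        this.delegate = values.iterator();
    }

    @Override public boolean hasNext() { return delegate.hasNext(); }
    @Override public Integer next() { return delegate.next(); }
    @Override public void close() { closed = true; }
    boolean isClosed() { return closed; }

    static int sumWithCleanup(List<Integer> values) {
        // try-with-resources guarantees close() runs even if iteration
        // throws, which is what prevents the leak described above.
        int sum = 0;
        try (StoreIterator it = new StoreIterator(values)) {
            while (it.hasNext()) {
                sum += it.next();
            }
        }
        return sum;
    }
}
```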

> Kafka Streams memory usage grows over the time till OOM
> ---
>
> Key: KAFKA-6892
> URL: https://issues.apache.org/jira/browse/KAFKA-6892
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.1.0
>Reporter: Dawid Kulig
>Priority: Minor
> Attachments: kafka-streams-per-pod-resources-usage.png
>
>
> Hi. I am observing indefinite memory growth of my kafka-streams application. 
> It gets killed by the OS when reaching the memory limit (10gb). 
> It's running two unrelated pipelines (read from 4 source topics - 100 
> partitions each - aggregate data and write to two destination topics) 
> My environment: 
>  * Kubernetes cluster
>  * 4 app instances
>  * 10GB memory limit per pod (instance)
>  * JRE 8
> JVM / Streams app:
>  * -Xms2g
>  * -Xmx4g
>  * num.stream.threads = 4
>  * commit.interval.ms = 1000
>  * linger.ms = 1000
>  
> When my app is running for 24hours it reaches 10GB memory limit. Heap and GC 
> looks good, non-heap avg memory usage is 120MB. I've read it might be related 
> to the RocksDB that works underneath streams app, however I tried to tune it 
> using [confluent 
> doc|https://docs.confluent.io/current/streams/developer-guide/config-streams.html#streams-developer-guide-rocksdb-config]
>  unfortunately with no luck. 
> RocksDB config #1:
> {code:java}
> tableConfig.setBlockCacheSize(16 * 1024 * 1024L);
> tableConfig.setBlockSize(16 * 1024L);
> tableConfig.setCacheIndexAndFilterBlocks(true);
> options.setTableFormatConfig(tableConfig);
> options.setMaxWriteBufferNumber(2);{code}
> RocksDB config #2 
> {code:java}
> tableConfig.setBlockCacheSize(1024 * 1024L);
> tableConfig.setBlockSize(16 * 1024L);
> tableConfig.setCacheIndexAndFilterBlocks(true);
> options.setTableFormatConfig(tableConfig);
> options.setMaxWriteBufferNumber(2);
> options.setWriteBufferSize(8 * 1024L);{code}
>  
> This behavior has only been observed with our production traffic, where per 
> topic input message rate is 10msg/sec and is pretty much constant (no peaks). 
> I am attaching cluster resources usage from last 24h.
> Any help or advice would be much appreciated. 
>  





[jira] [Resolved] (KAFKA-6394) Prevent misconfiguration of advertised listeners

2018-05-11 Thread Jason Gustafson (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6394?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-6394.

   Resolution: Fixed
Fix Version/s: 2.0.0

> Prevent misconfiguration of advertised listeners
> 
>
> Key: KAFKA-6394
> URL: https://issues.apache.org/jira/browse/KAFKA-6394
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: Manikumar
>Priority: Major
> Fix For: 2.0.0
>
>
> We don't really have any protection from misconfiguration of the advertised 
> listeners. Sometimes users will copy the config from one host to another 
> during an upgrade. They may remember to update the broker id, but forget 
> about the advertised listeners. It can be surprisingly difficult to detect 
> this unless you know to look for it (e.g. you might just see a lot of 
> NotLeaderForPartition errors as the fetchers connect to the wrong broker). It 
> may not be totally foolproof, but it's probably enough for the common 
> misconfiguration case to check existing brokers to see whether there are any 
> which have already registered the advertised listener.





[jira] [Commented] (KAFKA-6394) Prevent misconfiguration of advertised listeners

2018-05-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6394?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16472150#comment-16472150
 ] 

ASF GitHub Bot commented on KAFKA-6394:
---

hachikuji closed pull request #4897: KAFKA-6394: Add a check to prevent 
misconfiguration of advertised listeners
URL: https://github.com/apache/kafka/pull/4897
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala 
b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 4834791f995..d8a36f77032 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -37,7 +37,6 @@ import org.apache.kafka.common.metrics.Sensor
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.record.TimestampType
 import org.apache.kafka.common.security.auth.SecurityProtocol
-import org.apache.kafka.server.quota.ClientQuotaCallback
 
 import scala.collection.JavaConverters._
 import scala.collection.Map
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala 
b/core/src/main/scala/kafka/server/KafkaServer.scala
index c729c8c90f7..ebbc0b8d205 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -43,7 +43,6 @@ import org.apache.kafka.common.metrics.{JmxReporter, Metrics, 
_}
 import org.apache.kafka.common.network._
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.requests.{ControlledShutdownRequest, 
ControlledShutdownResponse}
-import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.apache.kafka.common.security.scram.internal.ScramMechanism
 import 
org.apache.kafka.common.security.token.delegation.internal.DelegationTokenCache
 import org.apache.kafka.common.security.{JaasContext, JaasUtils}
@@ -378,6 +377,13 @@ class KafkaServer(val config: KafkaConfig, time: Time = 
Time.SYSTEM, threadNameP
   }
 
   private[server] def createBrokerInfo: BrokerInfo = {
+val endPoints = config.advertisedListeners.map(e => s"${e.host}:${e.port}")
+zkClient.getAllBrokersInCluster.filter(_.id != config.brokerId).foreach { 
broker =>
+  val commonEndPoints = broker.endPoints.map(e => 
s"${e.host}:${e.port}").intersect(endPoints)
+  require(commonEndPoints.isEmpty, s"Configured end points 
${commonEndPoints.mkString(",")} in" +
+s" advertised listeners are already registered by broker ${broker.id}")
+}
+
 val listeners = config.advertisedListeners.map { endpoint =>
   if (endpoint.port == 0)
 endpoint.copy(port = socketServer.boundPort(endpoint.listenerName))
diff --git 
a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
 
b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
index 79dec265f90..1e1a0a6a2a6 100644
--- 
a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
+++ 
b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
@@ -596,7 +596,7 @@ class DynamicBrokerReconfigurationTest extends 
ZooKeeperTestHarness with SaslSet
 // Ensure connections are made to brokers before external listener is made 
inaccessible
 describeConfig(externalAdminClient)
 
-// Update broker keystore for external listener to use invalid listener 
address
+// Update broker external listener to use invalid listener address
 // any address other than localhost is sufficient to fail (either 
connection or host name verification failure)
 val invalidHost = "192.168.0.1"
 alterAdvertisedListener(adminClient, externalAdminClient, "localhost", 
invalidHost)
diff --git a/core/src/test/scala/unit/kafka/server/KafkaServerTest.scala 
b/core/src/test/scala/unit/kafka/server/KafkaServerTest.scala
new file mode 100755
index 000..d78821a2ca5
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/server/KafkaServerTest.scala
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the 

[jira] [Resolved] (KAFKA-5965) Remove Deprecated AdminClient from Streams Resetter Tool

2018-05-11 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5965?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-5965.
--
   Resolution: Fixed
Fix Version/s: 2.0.0

> Remove Deprecated AdminClient from Streams Resetter Tool
> 
>
> Key: KAFKA-5965
> URL: https://issues.apache.org/jira/browse/KAFKA-5965
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 1.0.0
>Reporter: Bill Bejeck
>Assignee: Alexander Fedosov
>Priority: Major
>  Labels: newbie
> Fix For: 2.0.0
>
>
> To break the dependency on using ZK, the {{StreamsResetter}} tool now uses 
> the {{KafkaAdminClient}} for deleting topics and the 
> {{kafka.admin.AdminClient}} for verifying no consumer groups are active 
> before running.
> Once the {{KafkaAdminClient}} has a describe group functionality, we should 
> remove the dependency on {{kafka.admin.AdminClient}} from the 
> {{StreamsResetter}}.





[jira] [Commented] (KAFKA-5965) Remove Deprecated AdminClient from Streams Resetter Tool

2018-05-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5965?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16472135#comment-16472135
 ] 

ASF GitHub Bot commented on KAFKA-5965:
---

guozhangwang closed pull request #4968: KAFKA-5965: Remove Deprecated 
AdminClient from Streams Resetter Tool
URL: https://github.com/apache/kafka/pull/4968
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/core/src/main/scala/kafka/tools/StreamsResetter.java 
b/core/src/main/scala/kafka/tools/StreamsResetter.java
index b0d52764b59..d7c4e435e58 100644
--- a/core/src/main/scala/kafka/tools/StreamsResetter.java
+++ b/core/src/main/scala/kafka/tools/StreamsResetter.java
@@ -23,8 +23,11 @@
 import joptsimple.OptionSpecBuilder;
 import kafka.utils.CommandLineUtils;
 import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.MemberDescription;
 import org.apache.kafka.clients.admin.DeleteTopicsResult;
 import org.apache.kafka.clients.admin.KafkaAdminClient;
+import org.apache.kafka.clients.admin.DescribeConsumerGroupsOptions;
+import org.apache.kafka.clients.admin.DescribeConsumerGroupsResult;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -42,6 +45,7 @@
 import java.io.IOException;
 import java.text.ParseException;
 import java.text.SimpleDateFormat;
+import java.util.Arrays;
 import java.util.ArrayList;
 import java.util.Date;
 import java.util.HashMap;
@@ -51,6 +55,7 @@
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
 /**
@@ -120,8 +125,8 @@ public int run(final String[] args,
 }
 properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
options.valueOf(bootstrapServerOption));
 
-validateNoActiveConsumers(groupId, properties);
 kafkaAdminClient = (KafkaAdminClient) 
AdminClient.create(properties);
+validateNoActiveConsumers(groupId, kafkaAdminClient);
 
 allTopics.clear();
 allTopics.addAll(kafkaAdminClient.listTopics().names().get(60, 
TimeUnit.SECONDS));
@@ -149,18 +154,14 @@ public int run(final String[] args,
 }
 
 private void validateNoActiveConsumers(final String groupId,
-   final Properties properties) {
-kafka.admin.AdminClient olderAdminClient = null;
-try {
-olderAdminClient = kafka.admin.AdminClient.create(properties);
-if (!olderAdminClient.describeConsumerGroup(groupId, 
0).consumers().get().isEmpty()) {
-throw new IllegalStateException("Consumer group '" + groupId + 
"' is still active. "
-+ "Make sure to stop all 
running application instances before running the reset tool.");
-}
-} finally {
-if (olderAdminClient != null) {
-olderAdminClient.close();
-}
+   final AdminClient adminClient) 
throws ExecutionException, InterruptedException {
+final DescribeConsumerGroupsResult describeResult = 
adminClient.describeConsumerGroups(Arrays.asList(groupId),
+(new DescribeConsumerGroupsOptions()).timeoutMs(10 * 1000));
+final List<MemberDescription> members = 
describeResult.describedGroups().get(groupId).get().members();
+if (!members.isEmpty()) {
+throw new IllegalStateException("Consumer group '" + groupId + "' 
is still active "
++ "and has following members: " + members + ". "
++ "Make sure to stop all running application instances 
before running the reset tool.");
 }
 }
 


 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Remove Deprecated AdminClient from Streams Resetter Tool
> 
>
> Key: KAFKA-5965
> URL: https://issues.apache.org/jira/browse/KAFKA-5965
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 1.0.0
>Reporter: Bill Bejeck
>Assignee: Alexander Fedosov
>Priority: Major
>  Labels: newbie
>
> To break the dependency on using ZK, the {{StreamsResetter}} tool now uses 
> 

[jira] [Commented] (KAFKA-6884) ConsumerGroupCommand should use new AdminClient

2018-05-11 Thread Jason Gustafson (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6884?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16472117#comment-16472117
 ] 

Jason Gustafson commented on KAFKA-6884:


Thanks [~asasvari]. I think the assignment strategy is exposed as 
{{partitionAssignor}} in {{ConsumerGroupDescription}}. Good point about the 
coordinator though. Maybe it's reasonable to also add it to 
{{ConsumerGroupDescription}}.

> ConsumerGroupCommand should use new AdminClient
> ---
>
> Key: KAFKA-6884
> URL: https://issues.apache.org/jira/browse/KAFKA-6884
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jason Gustafson
>Assignee: Attila Sasvari
>Priority: Major
>
> Now that we have KIP-222, we should update consumer-groups.sh to use the new 
> AdminClient.





[jira] [Commented] (KAFKA-6884) ConsumerGroupCommand should use new AdminClient

2018-05-11 Thread Attila Sasvari (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6884?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16472041#comment-16472041
 ] 

Attila Sasvari commented on KAFKA-6884:
---

It seems assignmentStrategy is also needed. Added some new review comments to 
the pull request. 

> ConsumerGroupCommand should use new AdminClient
> ---
>
> Key: KAFKA-6884
> URL: https://issues.apache.org/jira/browse/KAFKA-6884
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jason Gustafson
>Assignee: Attila Sasvari
>Priority: Major
>
> Now that we have KIP-222, we should update consumer-groups.sh to use the new 
> AdminClient.





[jira] [Commented] (KAFKA-6884) ConsumerGroupCommand should use new AdminClient

2018-05-11 Thread Attila Sasvari (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6884?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16471932#comment-16471932
 ] 

Attila Sasvari commented on KAFKA-6884:
---

[~hachikuji] Thanks, I downloaded the pull request as a diff & applied it. 

Using the new KafkaAdminClient, I am wondering how I can retrieve the 
coordinator / partition leader of the internal offset topic for a consumer 
group. 
[Earlier|https://github.com/apache/kafka/blob/c3921d489f4da80aad6f387158c33ec2e4bca52d/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala#L571]
 it was returned in the 
[ConsumerGroupSummary|https://github.com/apache/kafka/blob/c3921d489f4da80aad6f387158c33ec2e4bca52d/core/src/main/scala/kafka/admin/AdminClient.scala#L300]
 object.

> ConsumerGroupCommand should use new AdminClient
> ---
>
> Key: KAFKA-6884
> URL: https://issues.apache.org/jira/browse/KAFKA-6884
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jason Gustafson
>Assignee: Attila Sasvari
>Priority: Major
>
> Now that we have KIP-222, we should update consumer-groups.sh to use the new 
> AdminClient.





[jira] [Created] (KAFKA-6898) org.apache.kafka.common.errors.TimeoutException

2018-05-11 Thread Rishi (JIRA)
Rishi created KAFKA-6898:


 Summary: org.apache.kafka.common.errors.TimeoutException
 Key: KAFKA-6898
 URL: https://issues.apache.org/jira/browse/KAFKA-6898
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.10.2.0
 Environment: Production
Reporter: Rishi


Getting error 
{code:java}
org.apache.kafka.common.errors.TimeoutException Failed to allocate memory 
within the configured max blocking time 59927 ms.{code}
while publishing events to Kafka. We are using Kafka Java client 0.10.2.0 with 
Kafka 0.10.1.0 brokers.
This issue does not always happen, but after the applications have been running 
in service for some time it starts occurring, and the applications never recover 
from this state until the producer instance is restarted.

The configuration of the producer and of the Kafka broker is the default and 
hasn't been changed. What should be the course of action for this issue?
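For context: this error means the producer's {{buffer.memory}} pool stayed full for the whole {{max.block.ms}} window (default 60000 ms, matching the ~59927 ms in the message) because batches were not draining to the broker. Below is a sketch of the two settings involved; the values are illustrative, not a recommendation, and raising them masks rather than fixes a buffer that never drains (e.g. a slow broker or leaked producer instances).

```java
import java.util.Properties;

// Illustrative producer settings only (placeholder broker address).
class ProducerBufferConfig {
    static Properties sketch() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker:9092"); // placeholder
        // Total bytes the producer may use to buffer unsent records
        // (default 33554432, i.e. 32 MB).
        props.put("buffer.memory", "67108864");
        // How long send() may block waiting for buffer space or metadata
        // (default 60000 ms).
        props.put("max.block.ms", "120000");
        return props;
    }
}
```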





[jira] [Updated] (KAFKA-6830) Add new metrics for consumer/replication fetch requests

2018-05-11 Thread Adam Kotwasinski (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6830?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Adam Kotwasinski updated KAFKA-6830:

Description: 
Currently, we have only one fetch request-related metric for a topic.
 As fetch requests are used by both client consumers and replicating brokers, 
it is impossible to tell whether a particular partition (with replication factor 
> 1) is being actively read by client consumers.

Rationale for this improvement: as the owner of the Kafka installation, but not 
the owner of the clients, I want to know which topics still have active (real) 
consumers.

PR linked.

KIP: https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=80452537

  was:
Currently, we have only one fetch request-related metric for a topic.
 As fetch requests are used by both client consumers and replicating brokers, 
it is impossible to tell if the particular partition (with replication factor > 
1) is being actively read from client by consumers.

Rationale for this improvement: as owner of kafka installation, but not the 
owner of clients, I want to know which topics still have active (real) 
consumers.

PR linked.


> Add new metrics for consumer/replication fetch requests
> ---
>
> Key: KAFKA-6830
> URL: https://issues.apache.org/jira/browse/KAFKA-6830
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Reporter: Adam Kotwasinski
>Priority: Major
>
> Currently, we have only one fetch request-related metric for a topic.
>  As fetch requests are used by both client consumers and replicating brokers, 
> it is impossible to tell if the particular partition (with replication factor 
> > 1) is being actively read from client by consumers.
> Rationale for this improvement: as owner of kafka installation, but not the 
> owner of clients, I want to know which topics still have active (real) 
> consumers.
> PR linked.
> KIP: https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=80452537





[jira] [Commented] (KAFKA-6830) Add new metrics for consumer/replication fetch requests

2018-05-11 Thread Adam Kotwasinski (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6830?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16471673#comment-16471673
 ] 

Adam Kotwasinski commented on KAFKA-6830:
-

KIP - https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=80452537

> Add new metrics for consumer/replication fetch requests
> ---
>
> Key: KAFKA-6830
> URL: https://issues.apache.org/jira/browse/KAFKA-6830
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Reporter: Adam Kotwasinski
>Priority: Major
>
> Currently, we have only one fetch request-related metric for a topic.
>  As fetch requests are used by both client consumers and replicating brokers, 
> it is impossible to tell if the particular partition (with replication factor 
> > 1) is being actively read from client by consumers.
> Rationale for this improvement: as owner of kafka installation, but not the 
> owner of clients, I want to know which topics still have active (real) 
> consumers.
> PR linked.





[jira] [Updated] (KAFKA-6892) Kafka Streams memory usage grows over the time till OOM

2018-05-11 Thread Dawid Kulig (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6892?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dawid Kulig updated KAFKA-6892:
---
Description: 
Hi. I am observing indefinite memory growth of my kafka-streams application. It 
gets killed by the OS when reaching the memory limit (10gb). 

It's running two unrelated pipelines (read from 4 source topics - 100 
partitions each - aggregate data and write to two destination topics) 

My environment: 
 * Kubernetes cluster
 * 4 app instances
 * 10GB memory limit per pod (instance)
 * JRE 8

JVM / Streams app:
 * -Xms2g
 * -Xmx4g
 * num.stream.threads = 4
 * commit.interval.ms = 1000
 * linger.ms = 1000

 

When my app is running for 24hours it reaches 10GB memory limit. Heap and GC 
looks good, non-heap avg memory usage is 120MB. I've read it might be related 
to the RocksDB that works underneath streams app, however I tried to tune it 
using [confluent 
doc|https://docs.confluent.io/current/streams/developer-guide/config-streams.html#streams-developer-guide-rocksdb-config]
 unfortunately with no luck. 

RocksDB config #1:
{code:java}
tableConfig.setBlockCacheSize(16 * 1024 * 1024L);
tableConfig.setBlockSize(16 * 1024L);
tableConfig.setCacheIndexAndFilterBlocks(true);
options.setTableFormatConfig(tableConfig);
options.setMaxWriteBufferNumber(2);{code}
RocksDB config #2 
{code:java}
tableConfig.setBlockCacheSize(1024 * 1024L);
tableConfig.setBlockSize(16 * 1024L);
tableConfig.setCacheIndexAndFilterBlocks(true);
options.setTableFormatConfig(tableConfig);
options.setMaxWriteBufferNumber(2);
options.setWriteBufferSize(8 * 1024L);{code}
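For reference, settings like the two snippets above are typically applied through a custom {{RocksDBConfigSetter}} registered in the Streams configuration. A rough sketch follows (the class name is illustrative); note that Streams invokes the setter once per store, so these limits apply per store instance, not per application, which matters when a topology has many stores.

```java
import java.util.Map;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.Options;

// Illustrative wiring only: applied to every RocksDB store in the topology.
public class BoundedMemoryRocksDBConfig implements RocksDBConfigSetter {
    @Override
    public void setConfig(final String storeName, final Options options,
                          final Map<String, Object> configs) {
        final BlockBasedTableConfig tableConfig = new BlockBasedTableConfig();
        tableConfig.setBlockCacheSize(16 * 1024 * 1024L); // per store!
        tableConfig.setBlockSize(16 * 1024L);
        tableConfig.setCacheIndexAndFilterBlocks(true);
        options.setTableFormatConfig(tableConfig);
        options.setMaxWriteBufferNumber(2);
    }
}

// Registered via:
// props.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG,
//           BoundedMemoryRocksDBConfig.class);
```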
 

This behavior has only been observed with our production traffic, where per 
topic input message rate is 10msg/sec and is pretty much constant (no peaks). I 
am attaching cluster resources usage from last 24h.

Any help or advice would be much appreciated. 

 

> Kafka Streams memory usage grows over the time till OOM
> ---
>
> Key: KAFKA-6892
> URL: https://issues.apache.org/jira/browse/KAFKA-6892
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.1.0
>Reporter: Dawid Kulig
>Priority: Minor
> Attachments: kafka-streams-per-pod-resources-usage.png
>
>
> Hi. I am observing indefinite memory growth of my kafka-streams application. 
> It gets killed by the OS when reaching the memory limit (10gb). 
> It's running two unrelated pipelines (read from 4 source topics - 100 
> partitions each - aggregate data and write to two destination topics) 
> My environment: 
>  * Kubernetes cluster
>  * 4 app instances
>  * 10GB memory limit per pod (instance)
>  * JRE 8
> JVM / Streams app:
>  * -Xms2g
>  * -Xmx4g
>  * num.stream.threads = 4
>  * commit.interval.ms = 1000
>  * linger.ms = 1000
>  
> When my app is running for 24hours it reaches 10GB memory limit. Heap and GC 
> looks good, non-heap avg memory usage is 120MB. I've read it might be related 
> to the RocksDB that works underneath streams app, however I tried to tune it 
> using [confluent 
> 

[jira] [Comment Edited] (KAFKA-6892) Kafka Streams memory usage grows over the time till OOM

2018-05-11 Thread Dawid Kulig (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6892?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16471647#comment-16471647
 ] 

Dawid Kulig edited comment on KAFKA-6892 at 5/11/18 8:33 AM:
-

[~mjsax] Thank you for your comment. Shame on me, but I must have missed that 
capacity planning guide. However, having looked at the guide, I'd say we should 
fit within the resources we currently have. 

Speaking about the topology, please see below:

We have 4 source topics [ST1, ST2, ST3, ST4] carrying messages with a string 
key, and 2 destination topics: [DT1, DT2]

Pipeline A)

{code}
final KStream aggregates = builder.stream("ST1")
.groupByKey()
.reduce((prev, next) -> prev)
.mapValues((AuthenticationHandshake value) -> {
// some custom mapping
}).toStream();

final KStream bStream = builder.stream("ST2").groupByKey().reduce(((prev, next) -> next)).toStream();
final KStream cStream = builder.stream("ST3");
final KStream dStream = builder.stream("ST4");

final KTable aggregateTable = aggregates
.leftJoin(bStream, ExampleAggregator::joinB, joinWindow)
.leftJoin(cStream, ExampleAggregator::joinC, joinWindow)
.leftJoin(dStream, ExampleAggregator::joinD, dJoinWindow)
.groupByKey()
.reduce((previousValue, newValue) -> newValue);
aggregateTable.to("DT1");


private static Aggregate joinB(final Aggregate aggregate, final B b) {
LOGGER.info("Joining b: {}", b);
aggregate.setB(b);
return aggregate;
}

private static Aggregate joinC(final Aggregate aggregate, final C c) {
LOGGER.info("Joining c: {}", c);
aggregate.setC(c);
return aggregate;
}

private static Aggregate joinD(final Aggregate aggregate, final D d) {
LOGGER.info("Joining d: {}", d);
aggregate.setD(d);
return aggregate;
}
{code}

Pipeline B)
{code}
// note that the DT1 from PipelineA is an input topic for this pipeline
final KStream aggregates = builder.stream(DT1);

final KTable aggregatesTable = aggregates
.filter((key, val) -> // custom filtering (non-null / non-empty))
.groupBy((key, val) -> val.somePropIdentifier())
.aggregate(() -> new Aggreagate2("", new ArrayList<>(64)),
(key, val, aggregate2Val) -> customAggregation(...),
aggregateSpecificAvroSerde
);

aggregatesTable.toStream()
.map((window, aggregate2) -> new KeyValue<>(aggregate2.somePropertyKey(), 
aggregate2))
.groupByKey()
.reduce((old, _new) -> _new)
.to(Serdes.String(), aggregateSpecificAvroSerde, "DT2");
{code}




[jira] [Commented] (KAFKA-6892) Kafka Streams memory usage grows over the time till OOM

2018-05-11 Thread Dawid Kulig (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6892?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16471647#comment-16471647
 ] 

Dawid Kulig commented on KAFKA-6892:


[~mjsax] Thank you for your comment. Shame on me, but I must have missed that 
capacity planning guide. However, having looked at the guide, I'd say we should 
fit within the resources we currently have. 

Speaking about the topology, please see below:

We have 4 source topics [ST1, ST2, ST3, ST4] carrying messages with a string 
key, and 2 destination topics: [DT1, DT2]

Pipeline A)

{code}
final KStream aggregates = builder.stream("ST1")
.groupByKey()
.reduce((prev, next) -> prev)
.mapValues((AuthenticationHandshake value) -> {
// some custom mapping
}).toStream();

final KStream bStream = builder.stream("ST2").groupByKey().reduce(((prev, next) -> next)).toStream();
final KStream cStream = builder.stream("ST3");
final KStream dStream = builder.stream("ST4");

final KTable aggregateTable = aggregates
.leftJoin(bStream, ExampleAggregator::joinB, joinWindow)
.leftJoin(cStream, ExampleAggregator::joinC, joinWindow)
.leftJoin(dStream, ExampleAggregator::joinD, dJoinWindow)
.groupByKey()
.reduce((previousValue, newValue) -> newValue);
aggregateTable.to("DT1");


private static Aggregate joinB(final Aggregate aggregate, final B b) {
LOGGER.info("Joining b: {}", b);
aggregate.setB(b);
return aggregate;
}

private static Aggregate joinC(final Aggregate aggregate, final C c) {
LOGGER.info("Joining c: {}", c);
aggregate.setC(c);
return aggregate;
}

private static Aggregate joinD(final Aggregate aggregate, final D d) {
LOGGER.info("Joining d: {}", d);
aggregate.setD(d);
return aggregate;
}
{code}

Pipeline B)
{code}
// note that the DT1 from PipelineA is an input topic for this pipeline
final KStream aggregates = builder.stream(DT1);

final KTable aggregatesTable = aggregates
.filter((key, val) -> // custom filtering (non-null / non-empty))
.groupBy((key, val) -> val.somePropIdentifier())
.aggregate(() -> new Aggreagate2("", new ArrayList<>(64)),
(key, val, aggregate2Val) -> customAggregation(...),
aggregateSpecificAvroSerde
);

aggregatesTable.toStream()
.map((window, aggregate2) -> new KeyValue<>(aggregate2.somePropertyKey(), 
aggregate2))
.groupByKey()
.reduce((old, _new) -> _new)
.to(Serdes.String(), aggregateSpecificAvroSerde, DT2);
{code}






--
This message was sent 

[jira] [Commented] (KAFKA-5611) One or more consumers in a consumer-group stop consuming after rebalancing

2018-05-11 Thread qiuyueqiong (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5611?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16471522#comment-16471522
 ] 

qiuyueqiong commented on KAFKA-5611:


After applying https://github.com/apache/kafka/pull/3571 on version 0.10.2.0, 
we encountered an infinite loop in ConsumerCoordinator.java:
java.lang.IllegalStateException: Coordinator selected invalid assignment 
protocol: null
    at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:208)
    at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:339)
    at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:305)
    at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:286)

For some reason, the ConsumerCoordinator's resetGeneration() is called, and the 
generation's protocol is set to null.
Then, in the joinGroupIfNeeded() method of 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator, 
onJoinComplete() throws an IllegalStateException, 
and resetJoinGroupFuture() never gets the chance to be called:
    if (future.succeeded()) {
    onJoinComplete(generation.generationId, generation.memberId, 
generation.protocol, future.value());

    // We reset the join group future only after the completion 
callback returns. This ensures
    // that if the callback is woken up, we will retry it on the 
next joinGroupIfNeeded.
    resetJoinGroupFuture();
    needsJoinPrepare = true;

The joinFuture is not cleared to null, so a JoinGroupRequest will not be sent 
in the next loop.

Is this an issue?
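A minimal, self-contained sketch of that hazard (the class and method names below are hypothetical stand-ins, not the real Kafka internals): when the future reset runs only on the normal return path, an exception from the completion callback leaves the stale future in place, so no new JoinGroupRequest can be sent; moving the reset into a finally block avoids that.

```java
import java.util.concurrent.atomic.AtomicReference;

public class JoinFutureSketch {
    // Stands in for AbstractCoordinator's joinFuture field.
    static final AtomicReference<Object> joinFuture = new AtomicReference<>();

    // Buggy shape: the reset runs only after the callback returns normally,
    // so a throwing callback leaves a stale future behind.
    static void joinOnceBuggy(Runnable onJoinComplete) {
        joinFuture.set(new Object());  // stands in for sending a JoinGroupRequest
        onJoinComplete.run();          // may throw IllegalStateException
        joinFuture.set(null);          // skipped on throw -> stuck on next attempt
    }

    // Hedged fix shape: reset in finally, so the next attempt can send a fresh request.
    static void joinOnceFixed(Runnable onJoinComplete) {
        joinFuture.set(new Object());
        try {
            onJoinComplete.run();
        } finally {
            joinFuture.set(null);
        }
    }
}
```

With a callback that throws, the buggy variant leaves joinFuture non-null while the fixed variant clears it, which matches the stuck-loop symptom described above.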

> One or more consumers in a consumer-group stop consuming after rebalancing
> --
>
> Key: KAFKA-5611
> URL: https://issues.apache.org/jira/browse/KAFKA-5611
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.2.0
>Reporter: Panos Skianis
>Assignee: Jason Gustafson
>Priority: Major
>  Labels: reliability
> Fix For: 0.11.0.1
>
> Attachments: Server 1, Server 2, Server 3, 
> bad-server-with-more-logging-1.tar.gz, kka02
>
>
> Scenario: 
>   - 3 zookeepers, 4 Kafkas. 0.10.2.0, with 0.9.0 compatibility still on 
> (other apps need it but the one mentioned below is already on kafka 0.10.2.0  
> client).
>   - 3 servers running 1 consumer each under the same consumer groupId. 
>   - Servers seem to be consuming messages happily but then there is a timeout 
> to an external service that causes our app to restart the Kafka Consumer on 
> one of the servers (this is by design). That causes rebalancing of the group 
> and upon restart of one of the Consumers seem to "block".
>   - Server 3 is where the problems occur.
>   - Problem fixes itself either by restarting one of the 3 servers or cause 
> the group to rebalance again by using the console consumer with the 
> autocommit set to false and using the same group.
>  
> Note: 
>  - Haven't managed to recreate it at will yet.
>  - Mainly happens in production environment, often enough. Hence I do not 
> have any logs with DEBUG/TRACE statements yet.
>  - Extracts from log of each app server are attached. Also the log of the 
> kafka that seems to be dealing with the related group and generations.
>  - See COMMENT lines in the files for further info.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)