[jira] [Commented] (KAFKA-6749) TopologyTestDriver fails when topology under test uses EXACTLY_ONCE
[ https://issues.apache.org/jira/browse/KAFKA-6749?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16472915#comment-16472915 ] ASF GitHub Bot commented on KAFKA-6749: --- jadireddi opened a new pull request #4912: KAFKA-6749: Fixed TopologyTestDriver to process stream processing guarantee as exactly once URL: https://github.com/apache/kafka/pull/4912 https://issues.apache.org/jira/browse/KAFKA-6749 Fixed `TopologyTestDriver` so that stream processing topologies configured with the EXACTLY_ONCE processing guarantee can be tested. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > TopologyTestDriver fails when topology under test uses EXACTLY_ONCE > -- > > Key: KAFKA-6749 > URL: https://issues.apache.org/jira/browse/KAFKA-6749 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 1.1.0 >Reporter: Frederic Arno >Assignee: Jagadesh Adireddi >Priority: Minor > Labels: newbie > > Stream processing topologies which are configured to use {{EXACTLY_ONCE}} > processing guarantee cannot be tested with the {{TopologyTestDriver}}.
Tests > usually crash with {{java.lang.IllegalStateException: MockProducer hasn't > been initialized for transactions}} within the second call to > {{TopologyTestDriver.pipeInput()}}, the first call works fine. > Changing the processing guarantee to {{AT_LEAST_ONCE}} makes tests pass. > This is a problem because it is expected that proper processor topologies can > be successfully tested using {{TopologyTestDriver}}, however > {{TopologyTestDriver}} can't handle {{EXACTLY_ONCE}} and crashes during > tests. To a developer, this usually means that there is something wrong with > their processor topologies. > Kafka developers can reproduce this by adding: > {code:java} > put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, > StreamsConfig.EXACTLY_ONCE);{code} > to line 88 of TopologyTestDriverTest: > streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java > Originally [reported on the > ML|http://mail-archives.apache.org/mod_mbox/kafka-users/201804.mbox/%3C54ab29ad-44e1-35bd-9c16-c1d8d68a88db%40gmail.com%3E]. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
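For readers who want to set up the reproducer outside the Kafka test suite, the failing configuration can be sketched as follows. This is a sketch, not code from the PR; the string literals are assumed to match the `StreamsConfig.PROCESSING_GUARANTEE_CONFIG` and `StreamsConfig.EXACTLY_ONCE` constants, and a real test would pass these `Properties` to `new TopologyTestDriver(topology, props)`:

```java
import java.util.Properties;

public class EosTestDriverConfig {
    // Builds the configuration that triggers the reported crash.
    static Properties eosProps() {
        Properties props = new Properties();
        props.put("application.id", "eos-repro");          // required by StreamsConfig
        props.put("bootstrap.servers", "dummy:1234");      // TopologyTestDriver never connects
        props.put("processing.guarantee", "exactly_once"); // the setting under test
        return props;
    }

    public static void main(String[] args) {
        System.out.println(eosProps().getProperty("processing.guarantee"));
    }
}
```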
[jira] [Commented] (KAFKA-6749) TopologyTestDriver fails when topology under test uses EXACTLY_ONCE
[ https://issues.apache.org/jira/browse/KAFKA-6749?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16472905#comment-16472905 ] ASF GitHub Bot commented on KAFKA-6749: --- jadireddi closed pull request #4912: KAFKA-6749: Fixed TopologyTestDriver to process stream processing guarantee as exactly once URL: https://github.com/apache/kafka/pull/4912 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > TopologyTestDriver fails when topology under test uses EXACTLY_ONCE > -- > > Key: KAFKA-6749 > URL: https://issues.apache.org/jira/browse/KAFKA-6749 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 1.1.0 >Reporter: Frederic Arno >Assignee: Jagadesh Adireddi >Priority: Minor > Labels: newbie > > Stream processing topologies which are configured to use {{EXACTLY_ONCE}} > processing guarantee cannot be tested with the {{TopologyTestDriver}}. Tests > usually crash with {{java.lang.IllegalStateException: MockProducer hasn't > been initialized for transactions}} within the second call to > {{TopologyTestDriver.pipeInput()}}, the first call works fine. > Changing the processing guarantee to {{AT_LEAST_ONCE}} makes tests pass. > This is a problem because it is expected that proper processor topologies can > be successfully tested using {{TopologyTestDriver}}, however > {{TopologyTestDriver}} can't handle {{EXACTLY_ONCE}} and crashes during > tests. To a developer, this usually means that there is something wrong with > their processor topologies.
> Kafka developers can reproduce this by adding: > {code:java} > put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, > StreamsConfig.EXACTLY_ONCE);{code} > to line 88 of TopologyTestDriverTest: > streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java > Originally [reported on the > ML|http://mail-archives.apache.org/mod_mbox/kafka-users/201804.mbox/%3C54ab29ad-44e1-35bd-9c16-c1d8d68a88db%40gmail.com%3E]. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6738) Kafka Connect handling of bad data
[ https://issues.apache.org/jira/browse/KAFKA-6738?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16472754#comment-16472754 ] ASF GitHub Bot commented on KAFKA-6738: --- wicknicks opened a new pull request #5010: KAFKA-6738: Error Handling in Connect URL: https://github.com/apache/kafka/pull/5010 **_This PR is a WIP. It has been created to serve as a reference for discussions on the proposal at https://cwiki.apache.org/confluence/display/KAFKA/KIP-298%3A+Error+Handling+in+Connect._** This feature aims to change the Connect framework to allow it to automatically deal with errors while processing records in a Connector. The following behavior changes are introduced here: 1. **Retry on Failure**: Retry the failed operation a configurable number of times, with backoff between each retry. 2. **Task Tolerance Limits**: Tolerate up to a configurable number of failures in a task. We also add the following ways to report errors, along with sufficient context to simplify the debugging process: 1. **Log Error Context**: The error information and its processing context are written to the standard application logs. 2. **Dead Letter Queue**: Produce the error information and processing context into a Kafka topic. The logged information consists of the following bits: 1. Descriptions of the different stages (source/sink tasks, transformations and converters) in the connector, and their configs. 2. The record which caused the exception. 3. The exception and stack trace, if available. 4. Number of attempts (if applicable) made to execute the failed operation. 5. The time of error. New metrics are added to monitor the number of failures and the behavior of the response handler. The changes proposed here are **backward compatible**. The current behavior in Connect is to kill the task on the first error in any stage.
This will remain the default behavior if the connector does not override any of the new configurations which are provided as part of this feature. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Kafka Connect handling of bad data > -- > > Key: KAFKA-6738 > URL: https://issues.apache.org/jira/browse/KAFKA-6738 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Affects Versions: 1.1.0 >Reporter: Randall Hauch >Assignee: Arjun Satish >Priority: Critical > Fix For: 2.0.0 > > > Kafka Connect connectors and tasks fail when they run into an unexpected > situation or error, but the framework should provide more general "bad data > handling" options, including (perhaps among others): > # fail fast, which is what we do today (assuming connector actually fails and > doesn't eat errors) > # retry (possibly with configs to limit) > # drop data and move on > # dead letter queue > This needs to be addressed in a way that handles errors from: > # The connector itself (e.g. connectivity issues to the other system) > # Converters/serializers (bad data, unexpected format, etc) > # SMTs > # Ideally the framework as well, though we obviously want to fix known bugs > anyway -- This message was sent by Atlassian JIRA (v7.6.3#76005)
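For reference, the behaviors described above surface as per-connector configuration options. The names below are from the feature as it was eventually released (KIP-298); the WIP PR referenced here may differ, and `my-connector-dlq` is a hypothetical topic name:

```properties
# Total time to keep retrying a failed operation (ms), with backoff between attempts
errors.retry.timeout=600000
errors.retry.delay.max.ms=30000
# Tolerate record-level failures instead of killing the task ("none" = fail fast, the default)
errors.tolerance=all
# Report errors: log the failing record and its processing context
errors.log.enable=true
errors.log.include.messages=true
# Sink connectors only: route bad records to a dead letter queue topic
errors.deadletterqueue.topic.name=my-connector-dlq
```

Leaving all of these unset preserves the current fail-fast behavior, which is what makes the change backward compatible.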
[jira] [Updated] (KAFKA-6566) SourceTask#stop() not called after exception raised in poll()
[ https://issues.apache.org/jira/browse/KAFKA-6566?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ted Yu updated KAFKA-6566: -- Attachment: (was: 6566.txt) > SourceTask#stop() not called after exception raised in poll() > - > > Key: KAFKA-6566 > URL: https://issues.apache.org/jira/browse/KAFKA-6566 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 1.0.0 >Reporter: Gunnar Morling >Assignee: Robert Yokota >Priority: Blocker > Fix For: 2.0.0 > > > Having discussed this with [~rhauch], it has been my assumption that > {{SourceTask#stop()}} will be called by the Kafka Connect framework in case > an exception has been raised in {{poll()}}. That's not the case, though. As > an example see the connector and task below. > Calling {{stop()}} after an exception in {{poll()}} seems like a very useful > action to take, as it'll allow the task to clean up any resources such as > releasing any database connections, right after that failure and not only > once the connector is stopped. > {code} > package com.example; > import java.util.Collections; > import java.util.List; > import java.util.Map; > import org.apache.kafka.common.config.ConfigDef; > import org.apache.kafka.connect.connector.Task; > import org.apache.kafka.connect.source.SourceConnector; > import org.apache.kafka.connect.source.SourceRecord; > import org.apache.kafka.connect.source.SourceTask; > public class TestConnector extends SourceConnector { > @Override > public String version() { > return null; > } > @Override > public void start(Mapprops) { > } > @Override > public Class taskClass() { > return TestTask.class; > } > @Override > public List
[jira] [Issue Comment Deleted] (KAFKA-6566) SourceTask#stop() not called after exception raised in poll()
[ https://issues.apache.org/jira/browse/KAFKA-6566?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ted Yu updated KAFKA-6566: -- Comment: was deleted (was: Attached 6566.txt, reflecting Robert's comment. Waiting for Randall's response.) > SourceTask#stop() not called after exception raised in poll() > - > > Key: KAFKA-6566 > URL: https://issues.apache.org/jira/browse/KAFKA-6566 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 1.0.0 >Reporter: Gunnar Morling >Assignee: Robert Yokota >Priority: Blocker > Fix For: 2.0.0 > > > Having discussed this with [~rhauch], it has been my assumption that > {{SourceTask#stop()}} will be called by the Kafka Connect framework in case > an exception has been raised in {{poll()}}. That's not the case, though. As > an example see the connector and task below. > Calling {{stop()}} after an exception in {{poll()}} seems like a very useful > action to take, as it'll allow the task to clean up any resources such as > releasing any database connections, right after that failure and not only > once the connector is stopped. > {code} > package com.example; > import java.util.Collections; > import java.util.List; > import java.util.Map; > import org.apache.kafka.common.config.ConfigDef; > import org.apache.kafka.connect.connector.Task; > import org.apache.kafka.connect.source.SourceConnector; > import org.apache.kafka.connect.source.SourceRecord; > import org.apache.kafka.connect.source.SourceTask; > public class TestConnector extends SourceConnector { > @Override > public String version() { > return null; > } > @Override > public void start(Mapprops) { > } > @Override > public Class taskClass() { > return TestTask.class; > } > @Override > public List
[jira] [Assigned] (KAFKA-6566) SourceTask#stop() not called after exception raised in poll()
[ https://issues.apache.org/jira/browse/KAFKA-6566?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Randall Hauch reassigned KAFKA-6566: Assignee: Robert Yokota Priority: Blocker (was: Major) Fix Version/s: 2.0.0 Raised to a blocker for 2.0 (next release), though we should probably backport any fix at least back to 1.1 or 1.0. > SourceTask#stop() not called after exception raised in poll() > - > > Key: KAFKA-6566 > URL: https://issues.apache.org/jira/browse/KAFKA-6566 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 1.0.0 >Reporter: Gunnar Morling >Assignee: Robert Yokota >Priority: Blocker > Fix For: 2.0.0 > > Attachments: 6566.txt > > > Having discussed this with [~rhauch], it has been my assumption that > {{SourceTask#stop()}} will be called by the Kafka Connect framework in case > an exception has been raised in {{poll()}}. That's not the case, though. As > an example see the connector and task below. > Calling {{stop()}} after an exception in {{poll()}} seems like a very useful > action to take, as it'll allow the task to clean up any resources such as > releasing any database connections, right after that failure and not only > once the connector is stopped. > {code} > package com.example; > import java.util.Collections; > import java.util.List; > import java.util.Map; > import org.apache.kafka.common.config.ConfigDef; > import org.apache.kafka.connect.connector.Task; > import org.apache.kafka.connect.source.SourceConnector; > import org.apache.kafka.connect.source.SourceRecord; > import org.apache.kafka.connect.source.SourceTask; > public class TestConnector extends SourceConnector { > @Override > public String version() { > return null; > } > @Override > public void start(Mapprops) { > } > @Override > public Class taskClass() { > return TestTask.class; > } > @Override > public List
[jira] [Commented] (KAFKA-4217) KStream.transform equivalent of flatMap
[ https://issues.apache.org/jira/browse/KAFKA-4217?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16472553#comment-16472553 ] Matthias J. Sax commented on KAFKA-4217: It's just a question of the API – returning "zero or more records" was always possible. However, you can return records either via `context.forward()` or via the `return` statement – this part can be confusing because `return` only allows you to return zero (ie, `null`) or one record (if you want to return multiple records you need to use `context.forward()`). In `flatMap` there is only the `return` statement available, which accepts an `Iterable` – thus, via `return` you can emit zero or more records. Also note that in `transformValues` you are only allowed to use `return`; we don't allow `context.forward()` (because `context.forward()` would allow modifying the key). Thus, it might make sense to change the return type of both `Transformer#transform()` and `ValueTransformer#transform()` to take an `Iterable` and disallow the usage of `context.forward()`. On the other hand, `context.forward()` allows to set metadata like the record timestamp, and thus it might be good to preserve `context.forward()`... Overall, it seems we need to redesign the API to address the different concerns. > KStream.transform equivalent of flatMap > --- > > Key: KAFKA-4217 > URL: https://issues.apache.org/jira/browse/KAFKA-4217 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 0.10.0.1 >Reporter: Elias Levy >Priority: Major > Labels: api, needs-kip, newbie > > {{KStream.transform}} gives you access to state stores while allowing you to > return zero or one transformed {{KeyValue}}. Alas, it is unclear what method > you should use if you want to access state stores and return zero or multiple > {{KeyValue}}. Presumably you can use {{transform}}, always return {{null}}, > and use {{ProcessorContext.forward}} to emit {{KeyValues}}.
> It may be good to introduce a {{transform}}-like {{flatMap}} equivalent, or > allow store access from other {{KStream}} methods, such as {{flatMap}} itself. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
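The forward-then-return-null pattern discussed in this issue can be sketched without the Kafka Streams API. `Context` and `transformSplitting` below are hypothetical stand-ins for `ProcessorContext` and `Transformer#transform()`, illustrating how multiple records are emitted via the forward path while the return path contributes nothing:

```java
import java.util.ArrayList;
import java.util.List;

public class ForwardVsReturn {
    // Stand-in for ProcessorContext#forward: collects emitted key/value pairs.
    static class Context {
        final List<String> forwarded = new ArrayList<>();
        void forward(String key, String value) { forwarded.add(key + "=" + value); }
    }

    // Emits one record per comma-separated value via the forward path,
    // then returns null so the return path emits nothing (the flatMap-like
    // workaround described in the issue).
    static String transformSplitting(Context ctx, String key, String csv) {
        for (String v : csv.split(",")) {
            ctx.forward(key, v);
        }
        return null;
    }

    public static void main(String[] args) {
        Context ctx = new Context();
        transformSplitting(ctx, "k", "a,b,c");
        System.out.println(ctx.forwarded);
    }
}
```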
[jira] [Commented] (KAFKA-4217) KStream.transform equivalent of flatMap
[ https://issues.apache.org/jira/browse/KAFKA-4217?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16472499#comment-16472499 ] Bruno Cadonna commented on KAFKA-4217: -- Is this issue still relevant? The docs of [{{transform}}|https://kafka.apache.org/11/javadoc/org/apache/kafka/streams/kstream/KStream.html#transform-org.apache.kafka.streams.kstream.TransformerSupplier-java.lang.String...-] say "Transform each record of the input stream into zero or more records in the output stream", which sounds to me as if this issue has already been resolved. > KStream.transform equivalent of flatMap > --- > > Key: KAFKA-4217 > URL: https://issues.apache.org/jira/browse/KAFKA-4217 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 0.10.0.1 >Reporter: Elias Levy >Priority: Major > Labels: api, needs-kip, newbie > > {{KStream.transform}} gives you access to state stores while allowing you to > return zero or one transformed {{KeyValue}}. Alas, it is unclear what method > you should use if you want to access state stores and return zero or multiple > {{KeyValue}}. Presumably you can use {{transform}}, always return {{null}}, > and use {{ProcessorContext.forward}} to emit {{KeyValues}}. > It may be good to introduce a {{transform}}-like {{flatMap}} equivalent, or > allow store access from other {{KStream}} methods, such as {{flatMap}} itself. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (KAFKA-6813) Remove deprecated APIs from KIP-120 and KIP-182 in Streams
[ https://issues.apache.org/jira/browse/KAFKA-6813?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang resolved KAFKA-6813. -- Resolution: Fixed > Remove deprecated APIs from KIP-120 and KIP-182 in Streams > -- > > Key: KAFKA-6813 > URL: https://issues.apache.org/jira/browse/KAFKA-6813 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Guozhang Wang >Assignee: Guozhang Wang >Priority: Major > Fix For: 2.0.0 > > > As we move on to the next major release 2.0, we can consider removing the > deprecated APIs from KIP-120 and KIP-182. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-6566) SourceTask#stop() not called after exception raised in poll()
[ https://issues.apache.org/jira/browse/KAFKA-6566?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ted Yu updated KAFKA-6566: -- Attachment: 6566.txt > SourceTask#stop() not called after exception raised in poll() > - > > Key: KAFKA-6566 > URL: https://issues.apache.org/jira/browse/KAFKA-6566 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 1.0.0 >Reporter: Gunnar Morling >Priority: Major > Attachments: 6566.txt > > > Having discussed this with [~rhauch], it has been my assumption that > {{SourceTask#stop()}} will be called by the Kafka Connect framework in case > an exception has been raised in {{poll()}}. That's not the case, though. As > an example see the connector and task below. > Calling {{stop()}} after an exception in {{poll()}} seems like a very useful > action to take, as it'll allow the task to clean up any resources such as > releasing any database connections, right after that failure and not only > once the connector is stopped. > {code} > package com.example; > import java.util.Collections; > import java.util.List; > import java.util.Map; > import org.apache.kafka.common.config.ConfigDef; > import org.apache.kafka.connect.connector.Task; > import org.apache.kafka.connect.source.SourceConnector; > import org.apache.kafka.connect.source.SourceRecord; > import org.apache.kafka.connect.source.SourceTask; > public class TestConnector extends SourceConnector { > @Override > public String version() { > return null; > } > @Override > public void start(Mapprops) { > } > @Override > public Class taskClass() { > return TestTask.class; > } > @Override > public List
[jira] [Commented] (KAFKA-6566) SourceTask#stop() not called after exception raised in poll()
[ https://issues.apache.org/jira/browse/KAFKA-6566?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16472411#comment-16472411 ] Ted Yu commented on KAFKA-6566: --- Attached 6566.txt, reflecting Robert's comment. Waiting for Randall's response. > SourceTask#stop() not called after exception raised in poll() > - > > Key: KAFKA-6566 > URL: https://issues.apache.org/jira/browse/KAFKA-6566 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 1.0.0 >Reporter: Gunnar Morling >Priority: Major > Attachments: 6566.txt > > > Having discussed this with [~rhauch], it has been my assumption that > {{SourceTask#stop()}} will be called by the Kafka Connect framework in case > an exception has been raised in {{poll()}}. That's not the case, though. As > an example see the connector and task below. > Calling {{stop()}} after an exception in {{poll()}} seems like a very useful > action to take, as it'll allow the task to clean up any resources such as > releasing any database connections, right after that failure and not only > once the connector is stopped. > {code} > package com.example; > import java.util.Collections; > import java.util.List; > import java.util.Map; > import org.apache.kafka.common.config.ConfigDef; > import org.apache.kafka.connect.connector.Task; > import org.apache.kafka.connect.source.SourceConnector; > import org.apache.kafka.connect.source.SourceRecord; > import org.apache.kafka.connect.source.SourceTask; > public class TestConnector extends SourceConnector { > @Override > public String version() { > return null; > } > @Override > public void start(Mapprops) { > } > @Override > public Class taskClass() { > return TestTask.class; > } > @Override > public List
[jira] [Comment Edited] (KAFKA-6566) SourceTask#stop() not called after exception raised in poll()
[ https://issues.apache.org/jira/browse/KAFKA-6566?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16472374#comment-16472374 ] Robert Yokota edited comment on KAFKA-6566 at 5/11/18 6:04 PM: --- I've just started looking at this, but it looks like the correct place to put the {{task.stop()}} is in {{WorkerSourceTask.close()}}. This would mirror the call to {{task.stop()}} in {{WorkerSinkTask.close()}}. {{close()}} is called in a finally block in {{WorkerTask.doRun()}} here: https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java#L176 There is another possible change I am looking at and that is to put a call to {{task.stop()}} in {{WorkerSinkTask.stop()}}. This would mirror the call to {{task.stop()}} in {{WorkerSourceTask.stop()}}. Ideally the source and sink would be symmetrical in order to make it easier to reason about esp. for Connect developers. The above changes assume that {{task.stop()}} is idempotent for both the source and sink. was (Author: rayokota): I've just started looking at this, but it looks like the correct place to put the `task.stop()` is in `WorkerSourceTask.close()`. This would mirror the call to `task.stop()` in `WorkerSinkTask.close()`. `close()` is called in a finally block in `WorkerTask.doRun()` here: https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java#L176 There is another possible change I am looking at and that is to put a call to `task.stop()` in `WorkerSinkTask.stop()`. This would mirror the call to `task.stop()` in `WorkerSourceTask.stop()`. Ideally the source and sink would be symmetrical in order to make it easier to reason about esp. for Connect developers. The above changes assume that `task.stop()` is idempotent for both the source and sink.
> SourceTask#stop() not called after exception raised in poll() > - > > Key: KAFKA-6566 > URL: https://issues.apache.org/jira/browse/KAFKA-6566 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 1.0.0 >Reporter: Gunnar Morling >Priority: Major > > Having discussed this with [~rhauch], it has been my assumption that > {{SourceTask#stop()}} will be called by the Kafka Connect framework in case > an exception has been raised in {{poll()}}. That's not the case, though. As > an example see the connector and task below. > Calling {{stop()}} after an exception in {{poll()}} seems like a very useful > action to take, as it'll allow the task to clean up any resources such as > releasing any database connections, right after that failure and not only > once the connector is stopped. > {code} > package com.example; > import java.util.Collections; > import java.util.List; > import java.util.Map; > import org.apache.kafka.common.config.ConfigDef; > import org.apache.kafka.connect.connector.Task; > import org.apache.kafka.connect.source.SourceConnector; > import org.apache.kafka.connect.source.SourceRecord; > import org.apache.kafka.connect.source.SourceTask; > public class TestConnector extends SourceConnector { > @Override > public String version() { > return null; > } > @Override > public void start(Mapprops) { > } > @Override > public Class taskClass() { > return TestTask.class; > } > @Override > public List
[jira] [Commented] (KAFKA-6566) SourceTask#stop() not called after exception raised in poll()
[ https://issues.apache.org/jira/browse/KAFKA-6566?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16472374#comment-16472374 ] Robert Yokota commented on KAFKA-6566: -- I've just started looking at this, but it looks like the correct place to put the `task.stop()` is in `WorkerSourceTask.close()`. This would mirror the call to `task.stop()` in `WorkerSinkTask.close()`. `close()` is called in a finally block in `WorkerTask.doRun()` here: https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java#L176 There is another possible change I am looking at and that is to put a call to `task.stop()` in `WorkerSinkTask.stop()`. This would mirror the call to `task.stop()` in `WorkerSourceTask.stop()`. Ideally the source and sink would be symmetrical in order to make it easier to reason about esp. for Connect developers. The above changes assume that `task.stop()` is idempotent for both the source and sink. > SourceTask#stop() not called after exception raised in poll() > - > > Key: KAFKA-6566 > URL: https://issues.apache.org/jira/browse/KAFKA-6566 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 1.0.0 >Reporter: Gunnar Morling >Priority: Major > > Having discussed this with [~rhauch], it has been my assumption that > {{SourceTask#stop()}} will be called by the Kafka Connect framework in case > an exception has been raised in {{poll()}}. That's not the case, though. As > an example see the connector and task below. > Calling {{stop()}} after an exception in {{poll()}} seems like a very useful > action to take, as it'll allow the task to clean up any resources such as > releasing any database connections, right after that failure and not only > once the connector is stopped.
> {code} > package com.example; > import java.util.Collections; > import java.util.List; > import java.util.Map; > import org.apache.kafka.common.config.ConfigDef; > import org.apache.kafka.connect.connector.Task; > import org.apache.kafka.connect.source.SourceConnector; > import org.apache.kafka.connect.source.SourceRecord; > import org.apache.kafka.connect.source.SourceTask; > public class TestConnector extends SourceConnector { > @Override > public String version() { > return null; > } > @Override > public void start(Mapprops) { > } > @Override > public Class taskClass() { > return TestTask.class; > } > @Override > public List
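The fix direction Robert describes can be sketched in isolation: guarantee that `stop()` runs even when `poll()` throws, by invoking it on the finally-executed close path. This is plain Java, not the actual Connect runtime; `Task`, `run()`, and `demo()` below are illustrative stand-ins for `SourceTask` and `WorkerTask.doRun()`:

```java
public class StopOnPollFailure {
    // Minimal stand-in for a Connect source task.
    interface Task {
        void poll() throws Exception;
        void stop();
    }

    // Mirrors the proposed structure: close/stop lives in a finally block,
    // so the task can release resources (e.g. DB connections) after a
    // failed poll(), not only on orderly shutdown.
    static void run(Task task) {
        try {
            task.poll();
        } catch (Exception e) {
            // the real framework records and reports the failure here
        } finally {
            task.stop(); // must be idempotent, per the comment above
        }
    }

    // Drives run() with a task whose poll() always fails and reports
    // how many times stop() was invoked.
    static int demo() {
        final int[] stops = {0};
        run(new Task() {
            @Override public void poll() throws Exception { throw new Exception("poll failed"); }
            @Override public void stop() { stops[0]++; }
        });
        return stops[0];
    }

    public static void main(String[] args) {
        System.out.println("stop() calls after failing poll(): " + demo());
    }
}
```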
[jira] [Commented] (KAFKA-4849) Bug in KafkaStreams documentation
[ https://issues.apache.org/jira/browse/KAFKA-4849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16472322#comment-16472322 ] ASF GitHub Bot commented on KAFKA-4849: --- mjsax closed pull request #2648: KAFKA-4849: Bug in KafkaStreams documentation URL: https://github.com/apache/kafka/pull/2648 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java index 3396f6369b5..91b7faa622a 100644 --- a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java +++ b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java @@ -128,14 +128,36 @@ public ConfigDef define(ConfigKey key) { * @param displayName the name suitable for display * @param dependents the configurations that are dependents of this configuration * @param recommender the recommender provides valid values given the parent configuration values + * @param deprecated the config is deprecated and should not be used any longer * @return This ConfigDef so you can chain calls */ public ConfigDef define(String name, Type type, Object defaultValue, Validator validator, Importance importance, String documentation, -String group, int orderInGroup, Width width, String displayName, List dependents, Recommender recommender) { -return define(new ConfigKey(name, type, defaultValue, validator, importance, documentation, group, orderInGroup, width, displayName, dependents, recommender)); +String group, int orderInGroup, Width width, String displayName, List dependents, Recommender recommender, boolean deprecated) { +return define(new ConfigKey(name, type, defaultValue, validator, importance, documentation, group, orderInGroup,
width, displayName, dependents, recommender, deprecated)); } /** + * Define a new configuration + * @param name the name of the config parameter + * @param type the type of the config + * @param defaultValue the default value to use if this config isn't present + * @param validator the validator to use in checking the correctness of the config + * @param importance the importance of this config + * @param documentation the documentation string for the config + * @param group the group this config belongs to + * @param orderInGroup the order of this config in the group + * @param width the width of the config + * @param displayName the name suitable for display + * @param dependents the configurations that are dependents of this configuration + * @param recommender the recommender provides valid values given the parent configuration values + * @return This ConfigDef so you can chain calls + */ +public ConfigDef define(String name, Type type, Object defaultValue, Validator validator, Importance importance, String documentation, +String group, int orderInGroup, Width width, String displayName, List dependents, Recommender recommender) { +return define(new ConfigKey(name, type, defaultValue, validator, importance, documentation, group, orderInGroup, width, displayName, dependents, recommender, false)); +} +/** + * Define a new configuration with no custom recommender * @param name the name of the config parameter * @param type the type of the config @@ -900,11 +922,12 @@ public String toString() { public final String displayName; public final List dependents; public final Recommender recommender; +public final boolean deprecated; public ConfigKey(String name, Type type, Object defaultValue, Validator validator, Importance importance, String documentation, String group, int orderInGroup, Width width, String displayName, - List dependents, Recommender recommender) { + List dependents, Recommender recommender, boolean deprecated) { this.name = name; this.type = type;
this.defaultValue = defaultValue == NO_DEFAULT_VALUE ? NO_DEFAULT_VALUE : parseType(name, defaultValue, type); @@ -919,6 +942,7 @@ public ConfigKey(String name, Type type, Object defaultValue, Validator validato this.width = width; this.displayName = displayName; this.recommender = recommender; +this.deprecated = deprecated; } public
[jira] [Commented] (KAFKA-6892) Kafka Streams memory usage grows over the time till OOM
[ https://issues.apache.org/jira/browse/KAFKA-6892?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16472257#comment-16472257 ] Matthias J. Sax commented on KAFKA-6892: Thanks for the details. Hard to say. But as you use RocksDB, it should just spill to disk and only have part of the state in-memory – you have many stores though (pipeline A: 2 aggregations plus 3 stream-stream-joins --2 stores each-- plus another aggregation; pipeline B: 2 aggregations; thus overall you have 5 plus 6 stores – note that windowed stream-stream joins need to store all raw records). But as you mention that the OS kills the app, it seems that there is no OOM exception. And RocksDB should also spill to disk... Configuring RocksDB is a "black art" – maybe it helps to run parts of the pipeline separately to see how much memory the individual parts need? One more follow-up: do you use Interactive Queries? If yes, you need to make sure to close all iterators – otherwise they leak memory. > Kafka Streams memory usage grows over the time till OOM > --- > > Key: KAFKA-6892 > URL: https://issues.apache.org/jira/browse/KAFKA-6892 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 1.1.0 >Reporter: Dawid Kulig >Priority: Minor > Attachments: kafka-streams-per-pod-resources-usage.png > > > Hi. I am observing indefinite memory growth of my kafka-streams application. > It gets killed by the OS when reaching the memory limit (10gb). > It's running two unrelated pipelines (read from 4 source topics - 100 > partitions each - aggregate data and write to two destination topics) > My environment: > * Kubernetes cluster > * 4 app instances > * 10GB memory limit per pod (instance) > * JRE 8 > JVM / Streams app: > * -Xms2g > * -Xmx4g > * num.stream.threads = 4 > * commit.interval.ms = 1000 > * linger.ms = 1000 > > When my app is running for 24hours it reaches 10GB memory limit. Heap and GC > looks good, non-heap avg memory usage is 120MB. 
I've read it might be related > to the RocksDB that works underneath streams app, however I tried to tune it > using [confluent > doc|https://docs.confluent.io/current/streams/developer-guide/config-streams.html#streams-developer-guide-rocksdb-config] > unfortunately with no luck. > RocksDB config #1: > {code:java} > tableConfig.setBlockCacheSize(16 * 1024 * 1024L); > tableConfig.setBlockSize(16 * 1024L); > tableConfig.setCacheIndexAndFilterBlocks(true); > options.setTableFormatConfig(tableConfig); > options.setMaxWriteBufferNumber(2);{code} > RocksDB config #2 > {code:java} > tableConfig.setBlockCacheSize(1024 * 1024L); > tableConfig.setBlockSize(16 * 1024L); > tableConfig.setCacheIndexAndFilterBlocks(true); > options.setTableFormatConfig(tableConfig); > options.setMaxWriteBufferNumber(2); > options.setWriteBufferSize(8 * 1024L);{code} > > This behavior has only been observed with our production traffic, where per > topic input message rate is 10msg/sec and is pretty much constant (no peaks). > I am attaching cluster resources usage from last 24h. > Any help or advice would be much appreciated. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
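Matthias's last point about Interactive Queries is easy to get wrong: the store iterators returned by queries (e.g. Kafka Streams' KeyValueIterator) are also Closeable, and each one left unclosed pins native RocksDB resources. Below is a minimal sketch of the safe try-with-resources pattern; the RangeIterator class is a plain-Java stand-in invented for illustration, not the real Streams API, so the sketch compiles without Kafka on the classpath.

```java
import java.util.Iterator;
import java.util.List;

class IteratorCloseExample {
    // Stand-in for org.apache.kafka.streams.state.KeyValueIterator, which is
    // likewise an Iterator that must be closed; a plain class is used here so
    // the example is self-contained.
    static class RangeIterator implements Iterator<String>, AutoCloseable {
        private final Iterator<String> delegate;
        boolean closed = false;

        RangeIterator(List<String> values) { this.delegate = values.iterator(); }

        @Override public boolean hasNext() { return delegate.hasNext(); }
        @Override public String next() { return delegate.next(); }
        // In the real API this releases RocksDB resources; skipping it is the leak.
        @Override public void close() { closed = true; }
    }

    public static void main(String[] args) {
        RangeIterator it = new RangeIterator(List.of("a", "b"));
        // try-with-resources guarantees close() even if iteration throws
        try (RangeIterator scoped = it) {
            while (scoped.hasNext()) {
                System.out.println(scoped.next());
            }
        }
        System.out.println("closed=" + it.closed);
    }
}
```

The same shape applies to every `store.range(...)` / `store.all()` call site: if the iterator never reaches a `close()`, memory grows exactly as described in this ticket.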
[jira] [Resolved] (KAFKA-6394) Prevent misconfiguration of advertised listeners
[ https://issues.apache.org/jira/browse/KAFKA-6394?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson resolved KAFKA-6394. Resolution: Fixed Fix Version/s: 2.0.0 > Prevent misconfiguration of advertised listeners > > > Key: KAFKA-6394 > URL: https://issues.apache.org/jira/browse/KAFKA-6394 > Project: Kafka > Issue Type: Bug >Reporter: Jason Gustafson >Assignee: Manikumar >Priority: Major > Fix For: 2.0.0 > > > We don't really have any protection from misconfiguration of the advertised > listeners. Sometimes users will copy the config from one host to another > during an upgrade. They may remember to update the broker id, but forget > about the advertised listeners. It can be surprisingly difficult to detect > this unless you know to look for it (e.g. you might just see a lot of > NotLeaderForPartition errors as the fetchers connect to the wrong broker). It > may not be totally foolproof, but it's probably enough for the common > misconfiguration case to check existing brokers to see whether there are any > which have already registered the advertised listener. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6394) Prevent misconfiguration of advertised listeners
[ https://issues.apache.org/jira/browse/KAFKA-6394?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16472150#comment-16472150 ] ASF GitHub Bot commented on KAFKA-6394: --- hachikuji closed pull request #4897: KAFKA-6394: Add a check to prevent misconfiguration of advertised listeners URL: https://github.com/apache/kafka/pull/4897 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 4834791f995..d8a36f77032 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -37,7 +37,6 @@ import org.apache.kafka.common.metrics.Sensor import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.record.TimestampType import org.apache.kafka.common.security.auth.SecurityProtocol -import org.apache.kafka.server.quota.ClientQuotaCallback import scala.collection.JavaConverters._ import scala.collection.Map diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index c729c8c90f7..ebbc0b8d205 100755 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -43,7 +43,6 @@ import org.apache.kafka.common.metrics.{JmxReporter, Metrics, _} import org.apache.kafka.common.network._ import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.requests.{ControlledShutdownRequest, ControlledShutdownResponse} -import org.apache.kafka.common.security.auth.SecurityProtocol import org.apache.kafka.common.security.scram.internal.ScramMechanism import 
org.apache.kafka.common.security.token.delegation.internal.DelegationTokenCache import org.apache.kafka.common.security.{JaasContext, JaasUtils} @@ -378,6 +377,13 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP } private[server] def createBrokerInfo: BrokerInfo = { +val endPoints = config.advertisedListeners.map(e => s"${e.host}:${e.port}") +zkClient.getAllBrokersInCluster.filter(_.id != config.brokerId).foreach { broker => + val commonEndPoints = broker.endPoints.map(e => s"${e.host}:${e.port}").intersect(endPoints) + require(commonEndPoints.isEmpty, s"Configured end points ${commonEndPoints.mkString(",")} in" + +s" advertised listeners are already registered by broker ${broker.id}") +} + val listeners = config.advertisedListeners.map { endpoint => if (endpoint.port == 0) endpoint.copy(port = socketServer.boundPort(endpoint.listenerName)) diff --git a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala index 79dec265f90..1e1a0a6a2a6 100644 --- a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala +++ b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala @@ -596,7 +596,7 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet // Ensure connections are made to brokers before external listener is made inaccessible describeConfig(externalAdminClient) -// Update broker keystore for external listener to use invalid listener address +// Update broker external listener to use invalid listener address // any address other than localhost is sufficient to fail (either connection or host name verification failure) val invalidHost = "192.168.0.1" alterAdvertisedListener(adminClient, externalAdminClient, "localhost", invalidHost) diff --git a/core/src/test/scala/unit/kafka/server/KafkaServerTest.scala 
b/core/src/test/scala/unit/kafka/server/KafkaServerTest.scala new file mode 100755 index 000..d78821a2ca5 --- /dev/null +++ b/core/src/test/scala/unit/kafka/server/KafkaServerTest.scala @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the
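For readers skimming the diff above: the new guard in KafkaServer.createBrokerInfo is just a host:port set intersection against every other broker already registered in ZooKeeper. A rough plain-Java rendering of that logic follows; the class and method names are made up for illustration, and the authoritative version is the Scala in the diff.

```java
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

class AdvertisedListenerCheck {
    /**
     * Mirrors the require(...) added to KafkaServer.createBrokerInfo: fail if
     * any "host:port" this broker wants to advertise is already registered by
     * a different broker.
     *
     * @param brokerId            this broker's id
     * @param advertisedEndpoints this broker's advertised "host:port" strings
     * @param registeredByBroker  broker id -> endpoints already registered in ZK
     */
    static void validate(int brokerId,
                         Set<String> advertisedEndpoints,
                         Map<Integer, Set<String>> registeredByBroker) {
        for (Map.Entry<Integer, Set<String>> e : registeredByBroker.entrySet()) {
            if (e.getKey() == brokerId) continue; // re-registering yourself is fine
            Set<String> common = e.getValue().stream()
                    .filter(advertisedEndpoints::contains)
                    .collect(Collectors.toSet());
            if (!common.isEmpty()) {
                throw new IllegalStateException("Configured end points " + common
                        + " in advertised listeners are already registered by broker " + e.getKey());
            }
        }
    }
}
```

This catches the copy-the-config-and-forget-the-listeners mistake at startup instead of surfacing later as NotLeaderForPartition errors.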
[jira] [Resolved] (KAFKA-5965) Remove Deprecated AdminClient from Streams Resetter Tool
[ https://issues.apache.org/jira/browse/KAFKA-5965?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang resolved KAFKA-5965. -- Resolution: Fixed Fix Version/s: 2.0.0 > Remove Deprecated AdminClient from Streams Resetter Tool > > > Key: KAFKA-5965 > URL: https://issues.apache.org/jira/browse/KAFKA-5965 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 1.0.0 >Reporter: Bill Bejeck >Assignee: Alexander Fedosov >Priority: Major > Labels: newbie > Fix For: 2.0.0 > > > To break the dependency on using ZK, the {{StreamsResetter}} tool now uses > the {{KafkaAdminClient}} for deleting topics and the > {{kafka.admin.AdminClient}} for verifying no consumer groups are active > before running. > Once the {{KafkaAdminClient}} has a describe group functionality, we should > remove the dependency on {{kafka.admin.AdminClient}} from the > {{StreamsResetter}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-5965) Remove Deprecated AdminClient from Streams Resetter Tool
[ https://issues.apache.org/jira/browse/KAFKA-5965?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16472135#comment-16472135 ] ASF GitHub Bot commented on KAFKA-5965: --- guozhangwang closed pull request #4968: KAFKA-5965: Remove Deprecated AdminClient from Streams Resetter Tool URL: https://github.com/apache/kafka/pull/4968 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/core/src/main/scala/kafka/tools/StreamsResetter.java b/core/src/main/scala/kafka/tools/StreamsResetter.java index b0d52764b59..d7c4e435e58 100644 --- a/core/src/main/scala/kafka/tools/StreamsResetter.java +++ b/core/src/main/scala/kafka/tools/StreamsResetter.java @@ -23,8 +23,11 @@ import joptsimple.OptionSpecBuilder; import kafka.utils.CommandLineUtils; import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.MemberDescription; import org.apache.kafka.clients.admin.DeleteTopicsResult; import org.apache.kafka.clients.admin.KafkaAdminClient; +import org.apache.kafka.clients.admin.DescribeConsumerGroupsOptions; +import org.apache.kafka.clients.admin.DescribeConsumerGroupsResult; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.KafkaConsumer; @@ -42,6 +45,7 @@ import java.io.IOException; import java.text.ParseException; import java.text.SimpleDateFormat; +import java.util.Arrays; import java.util.ArrayList; import java.util.Date; import java.util.HashMap; @@ -51,6 +55,7 @@ import java.util.Map; import java.util.Properties; import java.util.Set; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; /** @@ -120,8 +125,8 @@ public int run(final String[] args, } 
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, options.valueOf(bootstrapServerOption)); -validateNoActiveConsumers(groupId, properties); kafkaAdminClient = (KafkaAdminClient) AdminClient.create(properties); +validateNoActiveConsumers(groupId, kafkaAdminClient); allTopics.clear(); allTopics.addAll(kafkaAdminClient.listTopics().names().get(60, TimeUnit.SECONDS)); @@ -149,18 +154,14 @@ public int run(final String[] args, } private void validateNoActiveConsumers(final String groupId, - final Properties properties) { -kafka.admin.AdminClient olderAdminClient = null; -try { -olderAdminClient = kafka.admin.AdminClient.create(properties); -if (!olderAdminClient.describeConsumerGroup(groupId, 0).consumers().get().isEmpty()) { -throw new IllegalStateException("Consumer group '" + groupId + "' is still active. " -+ "Make sure to stop all running application instances before running the reset tool."); -} -} finally { -if (olderAdminClient != null) { -olderAdminClient.close(); -} + final AdminClient adminClient) throws ExecutionException, InterruptedException { +final DescribeConsumerGroupsResult describeResult = adminClient.describeConsumerGroups(Arrays.asList(groupId), +(new DescribeConsumerGroupsOptions()).timeoutMs(10 * 1000)); +final List members = describeResult.describedGroups().get(groupId).get().members(); +if (!members.isEmpty()) { +throw new IllegalStateException("Consumer group '" + groupId + "' is still active " ++ "and has following members: " + members + ". " ++ "Make sure to stop all running application instances before running the reset tool."); } } This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Remove Deprecated AdminClient from Streams Resetter Tool > > > Key: KAFKA-5965 > URL: https://issues.apache.org/jira/browse/KAFKA-5965 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 1.0.0 >Reporter: Bill Bejeck >Assignee: Alexander Fedosov >Priority: Major > Labels: newbie > > To break the dependency on using ZK, the {{StreamsResetter}} tool now uses >
[jira] [Commented] (KAFKA-6884) ConsumerGroupCommand should use new AdminClient
[ https://issues.apache.org/jira/browse/KAFKA-6884?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16472117#comment-16472117 ] Jason Gustafson commented on KAFKA-6884: Thanks [~asasvari]. I think the assignment strategy is exposed as {{partitionAssignor}} in {{ConsumerGroupDescription}}. Good point about the coordinator though. Maybe it's reasonable to also add it to {{ConsumerGroupDescription}}. > ConsumerGroupCommand should use new AdminClient > --- > > Key: KAFKA-6884 > URL: https://issues.apache.org/jira/browse/KAFKA-6884 > Project: Kafka > Issue Type: Improvement >Reporter: Jason Gustafson >Assignee: Attila Sasvari >Priority: Major > > Now that we have KIP-222, we should update consumer-groups.sh to use the new > AdminClient. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6884) ConsumerGroupCommand should use new AdminClient
[ https://issues.apache.org/jira/browse/KAFKA-6884?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16472041#comment-16472041 ] Attila Sasvari commented on KAFKA-6884: --- It seems assignmentStrategy is also needed. Added some new review comments to the pull request. > ConsumerGroupCommand should use new AdminClient > --- > > Key: KAFKA-6884 > URL: https://issues.apache.org/jira/browse/KAFKA-6884 > Project: Kafka > Issue Type: Improvement >Reporter: Jason Gustafson >Assignee: Attila Sasvari >Priority: Major > > Now that we have KIP-222, we should update consumer-groups.sh to use the new > AdminClient. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6884) ConsumerGroupCommand should use new AdminClient
[ https://issues.apache.org/jira/browse/KAFKA-6884?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16471932#comment-16471932 ] Attila Sasvari commented on KAFKA-6884: --- [~hachikuji] Thanks, I downloaded the pull request as a diff & applied it. Using the new KafkaAdminClient, I am wondering how I can retrieve the coordinator / partition leader of the internal offset topic for a consumer group. [Earlier|https://github.com/apache/kafka/blob/c3921d489f4da80aad6f387158c33ec2e4bca52d/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala#L571] it was returned in the [ConsumerGroupSummary|https://github.com/apache/kafka/blob/c3921d489f4da80aad6f387158c33ec2e4bca52d/core/src/main/scala/kafka/admin/AdminClient.scala#L300] object. > ConsumerGroupCommand should use new AdminClient > --- > > Key: KAFKA-6884 > URL: https://issues.apache.org/jira/browse/KAFKA-6884 > Project: Kafka > Issue Type: Improvement >Reporter: Jason Gustafson >Assignee: Attila Sasvari >Priority: Major > > Now that we have KIP-222, we should update consumer-groups.sh to use the new > AdminClient. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-6898) org.apache.kafka.common.errors.TimeoutException
Rishi created KAFKA-6898: Summary: org.apache.kafka.common.errors.TimeoutException Key: KAFKA-6898 URL: https://issues.apache.org/jira/browse/KAFKA-6898 Project: Kafka Issue Type: Bug Affects Versions: 0.10.2.0 Environment: Production Reporter: Rishi Getting error {code:java} org.apache.kafka.common.errors.TimeoutException Failed to allocate memory within the configured max blocking time 59927 ms.{code} while publishing events to Kafka. We are using Kafka Java client 0.10.2.0 with Kafka 0.10.1.0 broker. This issue does not always happen, but after the applications have been running in service for some time it starts happening, and the applications never recover from this state until the producer instance is restarted. The producer and Kafka broker configurations are the defaults and haven't been changed. What should be the course of action for this issue? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
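That error is raised by the producer's internal buffer pool: the record accumulator has exhausted `buffer.memory` and a send() then blocked for the full `max.block.ms` (default 60000 ms, which matches the ~59927 ms in the report) waiting for space. When it never recovers until restart, the usual suspect is batches not draining (a stalled broker connection) rather than an undersized buffer. The sketch below only shows the two producer settings involved — values and broker address are illustrative placeholders, not a claimed fix for this ticket.

```java
import java.util.Properties;

class ProducerBufferConfig {
    // Producer settings relevant to "Failed to allocate memory within the
    // configured max blocking time". Values are illustrative only.
    static Properties producerProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092"); // placeholder address
        // Total bytes the producer may use to buffer records awaiting send
        // (default 32 MB). The reported error fires when this pool stays full;
        // raising it only buys time if batches are not draining to the broker.
        props.put("buffer.memory", "67108864"); // 64 MB
        // How long send() may block waiting for buffer space or metadata
        // before throwing TimeoutException (default 60000 ms).
        props.put("max.block.ms", "60000");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(producerProps());
    }
}
```

Checking broker-side request latency and producer batch metrics before touching either knob is usually the more productive first step.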
[jira] [Updated] (KAFKA-6830) Add new metrics for consumer/replication fetch requests
[ https://issues.apache.org/jira/browse/KAFKA-6830?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Adam Kotwasinski updated KAFKA-6830: Description: Currently, we have only one fetch request-related metric for a topic. As fetch requests are used by both client consumers and replicating brokers, it is impossible to tell if the particular partition (with replication factor > 1) is being actively read from client by consumers. Rationale for this improvement: as owner of kafka installation, but not the owner of clients, I want to know which topics still have active (real) consumers. PR linked. KIP: https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=80452537 was: Currently, we have only one fetch request-related metric for a topic. As fetch requests are used by both client consumers and replicating brokers, it is impossible to tell if the particular partition (with replication factor > 1) is being actively read from client by consumers. Rationale for this improvement: as owner of kafka installation, but not the owner of clients, I want to know which topics still have active (real) consumers. PR linked. > Add new metrics for consumer/replication fetch requests > --- > > Key: KAFKA-6830 > URL: https://issues.apache.org/jira/browse/KAFKA-6830 > Project: Kafka > Issue Type: Improvement > Components: core >Reporter: Adam Kotwasinski >Priority: Major > > Currently, we have only one fetch request-related metric for a topic. > As fetch requests are used by both client consumers and replicating brokers, > it is impossible to tell if the particular partition (with replication factor > > 1) is being actively read from client by consumers. > Rationale for this improvement: as owner of kafka installation, but not the > owner of clients, I want to know which topics still have active (real) > consumers. > PR linked. > KIP: https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=80452537 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6830) Add new metrics for consumer/replication fetch requests
[ https://issues.apache.org/jira/browse/KAFKA-6830?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16471673#comment-16471673 ] Adam Kotwasinski commented on KAFKA-6830: - KIP - https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=80452537 > Add new metrics for consumer/replication fetch requests > --- > > Key: KAFKA-6830 > URL: https://issues.apache.org/jira/browse/KAFKA-6830 > Project: Kafka > Issue Type: Improvement > Components: core >Reporter: Adam Kotwasinski >Priority: Major > > Currently, we have only one fetch request-related metric for a topic. > As fetch requests are used by both client consumers and replicating brokers, > it is impossible to tell if the particular partition (with replication factor > > 1) is being actively read from client by consumers. > Rationale for this improvement: as owner of kafka installation, but not the > owner of clients, I want to know which topics still have active (real) > consumers. > PR linked. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-6892) Kafka Streams memory usage grows over the time till OOM
[ https://issues.apache.org/jira/browse/KAFKA-6892?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dawid Kulig updated KAFKA-6892: --- Description: Hi. I am observing indefinite memory growth of my kafka-streams application. It gets killed by the OS when reaching the memory limit (10gb). It's running two unrelated pipelines (read from 4 source topics - 100 partitions each - aggregate data and write to two destination topics) My environment: * Kubernetes cluster * 4 app instances * 10GB memory limit per pod (instance) * JRE 8 JVM / Streams app: * -Xms2g * -Xmx4g * num.stream.threads = 4 * commit.interval.ms = 1000 * linger.ms = 1000 When my app is running for 24hours it reaches 10GB memory limit. Heap and GC looks good, non-heap avg memory usage is 120MB. I've read it might be related to the RocksDB that works underneath streams app, however I tried to tune it using [confluent doc|https://docs.confluent.io/current/streams/developer-guide/config-streams.html#streams-developer-guide-rocksdb-config] unfortunately with no luck. RocksDB config #1: {code:java} tableConfig.setBlockCacheSize(16 * 1024 * 1024L); tableConfig.setBlockSize(16 * 1024L); tableConfig.setCacheIndexAndFilterBlocks(true); options.setTableFormatConfig(tableConfig); options.setMaxWriteBufferNumber(2);{code} RocksDB config #2 {code:java} tableConfig.setBlockCacheSize(1024 * 1024L); tableConfig.setBlockSize(16 * 1024L); tableConfig.setCacheIndexAndFilterBlocks(true); options.setTableFormatConfig(tableConfig); options.setMaxWriteBufferNumber(2); options.setWriteBufferSize(8 * 1024L);{code} This behavior has only been observed with our production traffic, where per topic input message rate is 10msg/sec and is pretty much constant (no peaks). I am attaching cluster resources usage from last 24h. Any help or advice would be much appreciated. was: Hi. I am observing indefinite memory growth of my kafka-streams application. It gets killed by the OS when reaching the memory limit (10gb). 
It's running two unrelated pipelines (read from 4 source topics - 100 partitions each - aggregate data and write to two destination topics) My environment: * Kubernetes cluster * 4 app instances * 10GB memory limit per pod (instance) * JRE 8 JVM / Streams app: * -Xms2g * -Xmx4g * num.stream.threads = 4 * commit.interval.ms = 1000 * linger.ms = 1000 When my app is running for 24hours it reaches 10GB memory limit. Heap and GC looks good, non-heap avg memory usage is 120MB. I've read it might be related to the RocksDB that works underneath streams app, however I tried to tune it using [confluent doc|https://docs.confluent.io/current/streams/developer-guide/config-streams.html#streams-developer-guide-rocksdb-config] however with no luck. RocksDB config #1: {code:java} tableConfig.setBlockCacheSize(16 * 1024 * 1024L); tableConfig.setBlockSize(16 * 1024L); tableConfig.setCacheIndexAndFilterBlocks(true); options.setTableFormatConfig(tableConfig); options.setMaxWriteBufferNumber(2);{code} RocksDB config #2 {code:java} tableConfig.setBlockCacheSize(1024 * 1024L); tableConfig.setBlockSize(16 * 1024L); tableConfig.setCacheIndexAndFilterBlocks(true); options.setTableFormatConfig(tableConfig); options.setMaxWriteBufferNumber(2); options.setWriteBufferSize(8 * 1024L);{code} This behavior has only been observed with our production traffic, where per topic input message rate is 10msg/sec and is pretty much constant (no peaks). I am attaching cluster resources usage from last 24h. Any help or advice would be much appreciated. > Kafka Streams memory usage grows over the time till OOM > --- > > Key: KAFKA-6892 > URL: https://issues.apache.org/jira/browse/KAFKA-6892 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 1.1.0 >Reporter: Dawid Kulig >Priority: Minor > Attachments: kafka-streams-per-pod-resources-usage.png > > > Hi. I am observing indefinite memory growth of my kafka-streams application. 
> It gets killed by the OS when reaching the memory limit (10gb). > It's running two unrelated pipelines (read from 4 source topics - 100 > partitions each - aggregate data and write to two destination topics) > My environment: > * Kubernetes cluster > * 4 app instances > * 10GB memory limit per pod (instance) > * JRE 8 > JVM / Streams app: > * -Xms2g > * -Xmx4g > * num.stream.threads = 4 > * commit.interval.ms = 1000 > * linger.ms = 1000 > > When my app is running for 24hours it reaches 10GB memory limit. Heap and GC > looks good, non-heap avg memory usage is 120MB. I've read it might be related > to the RocksDB that works underneath streams app, however I tried to tune it > using [confluent >
[jira] [Comment Edited] (KAFKA-6892) Kafka Streams memory usage grows over the time till OOM
[ https://issues.apache.org/jira/browse/KAFKA-6892?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16471647#comment-16471647 ] Dawid Kulig edited comment on KAFKA-6892 at 5/11/18 8:33 AM: - [~mjsax] Thank you for your comment. Shame on me but I must have missed that capacity planning guide. However, having a look at that guide I'd say we should fit in the resources we currently have. Speaking about topology, please see below: We have 4 source topics: [ST1, ST2, ST3, ST4], messages with a string key and 2 destination topics: [DT1, DT2] Pipeline A) {code} final KStream aggregates = builder.stream("ST1") .groupByKey() .reduce((prev, next) -> prev) .mapValues((AuthenticationHandshake value) -> { // some custom mapping }).toStream(); final KStream bStream = builder.stream("ST2").groupByKey().reduce((prev, next) -> next).toStream(); final KStream cStream = builder.stream("ST3"); final KStream dStream = builder.stream("ST4"); final KTable aggregateTable = aggregates .leftJoin(bStream, ExampleAggregator::joinB, joinWindow) .leftJoin(cStream, ExampleAggregator::joinC, joinWindow) .leftJoin(dStream, ExampleAggregator::joinD, dJoinWindow) .groupByKey() .reduce((previousValue, newValue) -> newValue); aggregateTable.to("DT1"); private static Aggregate joinB(final Aggregate aggregate, final B b) { LOGGER.info("Joining b: {}", b); aggregate.setB(b); return aggregate; } private static Aggregate joinC(final Aggregate aggregate, final C c) { LOGGER.info("Joining c: {}", c); aggregate.setC(c); return aggregate; } private static Aggregate joinD(final Aggregate aggregate, final D d) { LOGGER.info("Joining d: {}", d); aggregate.setD(d); return aggregate; } {code} Pipeline B) {code} // note that the DT1 from PipelineA is an input topic for this pipeline final KStream aggregates = builder.stream(DT1); final KTable aggregatesTable = aggregates .filter((key, val) -> // custom filtering (non-null / non-empty)) .groupBy((key, val) -> val.somePropIdentifier()) 
.aggregate(() -> new Aggreagate2("", new ArrayList<>(64)), (key, val, aggregate2Val) -> customAggregation(...), aggregateSpecificAvroSerde ); aggregatesTable.toStream() .map((window, aggregate2) -> new KeyValue<>(aggregate2.somePropertyKey(), aggregate2)) .groupByKey() .reduce((old, _new) -> _new) .to(Serdes.String(), aggregateSpecificAvroSerde, "DT2"); {code} was (Author: dkulig): [~mjsax] Thank you for your comment. Shame on me but I must have missed that capacity planning guide. However, having a look at that guide I'd say we should fit in the resources we currently have. Speaking about topology, please see below: We have 4 source topics: [ST1, ST2, ST3, ST4], messages with a string key and 2 destination topics: [DT1, DT2] Pipeline A) {code} final KStream aggregates = builder.stream("ST1") .groupByKey() .reduce((prev, next) -> prev) .mapValues((AuthenticationHandshake value) -> { // some custom mapping }).toStream(); final KStream bStream = builder.stream("ST2").groupByKey().reduce((prev, next) -> next).toStream(); final KStream cStream = builder.stream("ST3"); final KStream dStream = builder.stream("ST4"); final KTable aggregateTable = aggregates .leftJoin(bStream, ExampleAggregator::joinB, joinWindow) .leftJoin(cStream, ExampleAggregator::joinC, joinWindow) .leftJoin(dStream, ExampleAggregator::joinD, dJoinWindow) .groupByKey() .reduce((previousValue, newValue) -> newValue); aggregateTable.to("DT1"); private static Aggregate joinB(final Aggregate aggregate, final B b) { LOGGER.info("Joining b: {}", b); aggregate.setB(b); return aggregate; } private static Aggregate joinC(final Aggregate aggregate, final C c) { LOGGER.info("Joining c: {}", c); aggregate.setC(c); return aggregate; } private static Aggregate joinD(final Aggregate aggregate, final D d) { LOGGER.info("Joining d: {}", d); aggregate.setD(d); return aggregate; } {code} Pipeline B) {code} // note that the DT1 from PipelineA is an input topic for this pipeline final KStream aggregates = 
builder.stream(DT1); final KTable aggregatesTable = aggregates .filter((key, val) -> // custom filtering (non-null / non-empty)) .groupBy((key, val) -> val.somePropIdentifier()) .aggregate(() -> new Aggreagate2("", new ArrayList<>(64)), (key, val, aggregate2Val) -> customAggregation(...),
[jira] [Commented] (KAFKA-6892) Kafka Streams memory usage grows over the time till OOM
[ https://issues.apache.org/jira/browse/KAFKA-6892?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16471647#comment-16471647 ] Dawid Kulig commented on KAFKA-6892: [~mjsax] Thank you for your comment. Shame on me but I must have missed that capacity planning guide. However, having a look at that guide I'd say we should fit in the resources we currently have. Speaking about topology, please see below: We have 4 source topics: [ST1, ST2, ST3, ST4], messages with a string key and 2 destination topics: [DT1, DT2] Pipeline A) {code} final KStream aggregates = builder.stream("ST1") .groupByKey() .reduce((prev, next) -> prev) .mapValues((AuthenticationHandshake value) -> { // some custom mapping }).toStream(); final KStream bStream = builder.stream("ST2").groupByKey().reduce((prev, next) -> next).toStream(); final KStream cStream = builder.stream("ST3"); final KStream dStream = builder.stream("ST4"); final KTable aggregateTable = aggregates .leftJoin(bStream, ExampleAggregator::joinB, joinWindow) .leftJoin(cStream, ExampleAggregator::joinC, joinWindow) .leftJoin(dStream, ExampleAggregator::joinD, dJoinWindow) .groupByKey() .reduce((previousValue, newValue) -> newValue); aggregateTable.to("DT1"); private static Aggregate joinB(final Aggregate aggregate, final B b) { LOGGER.info("Joining b: {}", b); aggregate.setB(b); return aggregate; } private static Aggregate joinC(final Aggregate aggregate, final C c) { LOGGER.info("Joining c: {}", c); aggregate.setC(c); return aggregate; } private static Aggregate joinD(final Aggregate aggregate, final D d) { LOGGER.info("Joining d: {}", d); aggregate.setD(d); return aggregate; } {code} Pipeline B) {code} // note that the DT1 from PipelineA is an input topic for this pipeline final KStream aggregates = builder.stream(DT1); final KTable aggregatesTable = aggregates .filter((key, val) -> // custom filtering (non-null / non-empty)) .groupBy((key, val) -> val.somePropIdentifier()) .aggregate(() -> new 
Aggreagate2("", new ArrayList<>(64)), (key, val, aggregate2Val) -> customAggregation(...), aggregateSpecificAvroSerde ); aggregatesTable.toStream() .map((window, aggregate2) -> new KeyValue<>(aggregate2.somePropertyKey(), aggregate2)) .groupByKey() .reduce((old, _new) -> _new) .to(Serdes.String(), aggregateSpecificAvroSerde, DT2); {code} > Kafka Streams memory usage grows over the time till OOM > --- > > Key: KAFKA-6892 > URL: https://issues.apache.org/jira/browse/KAFKA-6892 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 1.1.0 >Reporter: Dawid Kulig >Priority: Minor > Attachments: kafka-streams-per-pod-resources-usage.png > > > Hi. I am observing indefinite memory growth of my kafka-streams application. > It gets killed by the OS when reaching the memory limit (10gb). > It's running two unrelated pipelines (read from 4 source topics - 100 > partitions each - aggregate data and write to two destination topics) > My environment: > * Kubernetes cluster > * 4 app instances > * 10GB memory limit per pod (instance) > * JRE 8 > JVM / Streams app: > * -Xms2g > * -Xmx4g > * num.stream.threads = 4 > * commit.interval.ms = 1000 > * linger.ms = 1000 > > When my app is running for 24hours it reaches 10GB memory limit. Heap and GC > looks good, non-heap avg memory usage is 120MB. I've read it might be related > to the RocksDB that works underneath streams app, however I tried to tune it > using [confluent > doc|https://docs.confluent.io/current/streams/developer-guide/config-streams.html#streams-developer-guide-rocksdb-config] > however with no luck. 
> RocksDB config #1:
> {code:java}
> tableConfig.setBlockCacheSize(16 * 1024 * 1024L);
> tableConfig.setBlockSize(16 * 1024L);
> tableConfig.setCacheIndexAndFilterBlocks(true);
> options.setTableFormatConfig(tableConfig);
> options.setMaxWriteBufferNumber(2);
> {code}
>
> RocksDB config #2:
> {code:java}
> tableConfig.setBlockCacheSize(1024 * 1024L);
> tableConfig.setBlockSize(16 * 1024L);
> tableConfig.setCacheIndexAndFilterBlocks(true);
> options.setTableFormatConfig(tableConfig);
> options.setMaxWriteBufferNumber(2);
> options.setWriteBufferSize(8 * 1024L);
> {code}
>
> This behavior has only been observed with our production traffic, where the per-topic input message rate is 10 msg/sec and pretty much constant (no peaks). I am attaching the cluster resource usage from the last 24h.
> Any help or advice would be much appreciated.
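[Editorial note] Configuration fragments like the two above only take effect if they are applied through a `RocksDBConfigSetter` registered under the `rocksdb.config.setter` Streams property; options set elsewhere are silently ignored. A minimal, hedged sketch of that wiring against the Streams 1.1 API (the class name is illustrative, and the values mirror config #1):

```java
import java.util.Map;
import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.Options;

// Illustrative config setter; Streams instantiates it per state store.
public class BoundedMemoryRocksDBConfig implements RocksDBConfigSetter {
    @Override
    public void setConfig(final String storeName, final Options options,
                          final Map<String, Object> configs) {
        final BlockBasedTableConfig tableConfig = new BlockBasedTableConfig();
        tableConfig.setBlockCacheSize(16 * 1024 * 1024L); // 16 MB block cache per store
        tableConfig.setBlockSize(16 * 1024L);             // 16 KB blocks
        tableConfig.setCacheIndexAndFilterBlocks(true);   // count index/filter blocks against the cache
        options.setTableFormatConfig(tableConfig);
        options.setMaxWriteBufferNumber(2);               // cap memtables per store
    }
}
```

It would then be registered via `props.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, BoundedMemoryRocksDBConfig.class);`. Note the setter is invoked once per store, so with many partitions the caches add up. Window-store retention (e.g. `JoinWindows#until` for the `joinWindow` stores in Pipeline A) is a further knob that bounds off-heap growth.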
[jira] [Commented] (KAFKA-5611) One or more consumers in a consumer-group stop consuming after rebalancing
[ https://issues.apache.org/jira/browse/KAFKA-5611?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16471522#comment-16471522 ] qiuyueqiong commented on KAFKA-5611:

After applying https://github.com/apache/kafka/pull/3571 on version 0.10.2.0, we encountered an infinite loop in ConsumerCoordinator.java:

{code}
java.lang.IllegalStateException: Coordinator selected invalid assignment protocol: null
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:208)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:339)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:305)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:286)
{code}

For some reason the ConsumerCoordinator's resetGeneration() is called, and the generation's protocol is set to null. Then, in the joinGroupIfNeeded() method of org.apache.kafka.clients.consumer.internals.AbstractCoordinator, onJoinComplete() throws an IllegalStateException, and resetJoinGroupFuture() never gets the chance to be called:

{code}
if (future.succeeded()) {
    onJoinComplete(generation.generationId, generation.memberId, generation.protocol, future.value());
    // We reset the join group future only after the completion callback returns. This ensures
    // that if the callback is woken up, we will retry it on the next joinGroupIfNeeded.
    resetJoinGroupFuture();
    needsJoinPrepare = true;
}
{code}

Because joinFuture is never cleared to null, no JoinGroupRequest will be sent on the next loop iteration. Is this an issue?
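[Editorial note] The failure mode qiuyueqiong describes is a cleanup step skipped when a callback throws. A minimal, self-contained sketch (plain Java, no Kafka dependency; all names here are hypothetical stand-ins for the coordinator internals) contrasting the reported behavior with a try/finally that always clears the pending future:

```java
// Stand-in for the join-group state machine: joinFuture models the pending
// RequestFuture; while it is non-null, no new JoinGroupRequest is sent.
public class JoinFutureCleanup {
    private Object joinFuture = new Object();

    // Simulates onJoinComplete() failing as in the reported stack trace.
    private void onJoinComplete() {
        throw new IllegalStateException(
            "Coordinator selected invalid assignment protocol: null");
    }

    // Reported behavior: the exception skips the reset, joinFuture stays
    // set, and the poll loop spins without ever rejoining.
    public boolean joinBuggy() {
        try {
            onJoinComplete();
            joinFuture = null; // never reached when the callback throws
        } catch (IllegalStateException e) {
            // swallowed by an outer loop in this sketch
        }
        return joinFuture == null; // false: future never cleared
    }

    // One possible shape of a fix: clear the future in finally so the next
    // joinGroupIfNeeded() iteration can send a fresh JoinGroupRequest.
    public boolean joinFixed() {
        try {
            onJoinComplete();
        } catch (IllegalStateException e) {
            // real code would rethrow or log before retrying
        } finally {
            joinFuture = null; // always runs, like resetJoinGroupFuture()
        }
        return joinFuture == null; // true: future cleared despite the throw
    }
}
```

This only illustrates the control-flow gap being asked about; whether a finally block is the right fix in AbstractCoordinator itself is for the Kafka developers to judge.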
> One or more consumers in a consumer-group stop consuming after rebalancing
> --------------------------------------------------------------------------
>
> Key: KAFKA-5611
> URL: https://issues.apache.org/jira/browse/KAFKA-5611
> Project: Kafka
> Issue Type: Bug
> Affects Versions: 0.10.2.0
> Reporter: Panos Skianis
> Assignee: Jason Gustafson
> Priority: Major
> Labels: reliability
> Fix For: 0.11.0.1
>
> Attachments: Server 1, Server 2, Server 3, bad-server-with-more-logging-1.tar.gz, kka02
>
> Scenario:
> - 3 zookeepers, 4 Kafkas, 0.10.2.0, with 0.9.0 compatibility still on (other apps need it, but the one mentioned below is already on the Kafka 0.10.2.0 client).
> - 3 servers running 1 consumer each under the same consumer groupId.
> - The servers seem to be consuming messages happily, but then a timeout to an external service causes our app to restart the Kafka Consumer on one of the servers (this is by design). That causes rebalancing of the group, and upon restart one of the Consumers seems to "block".
> - Server 3 is where the problems occur.
> - The problem fixes itself either by restarting one of the 3 servers or by causing the group to rebalance again, using the console consumer with autocommit set to false and the same group.
>
> Note:
> - Haven't managed to recreate it at will yet.
> - It mainly happens in the production environment, often enough. Hence I do not have any logs with DEBUG/TRACE statements yet.
> - Extracts from the log of each app server are attached, along with the log of the kafka broker that seems to be dealing with the related group and generations.
> - See COMMENT lines in the files for further info.

-- This message was sent by Atlassian JIRA (v7.6.3#76005)