[jira] [Commented] (KAFKA-6661) Sink connectors that explicitly 'resume' topic partitions can resume a paused task
[ https://issues.apache.org/jira/browse/KAFKA-6661?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16401255#comment-16401255 ]

Ewen Cheslack-Postava commented on KAFKA-6661:
----------------------------------------------

Oh, also, I marked it for the 1.1.1 release, but if we end up doing another RC we'll want to adjust this to 1.1.0.

> Sink connectors that explicitly 'resume' topic partitions can resume a paused task
> ----------------------------------------------------------------------------------
>
>                 Key: KAFKA-6661
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6661
>             Project: Kafka
>          Issue Type: Bug
>          Components: KafkaConnect
>    Affects Versions: 0.9.0.0, 0.10.0.0, 0.11.0.0, 1.0.0
>            Reporter: Randall Hauch
>            Assignee: Randall Hauch
>            Priority: Critical
>             Fix For: 0.10.0.2, 0.10.1.2, 0.10.2.2, 0.11.0.3, 1.0.2, 1.1.1
>
>
> Sink connectors are allowed to use the {{SinkTaskContext}}'s methods to
> explicitly pause and resume topic partitions. This is useful when connectors
> need additional time processing the records for specific topic partitions
> (e.g., the external system has an outage).
> However, when the sink connector has been paused via the REST API, the worker
> for the sink tasks pauses the consumer. When the connector is polled, the poll
> request might time out and return no records. Connect then calls the task's
> {{put(...)}} method (with no records), and this allows the task to optionally
> call any of the {{SinkTaskContext}}'s pause or resume methods. If it calls
> resume, this will unexpectedly resume the paused consumer, causing the
> consumer to return messages and the connector to process those messages --
> despite the connector still being paused.
> This is reported against 1.0, but the affected code has not been changed
> since at least 0.9.0.0.
> A workaround is to remove rather than pause a connector. It's inconvenient,
> but it works.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Commented] (KAFKA-6661) Sink connectors that explicitly 'resume' topic partitions can resume a paused task
[ https://issues.apache.org/jira/browse/KAFKA-6661?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16401254#comment-16401254 ]

Ewen Cheslack-Postava commented on KAFKA-6661:
----------------------------------------------

[~rhauch] Got this cherry-picked back to 0.10.0. If we want 0.9.0 as well, we probably need a separate PR since there was enough code movement to make it non-trivial.

On a related note, it wasn't too bad in this case, but for things we want to backport it's better to separate the must-have stuff from the nice-to-have improvements. It's not so much the risk (in this case it was just a toString and some log statements), but that the larger the patch, the less likely we are to get a clean cherry-pick, and that's a lot more time-consuming when we want to cherry-pick through a bunch of release branches. In this case the pain was really caused by a different commit, one that made mostly cosmetic improvements and caused the cherry-picks to hit conflicts, but it's something to keep in mind in the future.
[jira] [Commented] (KAFKA-6661) Sink connectors that explicitly 'resume' topic partitions can resume a paused task
[ https://issues.apache.org/jira/browse/KAFKA-6661?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16401219#comment-16401219 ]

ASF GitHub Bot commented on KAFKA-6661:
---------------------------------------

ewencp closed pull request #4716: KAFKA-6661: Ensure sink connectors don’t resume consumer when task is paused
URL: https://github.com/apache/kafka/pull/4716

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic):

diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
index 2995a4e813a..2ba785c4668 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
@@ -130,7 +130,7 @@ public void initialize(TaskConfig taskConfig) {
         try {
             this.taskConfig = taskConfig.originalsStrings();
             this.consumer = createConsumer();
-            this.context = new WorkerSinkTaskContext(consumer);
+            this.context = new WorkerSinkTaskContext(consumer, this);
         } catch (Throwable t) {
             log.error("{} Task failed initialization and will not be started.", this, t);
             onFailure(t);
@@ -601,7 +601,7 @@ SinkTaskMetricsGroup sinkTaskMetricsGroup() {
     private class HandleRebalance implements ConsumerRebalanceListener {
         @Override
         public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
-            log.debug("{} Partitions assigned", WorkerSinkTask.this);
+            log.debug("{} Partitions assigned {}", WorkerSinkTask.this, partitions);
             lastCommittedOffsets = new HashMap<>();
             currentOffsets = new HashMap<>();
             for (TopicPartition tp : partitions) {
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskContext.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskContext.java
index 08789497645..386f992e82a 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskContext.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskContext.java
@@ -20,6 +20,8 @@
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.connect.errors.IllegalWorkerStateException;
 import org.apache.kafka.connect.sink.SinkTaskContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.Arrays;
 import java.util.Collections;
@@ -29,26 +31,32 @@
 import java.util.Set;
 
 public class WorkerSinkTaskContext implements SinkTaskContext {
+
+    private final Logger log = LoggerFactory.getLogger(getClass());
     private Map<TopicPartition, Long> offsets;
     private long timeoutMs;
     private KafkaConsumer<byte[], byte[]> consumer;
+    private final WorkerSinkTask sinkTask;
     private final Set<TopicPartition> pausedPartitions;
     private boolean commitRequested;
 
-    public WorkerSinkTaskContext(KafkaConsumer<byte[], byte[]> consumer) {
+    public WorkerSinkTaskContext(KafkaConsumer<byte[], byte[]> consumer, WorkerSinkTask sinkTask) {
         this.offsets = new HashMap<>();
         this.timeoutMs = -1L;
         this.consumer = consumer;
+        this.sinkTask = sinkTask;
         this.pausedPartitions = new HashSet<>();
     }
 
     @Override
     public void offset(Map<TopicPartition, Long> offsets) {
+        log.debug("{} Setting offsets for topic partitions {}", this, offsets);
         this.offsets.putAll(offsets);
     }
 
     @Override
     public void offset(TopicPartition tp, long offset) {
+        log.debug("{} Setting offset for topic partition {} to {}", this, tp, offset);
         offsets.put(tp, offset);
     }
 
@@ -66,6 +74,7 @@ public void clearOffsets() {
 
     @Override
     public void timeout(long timeoutMs) {
+        log.debug("{} Setting timeout to {} ms", this, timeoutMs);
         this.timeoutMs = timeoutMs;
     }
 
@@ -92,7 +101,12 @@ public void pause(TopicPartition... partitions) {
         }
         try {
             Collections.addAll(pausedPartitions, partitions);
-            consumer.pause(Arrays.asList(partitions));
+            if (sinkTask.shouldPause()) {
+                log.debug("{} Connector is paused, so not pausing consumer's partitions {}", this, partitions);
+            } else {
+                consumer.pause(Arrays.asList(partitions));
+                log.debug("{} Pausing partitions {}. Connector is not paused.", this, partitions);
+            }
         } catch (IllegalStateException e) {
             throw new IllegalWorkerStateException("SinkTasks may not pause partitions that are not currently assigned to them.", e);
         }
@@ -105,7 +119,12 @@ public void re
[jira] [Commented] (KAFKA-6661) Sink connectors that explicitly 'resume' topic partitions can resume a paused task
[ https://issues.apache.org/jira/browse/KAFKA-6661?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16399681#comment-16399681 ]

ASF GitHub Bot commented on KAFKA-6661:
---------------------------------------

rhauch opened a new pull request #4716: KAFKA-6661: Ensure sink connectors don’t resume consumer when task is paused
URL: https://github.com/apache/kafka/pull/4716

Changed WorkerSinkTaskContext to only resume the consumer topic partitions when the connector/task is not in the paused state. The context tracks the set of topic partitions that are explicitly paused/resumed by the connector, and when the WorkerSinkTask resumes the task it currently resumes all topic partitions *except* those that are still explicitly paused in the context. Therefore, the change above should result in the desired behavior.

Several debug statements were added to record when the context is called by the connector.

This can be backported to older releases, since this bug goes back to 0.10 or 0.9.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
[jira] [Commented] (KAFKA-6661) Sink connectors that explicitly 'resume' topic partitions can resume a paused task
[ https://issues.apache.org/jira/browse/KAFKA-6661?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16399573#comment-16399573 ]

Randall Hauch commented on KAFKA-6661:
--------------------------------------

The {{WorkerSinkTask}} and {{WorkerSinkTaskContext}} both deal with pausing and resuming the consumer. Most of this logic is correct:

* {{WorkerSinkTaskContext}} is already tracking the topic partitions that have been explicitly paused or resumed by the connector
* {{WorkerSinkTaskContext.pause(TopicPartition...)}} adds the topic partitions to its paused set and always pauses those topic partitions in the consumer
* {{WorkerSinkTaskContext.resume(TopicPartition...)}} removes the topic partitions from its paused set and *_always_* resumes those topic partitions in the consumer. *The _always_ part is what is incorrect.*
* {{WorkerSinkTask.pauseAll()}} currently pauses all of the partitions, but does not use or change the context's set of paused partitions
* {{WorkerSinkTask.resumeAll()}} currently resumes all topic partitions *_except_* those that are still explicitly paused in the context

So, I think the only change that needs to be made is that {{WorkerSinkTaskContext.resume(TopicPartition...)}} should still remove the topic partitions from its set but should only tell the consumer to resume the topic partitions _when the connector is not paused_.
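The bookkeeping described in the list above can be sketched in isolation. This is a minimal model under stated assumptions, not the Kafka Connect code: {{FakeConsumer}}, {{FakeWorkerTask}}, {{FakeContext}} and the {{connectorPaused}} flag are hypothetical stand-ins that use plain strings for topic partitions, and only the guarded {{resume}} and the "all except explicitly paused" {{resumeAll}} are taken from the comment.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

public class GuardedResumeSketch {

    /** Hypothetical stand-in for the consumer: it only records which partitions are paused. */
    static class FakeConsumer {
        final Set<String> assigned = new HashSet<>();
        final Set<String> paused = new HashSet<>();

        void pause(Set<String> partitions) { paused.addAll(partitions); }

        void resume(Set<String> partitions) { paused.removeAll(partitions); }
    }

    /** Hypothetical stand-in for the worker-side task, which owns the connector-level pause flag. */
    static class FakeWorkerTask {
        final FakeConsumer consumer = new FakeConsumer();
        final FakeContext context = new FakeContext(this);
        boolean connectorPaused;

        FakeWorkerTask(String... partitions) {
            consumer.assigned.addAll(Arrays.asList(partitions));
        }

        // Pauses every assigned partition without touching the context's explicit set.
        void pauseAll() {
            connectorPaused = true;
            consumer.pause(consumer.assigned);
        }

        // Resumes every assigned partition except those the connector still holds paused.
        void resumeAll() {
            connectorPaused = false;
            Set<String> toResume = new HashSet<>(consumer.assigned);
            toResume.removeAll(context.explicitlyPaused);
            consumer.resume(toResume);
        }
    }

    /** Hypothetical stand-in for the context handed to the connector's task. */
    static class FakeContext {
        final Set<String> explicitlyPaused = new HashSet<>();
        private final FakeWorkerTask task;

        FakeContext(FakeWorkerTask task) { this.task = task; }

        void pause(String... partitions) {
            explicitlyPaused.addAll(Arrays.asList(partitions));
            task.consumer.pause(new HashSet<>(Arrays.asList(partitions)));
        }

        void resume(String... partitions) {
            // Always update the bookkeeping...
            explicitlyPaused.removeAll(Arrays.asList(partitions));
            // ...but leave the consumer alone while the connector itself is paused.
            if (!task.connectorPaused) {
                task.consumer.resume(new HashSet<>(Arrays.asList(partitions)));
            }
        }
    }

    public static void main(String[] args) {
        FakeWorkerTask task = new FakeWorkerTask("topic-0", "topic-1");
        task.context.pause("topic-1");              // connector throttles one partition
        task.pauseAll();                            // operator pauses the connector
        task.context.resume("topic-0", "topic-1");  // connector calls resume from put()
        System.out.println("paused while connector is paused: " + task.consumer.paused.size());
        task.resumeAll();                           // operator resumes the connector
        System.out.println("paused after resumeAll: " + task.consumer.paused.size());
    }
}
```

Because the explicit set is updated even when the consumer call is skipped, a later {{resumeAll()}} picks up the connector's intent without any extra state.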
[jira] [Commented] (KAFKA-6661) Sink connectors that explicitly 'resume' topic partitions can resume a paused task
[ https://issues.apache.org/jira/browse/KAFKA-6661?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16399567#comment-16399567 ]

Randall Hauch commented on KAFKA-6661:
--------------------------------------

After a connector is paused, the sink task worker will call {{consumer.poll(long)}}, which blocks and then times out after the configurable timeout period, returning 0 records:

{noformat}
[2018-03-13 18:21:33,756] TRACE WorkerSinkTask{id=s3-sink-0} Polling consumer with timeout 76631 ms (org.apache.kafka.connect.runtime.WorkerSinkTask:282)
...
[2018-03-13 18:21:33,758] DEBUG WorkerSinkTask{id=s3-sink-0} Finished offset commit successfully in 6 ms for sequence number 1: {s3_topic-0=OffsetAndMetadata{offset=27300, metadata=''}} (org.apache.kafka.connect.runtime.WorkerSinkTask:238)
[2018-03-13 18:21:33,758] DEBUG WorkerSinkTask{id=s3-sink-0} Setting last committed offsets to {s3_topic-0=OffsetAndMetadata{offset=27300, metadata=''}} (org.apache.kafka.connect.runtime.WorkerSinkTask:241)
...
[2018-03-13 18:22:50,391] TRACE WorkerSinkTask{id=s3-sink-0} Polling returned 0 messages (org.apache.kafka.connect.runtime.WorkerSinkTask:285)
{noformat}

The worker then processes and delivers the 0 messages to the connector, at which point the connector might ultimately call a method that calls {{WorkerSinkTaskContext.resume(...)}}, which currently *_resumes the consumer regardless of whether the connector is paused_*. Here's a stack trace showing this:

{noformat}
"pool-1-thread-1@4159" prio=5 tid=0x1b nid=NA runnable
  java.lang.Thread.State: RUNNABLE
	  at org.apache.kafka.clients.consumer.KafkaConsumer.resume(KafkaConsumer.java:1542)
	  at org.apache.kafka.connect.runtime.WorkerSinkTaskContext.resume(WorkerSinkTaskContext.java:109)
	  at io.confluent.connect.s3.TopicPartitionWriter.resume(TopicPartitionWriter.java:405)
	  at io.confluent.connect.s3.TopicPartitionWriter.commitOnTimeIfNoData(TopicPartitionWriter.java:295)
	  at io.confluent.connect.s3.TopicPartitionWriter.write(TopicPartitionWriter.java:177)
	  at io.confluent.connect.s3.S3SinkTask.put(S3SinkTask.java:195)
	  at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:495)
	  at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:288)
	  at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:198)
	  at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:166)
	  at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
	  at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
	  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	  at java.lang.Thread.run(Thread.java:748)
{noformat}
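The sequence in this comment (paused connector, empty poll, {{put(...)}} with no records, unconditional resume) can be replayed with a small self-contained model. This is only a sketch under assumptions: {{FakeConsumer}}, {{UnguardedContext}} and {{ResumeOnIdleTask}} are hypothetical stand-ins, not Kafka or S3 connector classes, and the "resume when no data arrives" rule is a simplification loosely modelled on the {{commitOnTimeIfNoData}} frame in the stack trace.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class UnguardedResumeRepro {

    /** Hypothetical stand-in for the consumer: paused partitions yield nothing on poll. */
    static class FakeConsumer {
        final List<String> assigned;
        final Set<String> paused = new HashSet<>();

        FakeConsumer(String... assigned) { this.assigned = Arrays.asList(assigned); }

        // Returns one dummy record for every assigned partition that is not paused.
        List<String> poll() {
            List<String> records = new ArrayList<>();
            for (String tp : assigned) {
                if (!paused.contains(tp)) {
                    records.add("record from " + tp);
                }
            }
            return records;
        }
    }

    /** Models the pre-fix behaviour described above: resume always reaches the consumer. */
    static class UnguardedContext {
        private final FakeConsumer consumer;

        UnguardedContext(FakeConsumer consumer) { this.consumer = consumer; }

        void resume(String... partitions) {
            consumer.paused.removeAll(Arrays.asList(partitions));
        }
    }

    /** Hypothetical task that resumes its partition whenever put() receives no data. */
    static class ResumeOnIdleTask {
        final List<String> delivered = new ArrayList<>();
        private final UnguardedContext context;

        ResumeOnIdleTask(UnguardedContext context) { this.context = context; }

        void put(Collection<String> records) {
            if (records.isEmpty()) {
                context.resume("s3_topic-0");
            }
            delivered.addAll(records);
        }
    }

    /** Replays two worker iterations while the connector is supposed to be paused. */
    static List<String> deliveredWhilePaused() {
        FakeConsumer consumer = new FakeConsumer("s3_topic-0");
        ResumeOnIdleTask task = new ResumeOnIdleTask(new UnguardedContext(consumer));

        consumer.paused.addAll(consumer.assigned); // the worker pauses the consumer
        task.put(consumer.poll());                 // poll returns nothing; the task calls resume
        task.put(consumer.poll());                 // the consumer is live again and returns data
        return task.delivered;
    }

    public static void main(String[] args) {
        System.out.println(deliveredWhilePaused());
    }
}
```

In this model the second iteration delivers a record even though nothing ever un-paused the connector, which is the symptom reported in the issue.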