[jira] [Commented] (KAFKA-6661) Sink connectors that explicitly 'resume' topic partitions can resume a paused task

2018-03-15 Thread Ewen Cheslack-Postava (JIRA)

[ https://issues.apache.org/jira/browse/KAFKA-6661?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16401255#comment-16401255 ]

Ewen Cheslack-Postava commented on KAFKA-6661:
--

Oh, also, I marked the fix version as 1.1.1, but if we end up doing another RC we'll 
want to adjust this to 1.1.0.

> Sink connectors that explicitly 'resume' topic partitions can resume a paused 
> task
> --
>
>                 Key: KAFKA-6661
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6661
>             Project: Kafka
>          Issue Type: Bug
>          Components: KafkaConnect
>    Affects Versions: 0.9.0.0, 0.10.0.0, 0.11.0.0, 1.0.0
>            Reporter: Randall Hauch
>            Assignee: Randall Hauch
>            Priority: Critical
>             Fix For: 0.10.0.2, 0.10.1.2, 0.10.2.2, 0.11.0.3, 1.0.2, 1.1.1
>
>
> Sink connectors are allowed to use the {{SinkTaskContext}}'s methods to 
> explicitly pause and resume topic partitions. This is useful when connectors 
> need additional time processing the records for specific topic partitions 
> (e.g., the external system has an outage).
> However, when the sink connector has been paused via the REST API, the worker 
> for the sink tasks pauses the consumer. When the consumer is polled, the poll 
> request might time out and return no records. Connect then calls the task's 
> {{put(...)}} method (with no records), and this allows the task to optionally 
> call any of the {{SinkTaskContext}}'s pause or resume methods. If it calls 
> resume, this will unexpectedly resume the paused consumer, causing the 
> consumer to return messages and the connector to process those messages, 
> despite the connector still being paused.
> This is reported against 1.0, but the affected code has not been changed 
> since at least 0.9.0.0.
> A workaround is to remove rather than pause a connector. It's inconvenient, 
> but it works.
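The sequence described above can be reproduced in a small, self-contained Java model without a running cluster. This is only a sketch: `SimConsumer`, `BuggyContext`, `SimSinkTask`, and `BugReproduction` are hypothetical stand-ins, not Connect runtime classes; partitions are plain strings; and `BuggyContext.resume(...)` is assumed to mirror the pre-fix behavior of always forwarding the call to the consumer.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for a consumer: it only tracks which partitions are paused.
class SimConsumer {
    final Set<String> paused = new HashSet<>();

    void pause(Collection<String> partitions) {
        paused.addAll(partitions);
    }

    void resume(Collection<String> partitions) {
        paused.removeAll(partitions);
    }
}

// Hypothetical model of the pre-fix context: resume() always reaches the consumer.
class BuggyContext {
    final Set<String> pausedPartitions = new HashSet<>();
    private final SimConsumer consumer;

    BuggyContext(SimConsumer consumer) {
        this.consumer = consumer;
    }

    void resume(String... partitions) {
        List<String> tps = Arrays.asList(partitions);
        pausedPartitions.removeAll(tps);
        consumer.resume(tps); // ignores whether the connector itself is paused
    }
}

// Hypothetical connector task that resumes its partition whenever put() gets no records.
class SimSinkTask {
    private final BuggyContext context;

    SimSinkTask(BuggyContext context) {
        this.context = context;
    }

    void put(List<String> records) {
        if (records.isEmpty()) {
            context.resume("s3_topic-0");
        }
    }
}

class BugReproduction {
    // Returns whether the consumer is still paused after the empty put().
    static boolean consumerStillPaused() {
        SimConsumer consumer = new SimConsumer();
        SimSinkTask task = new SimSinkTask(new BuggyContext(consumer));

        // The connector is paused via the REST API, so the worker pauses the consumer.
        consumer.pause(Collections.singletonList("s3_topic-0"));
        // poll() times out with no records, but the worker still calls put().
        task.put(Collections.<String>emptyList());
        // The task's resume() call has silently un-paused the consumer.
        return consumer.paused.contains("s3_topic-0");
    }

    public static void main(String[] args) {
        // prints: consumer still paused: false
        System.out.println("consumer still paused: " + consumerStillPaused());
    }
}
```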



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6661) Sink connectors that explicitly 'resume' topic partitions can resume a paused task

2018-03-15 Thread Ewen Cheslack-Postava (JIRA)

[ https://issues.apache.org/jira/browse/KAFKA-6661?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16401254#comment-16401254 ]

Ewen Cheslack-Postava commented on KAFKA-6661:
--

[~rhauch] Got this cherry-picked back to 0.10.0. If we want 0.9.0 as well, we 
probably need a separate PR since there was enough code movement to make it 
non-trivial.

On a related note, it wasn't too bad in this case, but for things we want to 
backport it's better to separate the must-have stuff from the nice-to-have 
improvements. It's not so much the risk (in this case it was just a toString 
and some log statements), but that the larger the patch, the less likely we are to 
get a clean cherry-pick, and resolving conflicts is a lot more time-consuming when 
we want to cherry-pick through a bunch of release branches. In this case the pain 
was really caused by a different commit that mostly made cosmetic improvements, 
which caused conflicts when cherry-picking, but it's something to keep in mind in 
the future.



[jira] [Commented] (KAFKA-6661) Sink connectors that explicitly 'resume' topic partitions can resume a paused task

2018-03-15 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/KAFKA-6661?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16401219#comment-16401219 ]

ASF GitHub Bot commented on KAFKA-6661:
---

ewencp closed pull request #4716: KAFKA-6661: Ensure sink connectors don’t 
resume consumer when task is paused
URL: https://github.com/apache/kafka/pull/4716
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:


diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
index 2995a4e813a..2ba785c4668 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
@@ -130,7 +130,7 @@ public void initialize(TaskConfig taskConfig) {
         try {
             this.taskConfig = taskConfig.originalsStrings();
             this.consumer = createConsumer();
-            this.context = new WorkerSinkTaskContext(consumer);
+            this.context = new WorkerSinkTaskContext(consumer, this);
         } catch (Throwable t) {
             log.error("{} Task failed initialization and will not be started.", this, t);
             onFailure(t);
@@ -601,7 +601,7 @@ SinkTaskMetricsGroup sinkTaskMetricsGroup() {
     private class HandleRebalance implements ConsumerRebalanceListener {
         @Override
         public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
-            log.debug("{} Partitions assigned", WorkerSinkTask.this);
+            log.debug("{} Partitions assigned {}", WorkerSinkTask.this, partitions);
             lastCommittedOffsets = new HashMap<>();
             currentOffsets = new HashMap<>();
             for (TopicPartition tp : partitions) {
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskContext.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskContext.java
index 08789497645..386f992e82a 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskContext.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskContext.java
@@ -20,6 +20,8 @@
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.connect.errors.IllegalWorkerStateException;
 import org.apache.kafka.connect.sink.SinkTaskContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.Arrays;
 import java.util.Collections;
@@ -29,26 +31,32 @@
 import java.util.Set;
 
 public class WorkerSinkTaskContext implements SinkTaskContext {
+
+    private final Logger log = LoggerFactory.getLogger(getClass());
     private Map<TopicPartition, Long> offsets;
     private long timeoutMs;
     private KafkaConsumer<byte[], byte[]> consumer;
+    private final WorkerSinkTask sinkTask;
     private final Set<TopicPartition> pausedPartitions;
     private boolean commitRequested;
 
-    public WorkerSinkTaskContext(KafkaConsumer<byte[], byte[]> consumer) {
+    public WorkerSinkTaskContext(KafkaConsumer<byte[], byte[]> consumer, WorkerSinkTask sinkTask) {
         this.offsets = new HashMap<>();
         this.timeoutMs = -1L;
         this.consumer = consumer;
+        this.sinkTask = sinkTask;
         this.pausedPartitions = new HashSet<>();
     }
 
     @Override
     public void offset(Map<TopicPartition, Long> offsets) {
+        log.debug("{} Setting offsets for topic partitions {}", this, offsets);
         this.offsets.putAll(offsets);
     }
 
     @Override
     public void offset(TopicPartition tp, long offset) {
+        log.debug("{} Setting offset for topic partition {} to {}", this, tp, offset);
         offsets.put(tp, offset);
     }
 
@@ -66,6 +74,7 @@ public void clearOffsets() {
 
     @Override
     public void timeout(long timeoutMs) {
+        log.debug("{} Setting timeout to {} ms", this, timeoutMs);
         this.timeoutMs = timeoutMs;
     }
 
@@ -92,7 +101,12 @@ public void pause(TopicPartition... partitions) {
         }
         try {
             Collections.addAll(pausedPartitions, partitions);
-            consumer.pause(Arrays.asList(partitions));
+            if (sinkTask.shouldPause()) {
+                log.debug("{} Connector is paused, so not pausing consumer's partitions {}", this, partitions);
+            } else {
+                consumer.pause(Arrays.asList(partitions));
+                log.debug("{} Pausing partitions {}. Connector is not paused.", this, partitions);
+            }
         } catch (IllegalStateException e) {
             throw new IllegalWorkerStateException("SinkTasks may not pause partitions that are not currently assigned to them.", e);
         }
@@ -105,7 +119,12 @@ public void re

[jira] [Commented] (KAFKA-6661) Sink connectors that explicitly 'resume' topic partitions can resume a paused task

2018-03-14 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/KAFKA-6661?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16399681#comment-16399681 ]

ASF GitHub Bot commented on KAFKA-6661:
---

rhauch opened a new pull request #4716: KAFKA-6661: Ensure sink connectors 
don’t resume consumer when task is paused
URL: https://github.com/apache/kafka/pull/4716
 
 
   Changed WorkerSinkTaskContext to only resume the consumer topic partitions 
when the connector/task is not in the paused state.
   
   The context tracks the set of topic partitions that are explicitly 
paused/resumed by the connector, and when the WorkerSinkTask resumes the task, 
it currently resumes all topic partitions *except* those that are still 
explicitly paused in the context. Therefore, the change above should result in 
the desired behavior.
   
   Several debug statements were added to record when the context is called by 
the connector.
   
   This can be backported to older releases, since this bug goes back to 0.10 
or 0.9.
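   The resume-on-unpause behavior described above amounts to a set difference. A minimal sketch, assuming string partition names in place of `TopicPartition` and a hypothetical `partitionsToResume` helper rather than the actual `WorkerSinkTask.resumeAll()` code:

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.TreeSet;

class ResumeAllSketch {
    // Partitions to resume when the connector is un-paused: the current assignment
    // minus the partitions the connector has explicitly paused through its context.
    static Set<String> partitionsToResume(Set<String> assignment, Set<String> explicitlyPaused) {
        Set<String> toResume = new TreeSet<>(assignment); // sorted only for readable output
        toResume.removeAll(explicitlyPaused);
        return toResume;
    }

    public static void main(String[] args) {
        Set<String> assignment = new HashSet<>(Arrays.asList("t-0", "t-1", "t-2"));
        Set<String> explicitlyPaused = new HashSet<>(Arrays.asList("t-1"));
        System.out.println(partitionsToResume(assignment, explicitlyPaused)); // prints [t-0, t-2]
    }
}
```

   Because the context keeps its explicitly paused set up to date even while the task is paused, a partition the connector resumed during the pause is no longer in that set and gets resumed here, while one it paused stays paused.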
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-6661) Sink connectors that explicitly 'resume' topic partitions can resume a paused task

2018-03-14 Thread Randall Hauch (JIRA)

[ https://issues.apache.org/jira/browse/KAFKA-6661?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16399573#comment-16399573 ]

Randall Hauch commented on KAFKA-6661:
--

The {{WorkerSinkTask}} and {{WorkerSinkTaskContext}} both deal with pausing and 
resuming the consumer. Most of this logic is correct:

* {{WorkerSinkTaskContext}} is already tracking the topic partitions that have 
been explicitly paused or resumed by the connector
* {{WorkerSinkTaskContext.pause(TopicPartition...)}} adds the topic partitions 
to its paused set and always pauses those topic partitions in the consumer
* {{WorkerSinkTaskContext.resume(TopicPartition...)}} removes the topic 
partitions from its paused set and *_always_* resumes those topic partitions in 
the consumer. *The _always_ part is what is incorrect; it should still remove 
the topic partitions from its set but should only tell the consumer to resume 
the topic partitions _when the consumer is not paused_.*
* {{WorkerSinkTask.pauseAll()}} currently pauses all of the partitions, but 
does not use or change the context's set of paused partitions
* {{WorkerSinkTask.resumeAll()}} currently resumes all topic partitions 
*_except_* those that are still explicitly paused in the context

So, I think the only change that needs to be made is that 
{{WorkerSinkTaskContext.resume(TopicPartition...)}} should still remove the 
topic partitions from its set but should only tell the consumer to resume the 
topic partitions _when the consumer is not paused_.
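A simplified model of the proposed {{resume(...)}} change, assuming the context can ask the task whether it is paused (the PR's diff uses {{sinkTask.shouldPause()}} for this). {{TaskState}}, {{FixedContext}}, and {{FixedResumeDemo}} are hypothetical names, and the consumer's paused partitions are modeled as a plain set of strings:

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical view of the worker task's state; true while the connector is paused.
interface TaskState {
    boolean shouldPause();
}

// Simplified model of the corrected context. consumerPaused stands in for the
// consumer's own set of paused partitions.
class FixedContext {
    final Set<String> pausedPartitions = new HashSet<>();
    final Set<String> consumerPaused = new HashSet<>();
    private final TaskState task;

    FixedContext(TaskState task) {
        this.task = task;
    }

    void resume(String... partitions) {
        List<String> tps = Arrays.asList(partitions);
        // Always record that the connector no longer wants these partitions paused...
        pausedPartitions.removeAll(tps);
        // ...but only resume them in the consumer when the task is not paused.
        if (!task.shouldPause()) {
            consumerPaused.removeAll(tps);
        }
    }
}

class FixedResumeDemo {
    public static void main(String[] args) {
        FixedContext context = new FixedContext(() -> true); // connector paused via REST
        context.pausedPartitions.add("t-0");
        context.consumerPaused.add("t-0");

        context.resume("t-0");
        System.out.println(context.consumerPaused);   // prints [t-0]: consumer stays paused
        System.out.println(context.pausedPartitions); // prints []: bookkeeping updated
    }
}
```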



[jira] [Commented] (KAFKA-6661) Sink connectors that explicitly 'resume' topic partitions can resume a paused task

2018-03-14 Thread Randall Hauch (JIRA)

[ https://issues.apache.org/jira/browse/KAFKA-6661?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16399567#comment-16399567 ]

Randall Hauch commented on KAFKA-6661:
--

After a connector is paused, the sink task worker will call 
{{consumer.poll(long)}}, which blocks and then times out after the configurable 
timeout period, returning 0 records:

{noformat}
[2018-03-13 18:21:33,756] TRACE WorkerSinkTask{id=s3-sink-0} Polling consumer with timeout 76631 ms (org.apache.kafka.connect.runtime.WorkerSinkTask:282)
...
[2018-03-13 18:21:33,758] DEBUG WorkerSinkTask{id=s3-sink-0} Finished offset commit successfully in 6 ms for sequence number 1: {s3_topic-0=OffsetAndMetadata{offset=27300, metadata=''}} (org.apache.kafka.connect.runtime.WorkerSinkTask:238)
[2018-03-13 18:21:33,758] DEBUG WorkerSinkTask{id=s3-sink-0} Setting last committed offsets to {s3_topic-0=OffsetAndMetadata{offset=27300, metadata=''}} (org.apache.kafka.connect.runtime.WorkerSinkTask:241)
...
[2018-03-13 18:22:50,391] TRACE WorkerSinkTask{id=s3-sink-0} Polling returned 0 messages (org.apache.kafka.connect.runtime.WorkerSinkTask:285)
{noformat}

The worker then delivers the (empty) batch of 0 messages to the connector, at 
which point the connector might ultimately call a method that calls 
{{WorkerSinkTaskContext.resume(...)}}, which currently *_resumes the consumer 
regardless of whether the connector is paused_*.

Here's a stack trace showing this:

{noformat}
"pool-1-thread-1@4159" prio=5 tid=0x1b nid=NA runnable
  java.lang.Thread.State: RUNNABLE
  at org.apache.kafka.clients.consumer.KafkaConsumer.resume(KafkaConsumer.java:1542)
  at org.apache.kafka.connect.runtime.WorkerSinkTaskContext.resume(WorkerSinkTaskContext.java:109)
  at io.confluent.connect.s3.TopicPartitionWriter.resume(TopicPartitionWriter.java:405)
  at io.confluent.connect.s3.TopicPartitionWriter.commitOnTimeIfNoData(TopicPartitionWriter.java:295)
  at io.confluent.connect.s3.TopicPartitionWriter.write(TopicPartitionWriter.java:177)
  at io.confluent.connect.s3.S3SinkTask.put(S3SinkTask.java:195)
  at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:495)
  at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:288)
  at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:198)
  at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:166)
  at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
  at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
  at java.lang.Thread.run(Thread.java:748)
{noformat}
