Repository: kafka
Updated Branches:
  refs/heads/trunk 1a653c813 -> a593db6a2


MINOR: Standardize logging of Worker-level messages from Tasks and Connectors

This ensures all log messages include the connector/task ID, whether the task
is a source or a sink, and formats those messages consistently.
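
The convention applied throughout the diff below: every log call leads with a
"{}" placeholder bound to this, and each worker class's toString() renders its
ID (the WorkerConnector#toString hunk below switches to exactly the
"WorkerConnector{id=...}" shape), so every line carries a uniform prefix no
matter whether it came from a source task, a sink task, or a connector. A
minimal, hedged sketch of the pattern -- everything except that toString()
shape is hypothetical illustration, not Connect code:

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    class LoggingPrefixSketch {
        private static final Logger log = LoggerFactory.getLogger(LoggingPrefixSketch.class);
        private final String connName = "my-connector";  // hypothetical connector ID

        @Override
        public String toString() {
            // Same shape as the WorkerConnector#toString introduced in this patch
            return "WorkerConnector{" + "id=" + connName + '}';
        }

        void transitionTo(String targetState) {
            // Logs: "WorkerConnector{id=my-connector} Transition connector to PAUSED"
            log.debug("{} Transition connector to {}", this, targetState);
        }
    }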

Author: Ewen Cheslack-Postava <m...@ewencp.org>

Reviewers: Randall Hauch <rha...@gmail.com>, Konstantine Karantasis 
<konstant...@confluent.io>, Jason Gustafson <ja...@confluent.io>

Closes #3639 from ewencp/standardize-connector-task-logging


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/a593db6a
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/a593db6a
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/a593db6a

Branch: refs/heads/trunk
Commit: a593db6a2b6c3511215008b6d3dd6bd77f84e8c4
Parents: 1a653c8
Author: Ewen Cheslack-Postava <m...@ewencp.org>
Authored: Wed Aug 9 09:07:27 2017 -0700
Committer: Ewen Cheslack-Postava <m...@ewencp.org>
Committed: Wed Aug 9 09:07:27 2017 -0700

----------------------------------------------------------------------
 .../runtime/SourceTaskOffsetCommitter.java      |  6 +--
 .../kafka/connect/runtime/WorkerConnector.java  | 19 ++++---
 .../kafka/connect/runtime/WorkerSinkTask.java   | 54 ++++++++++----------
 .../kafka/connect/runtime/WorkerSourceTask.java | 46 ++++++++---------
 .../kafka/connect/runtime/WorkerTask.java       |  6 +--
 .../connect/runtime/WorkerSinkTaskTest.java     |  2 +
 6 files changed, 66 insertions(+), 67 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/a593db6a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitter.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitter.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitter.java
index f93d7d2..c502809 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitter.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitter.java
@@ -103,17 +103,17 @@ class SourceTaskOffsetCommitter {
     }
 
     private void commit(WorkerSourceTask workerTask) {
-        log.debug("Committing offsets for {}", workerTask);
+        log.debug("{} Committing offsets", workerTask);
         try {
             if (workerTask.commitOffsets()) {
                 return;
             }
-            log.error("Failed to commit offsets for {}", workerTask);
+            log.error("{} Failed to commit offsets", workerTask);
         } catch (Throwable t) {
             // We're very careful about exceptions here since any uncaught exceptions in the commit
             // thread would cause the fixed interval schedule on the ExecutorService to stop running
             // for that task
-            log.error("Unhandled exception when committing {}: ", workerTask, t);
+            log.error("{} Unhandled exception when committing: ", workerTask, t);
         }
     }
 }
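
The broad catch above matters because of a documented ScheduledExecutorService
behavior: if a periodic task scheduled with scheduleAtFixedRate() throws, its
subsequent executions are silently suppressed. A standalone sketch of the
failure mode commit() guards against (illustrative names, not Connect code):

    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;

    class PeriodicCommitSketch {
        public static void main(String[] args) throws InterruptedException {
            ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
            scheduler.scheduleAtFixedRate(() -> {
                try {
                    commitOffsets();  // hypothetical stand-in for workerTask.commitOffsets()
                } catch (Throwable t) {
                    // Swallow and log so the fixed-interval schedule keeps firing;
                    // without this, the first throw would cancel all later runs.
                    System.err.println("Unhandled exception when committing: " + t);
                }
            }, 0, 1, TimeUnit.SECONDS);
            Thread.sleep(3000);
            scheduler.shutdown();
        }

        static void commitOffsets() {
            throw new RuntimeException("simulated commit failure");
        }
    }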

http://git-wip-us.apache.org/repos/asf/kafka/blob/a593db6a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java
index 1937429..983db92 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java
@@ -68,7 +68,7 @@ public class WorkerConnector {
     public void initialize(ConnectorConfig connectorConfig) {
         try {
             this.config = connectorConfig.originalsStrings();
-            log.debug("Initializing connector {} with config {}", connName, 
config);
+            log.debug("{} Initializing connector {} with config {}", this, 
connName, config);
 
             connector.initialize(new ConnectorContext() {
                 @Override
@@ -78,13 +78,13 @@ public class WorkerConnector {
 
                 @Override
                 public void raiseError(Exception e) {
-                    log.error("Connector raised an error {}", connName, e);
+                    log.error("{} Connector raised an error", this, e);
                     onFailure(e);
                     ctx.raiseError(e);
                 }
             });
         } catch (Throwable t) {
-            log.error("Error initializing connector {}", connName, t);
+            log.error("{} Error initializing connector", this, t);
             onFailure(t);
         }
     }
@@ -105,7 +105,7 @@ public class WorkerConnector {
                     throw new IllegalArgumentException("Cannot start connector 
in state " + state);
             }
         } catch (Throwable t) {
-            log.error("Error while starting connector {}", connName, t);
+            log.error("{} Error while starting connector", this, t);
             onFailure(t);
             return false;
         }
@@ -149,7 +149,7 @@ public class WorkerConnector {
                     throw new IllegalArgumentException("Cannot pause connector 
in state " + state);
             }
         } catch (Throwable t) {
-            log.error("Error while shutting down connector {}", connName, t);
+            log.error("{} Error while shutting down connector", this, t);
             statusListener.onFailure(connName, t);
             this.state = State.FAILED;
         }
@@ -161,7 +161,7 @@ public class WorkerConnector {
                 connector.stop();
             this.state = State.STOPPED;
         } catch (Throwable t) {
-            log.error("Error while shutting down connector {}", connName, t);
+            log.error("{} Error while shutting down connector", this, t);
             this.state = State.FAILED;
         } finally {
             statusListener.onShutdown(connName);
@@ -170,11 +170,11 @@ public class WorkerConnector {
 
     public void transitionTo(TargetState targetState) {
         if (state == State.FAILED) {
-            log.warn("Cannot transition connector {} to {} since it has 
failed", connName, targetState);
+            log.warn("{} Cannot transition connector to {} since it has 
failed", this, targetState);
             return;
         }
 
-        log.debug("Transition connector {} to {}", connName, targetState);
+        log.debug("{} Transition connector to {}", this, targetState);
         if (targetState == TargetState.PAUSED) {
             pause();
         } else if (targetState == TargetState.STARTED) {
@@ -198,8 +198,7 @@ public class WorkerConnector {
     @Override
     public String toString() {
         return "WorkerConnector{" +
-                "connName='" + connName + '\'' +
-                ", connector=" + connector +
+                "id=" + connName +
                 '}';
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/a593db6a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
index c4567a3..9295712 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
@@ -112,7 +112,7 @@ class WorkerSinkTask extends WorkerTask {
             this.consumer = createConsumer();
             this.context = new WorkerSinkTaskContext(consumer);
         } catch (Throwable t) {
-            log.error("Task {} failed initialization and will not be 
started.", t);
+            log.error("{} Task failed initialization and will not be 
started.", this, t);
             onFailure(t);
         }
     }
@@ -170,7 +170,7 @@ class WorkerSinkTask extends WorkerTask {
 
             // Check for timed out commits
             if (committing && now >= commitTimeoutMs) {
-                log.warn("Commit of {} offsets timed out", this);
+                log.warn("{} Commit of offsets timed out", this);
                 commitFailures++;
                 committing = false;
             }
@@ -179,7 +179,7 @@ class WorkerSinkTask extends WorkerTask {
             long timeoutMs = Math.max(nextCommit - now, 0);
             poll(timeoutMs);
         } catch (WakeupException we) {
-            log.trace("{} consumer woken up", id);
+            log.trace("{} consumer woken up", this);
 
             if (isStopping())
                 return;
@@ -197,15 +197,13 @@ class WorkerSinkTask extends WorkerTask {
 
     private void onCommitCompleted(Throwable error, long seqno) {
         if (commitSeqno != seqno) {
-            log.debug("Got callback for timed out commit {}: {}, but most 
recent commit is {}",
-                    this,
-                    seqno, commitSeqno);
+            log.debug("{} Got callback for timed out commit: {}, but most 
recent commit is {}", this, seqno, commitSeqno);
         } else {
             if (error != null) {
-                log.error("Commit of {} offsets threw an unexpected exception: 
", this, error);
+                log.error("{} Commit of offsets threw an unexpected exception: 
", this, error);
                 commitFailures++;
             } else {
-                log.debug("Finished {} offset commit successfully in {} ms",
+                log.debug("{} Finished offset commit successfully in {} ms",
                         this, time.milliseconds() - commitStarted);
                 commitFailures = 0;
             }
@@ -221,16 +219,16 @@ class WorkerSinkTask extends WorkerTask {
      * Initializes and starts the SinkTask.
      */
     protected void initializeAndStart() {
-        log.debug("Initializing task {} ", id);
+        log.debug("{} Initializing task", this);
         String topicsStr = taskConfig.get(SinkTask.TOPICS_CONFIG);
         if (topicsStr == null || topicsStr.isEmpty())
             throw new ConnectException("Sink tasks require a list of topics.");
         String[] topics = topicsStr.split(",");
-        log.debug("Task {} subscribing to topics {}", id, topics);
+        log.debug("{} Task subscribing to topics {}", this, topics);
         consumer.subscribe(Arrays.asList(topics), new HandleRebalance());
         task.initialize(context);
         task.start(taskConfig);
-        log.info("Sink task {} finished initialization and start", this);
+        log.info("{} Sink task finished initialization and start", this);
     }
 
     /** Poll for new messages with the given timeout. Should only be invoked by the worker thread. */
@@ -242,10 +240,10 @@ class WorkerSinkTask extends WorkerTask {
             context.timeout(-1L);
         }
 
-        log.trace("{} polling consumer with timeout {} ms", id, timeoutMs);
+        log.trace("{} polling consumer with timeout {} ms", this, timeoutMs);
         ConsumerRecords<byte[], byte[]> msgs = pollConsumer(timeoutMs);
         assert messageBatch.isEmpty() || msgs.isEmpty();
-        log.trace("{} polling returned {} messages", id, msgs.count());
+        log.trace("{} polling returned {} messages", this, msgs.count());
 
         convertMessages(msgs);
         deliverMessages();
@@ -305,12 +303,12 @@ class WorkerSinkTask extends WorkerTask {
             taskProvidedOffsets = task.preCommit(new HashMap<>(currentOffsets));
         } catch (Throwable t) {
             if (closing) {
-                log.warn("{} Offset commit failed during close");
+                log.warn("{} Offset commit failed during close", this);
                 onCommitCompleted(t, commitSeqno);
             } else {
                 log.error("{} Offset commit failed, rewinding to last committed offsets", this, t);
                 for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : lastCommittedOffsets.entrySet()) {
-                    log.debug("{} Rewinding topic partition {} to offset {}", id, entry.getKey(), entry.getValue().offset());
+                    log.debug("{} Rewinding topic partition {} to offset {}", this, entry.getKey(), entry.getValue().offset());
                     consumer.seek(entry.getKey(), entry.getValue().offset());
                 }
                 currentOffsets = new HashMap<>(lastCommittedOffsets);
@@ -334,13 +332,17 @@ class WorkerSinkTask extends WorkerTask {
             final TopicPartition partition = taskProvidedOffsetEntry.getKey();
             final OffsetAndMetadata taskProvidedOffset = taskProvidedOffsetEntry.getValue();
             if (commitableOffsets.containsKey(partition)) {
-                if (taskProvidedOffset.offset() <= currentOffsets.get(partition).offset()) {
+                long taskOffset = taskProvidedOffset.offset();
+                long currentOffset = currentOffsets.get(partition).offset();
+                if (taskOffset <= currentOffset) {
                     commitableOffsets.put(partition, taskProvidedOffset);
                 } else {
-                    log.warn("Ignoring invalid task provided offset {}/{} -- 
not yet consumed", partition, taskProvidedOffset);
+                    log.warn("{} Ignoring invalid task provided offset {}/{} 
-- not yet consumed, taskOffset={} currentOffset={}",
+                            this, partition, taskProvidedOffset, taskOffset, 
currentOffset);
                 }
             } else {
-                log.warn("Ignoring invalid task provided offset {}/{} -- 
partition not assigned", partition, taskProvidedOffset);
+                log.warn("{} Ignoring invalid task provided offset {}/{} -- 
partition not assigned, assignment={}",
+                        this, partition, taskProvidedOffset, 
consumer.assignment());
             }
         }
 
@@ -402,7 +404,7 @@ class WorkerSinkTask extends WorkerTask {
 
     private void convertMessages(ConsumerRecords<byte[], byte[]> msgs) {
         for (ConsumerRecord<byte[], byte[]> msg : msgs) {
-            log.trace("Consuming message with key {}, value {}", msg.key(), 
msg.value());
+            log.trace("{} Consuming message with key {}, value {}", this, 
msg.key(), msg.value());
             SchemaAndValue keyAndSchema = 
keyConverter.toConnectData(msg.topic(), msg.key());
             SchemaAndValue valueAndSchema = 
valueConverter.toConnectData(msg.topic(), msg.value());
             SinkRecord record = new SinkRecord(msg.topic(), msg.partition(),
@@ -445,16 +447,16 @@ class WorkerSinkTask extends WorkerTask {
                 pausedForRedelivery = false;
             }
         } catch (RetriableException e) {
-            log.error("RetriableException from SinkTask {}:", id, e);
+            log.error("{} RetriableException from SinkTask:", this, e);
             // If we're retrying a previous batch, make sure we've paused all topic partitions so we don't get new data,
             // but will still be able to poll in order to handle user-requested timeouts, keep group membership, etc.
             pausedForRedelivery = true;
             pauseAll();
             // Let this exit normally, the batch will be reprocessed on the next loop.
         } catch (Throwable t) {
-            log.error("Task {} threw an uncaught and unrecoverable exception", 
id, t);
-            log.error("Task is being killed and will not recover until 
manually restarted");
-            throw new ConnectException("Exiting WorkerSinkTask due to 
unrecoverable exception.");
+            log.error("{} Task threw an uncaught and unrecoverable exception. 
Task is being killed and will not "
+                    + "recover until manually restarted.", this, t);
+            throw new ConnectException("Exiting WorkerSinkTask due to 
unrecoverable exception.", t);
         }
     }
 
@@ -467,12 +469,12 @@ class WorkerSinkTask extends WorkerTask {
             TopicPartition tp = entry.getKey();
             Long offset = entry.getValue();
             if (offset != null) {
-                log.trace("Rewind {} to offset {}.", tp, offset);
+                log.trace("{} Rewind {} to offset {}.", this, tp, offset);
                 consumer.seek(tp, offset);
                 lastCommittedOffsets.put(tp, new OffsetAndMetadata(offset));
                 currentOffsets.put(tp, new OffsetAndMetadata(offset));
             } else {
-                log.warn("Cannot rewind {} to null offset.", tp);
+                log.warn("{} Cannot rewind {} to null offset.", this, tp);
             }
         }
         context.clearOffsets();
@@ -495,7 +497,7 @@ class WorkerSinkTask extends WorkerTask {
                 long pos = consumer.position(tp);
                 lastCommittedOffsets.put(tp, new OffsetAndMetadata(pos));
                 currentOffsets.put(tp, new OffsetAndMetadata(pos));
-                log.debug("{} assigned topic partition {} with offset {}", id, 
tp, pos);
+                log.debug("{} assigned topic partition {} with offset {}", 
this, tp, pos);
             }
 
             // If we paused everything for redelivery (which is no longer relevant since we discarded the data), make
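
One detail the rewritten calls above rely on: SLF4J treats a trailing Throwable
specially. In log.error("{} RetriableException from SinkTask:", this, e) the
single "{}" consumes this, and e, left over in the final position, is printed
as a full stack trace rather than formatted into the message. A small hedged
sketch (class name hypothetical):

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    class TrailingThrowableSketch {
        private static final Logger log = LoggerFactory.getLogger(TrailingThrowableSketch.class);

        void demo(Object taskId) {
            Exception e = new RuntimeException("boom");
            // One placeholder, two arguments after the format string: taskId
            // fills "{}", and e is logged with its stack trace.
            log.error("{} RetriableException from SinkTask:", taskId, e);
        }
    }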

http://git-wip-us.apache.org/repos/asf/kafka/blob/a593db6a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
index e676d50..0f42186 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
@@ -113,7 +113,7 @@ class WorkerSourceTask extends WorkerTask {
         try {
             this.taskConfig = taskConfig.originalsStrings();
         } catch (Throwable t) {
-            log.error("Task {} failed initialization and will not be 
started.", t);
+            log.error("{} Task failed initialization and will not be 
started.", this, t);
             onFailure(t);
         }
     }
@@ -140,7 +140,7 @@ class WorkerSourceTask extends WorkerTask {
         try {
             task.initialize(new WorkerSourceTaskContext(offsetReader));
             task.start(taskConfig);
-            log.info("Source task {} finished initialization and start", this);
+            log.info("{} Source task finished initialization and start", this);
             synchronized (this) {
                 if (startedShutdownBeforeStartCompleted) {
                     task.stop();
@@ -159,12 +159,12 @@ class WorkerSourceTask extends WorkerTask {
                 }
 
                 if (toSend == null) {
-                    log.debug("Nothing to send to Kafka. Polling source for 
additional records");
+                    log.debug("{} Nothing to send to Kafka. Polling source for 
additional records", this);
                     toSend = task.poll();
                 }
                 if (toSend == null)
                     continue;
-                log.debug("About to send " + toSend.size() + " records to 
Kafka");
+                log.debug("{} About to send " + toSend.size() + " records to 
Kafka", this);
                 if (!sendRecords())
                     stopRequestedLatch.await(SEND_FAILED_BACKOFF_MS, TimeUnit.MILLISECONDS);
             }
@@ -198,7 +198,7 @@ class WorkerSourceTask extends WorkerTask {
             byte[] value = valueConverter.fromConnectData(record.topic(), record.valueSchema(), record.value());
             final ProducerRecord<byte[], byte[]> producerRecord = new ProducerRecord<>(record.topic(), record.kafkaPartition(),
                     ConnectUtils.checkAndConvertTimestamp(record.timestamp()), key, value);
-            log.trace("Appending record with key {}, value {}", record.key(), 
record.value());
+            log.trace("{} Appending record with key {}, value {}", this, 
record.key(), record.value());
             // We need this queued first since the callback could happen immediately (even synchronously in some cases).
             // Because of this we need to be careful about handling retries -- we always save the previously attempted
             // record as part of toSend and need to use a flag to track whether we should actually add it to the outstanding
@@ -227,10 +227,11 @@ class WorkerSourceTask extends WorkerTask {
                                     // timeouts, callbacks with exceptions should never be invoked in practice. If the
                                     // user overrode these settings, the best we can do is notify them of the failure via
                                     // logging.
-                                    log.error("{} failed to send record to {}: {}", id, topic, e);
-                                    log.debug("Failed record: {}", preTransformRecord);
+                                    log.error("{} failed to send record to {}: {}", this, topic, e);
+                                    log.debug("{} Failed record: {}", this, preTransformRecord);
                                 } else {
-                                    log.trace("Wrote record successfully: 
topic {} partition {} offset {}",
+                                    log.trace("{} Wrote record successfully: 
topic {} partition {} offset {}",
+                                            this,
                                             recordMetadata.topic(), 
recordMetadata.partition(),
                                             recordMetadata.offset());
                                     commitTaskRecord(preTransformRecord);
@@ -240,7 +241,7 @@ class WorkerSourceTask extends WorkerTask {
                         });
                 lastSendFailed = false;
             } catch (RetriableException e) {
-                log.warn("Failed to send {}, backing off before retrying:", 
producerRecord, e);
+                log.warn("{} Failed to send {}, backing off before retrying:", 
this, producerRecord, e);
                 toSend = toSend.subList(processed, toSend.size());
                 lastSendFailed = true;
                 return false;
@@ -256,10 +257,8 @@ class WorkerSourceTask extends WorkerTask {
     private void commitTaskRecord(SourceRecord record) {
         try {
             task.commitRecord(record);
-        } catch (InterruptedException e) {
-            log.error("Exception thrown", e);
         } catch (Throwable t) {
-            log.error("Exception thrown while calling task.commitRecord()", t);
+            log.error("{} Exception thrown while calling task.commitRecord()", 
this, t);
         }
     }
 
@@ -270,8 +269,7 @@ class WorkerSourceTask extends WorkerTask {
             removed = outstandingMessagesBacklog.remove(record);
         // But if neither one had it, something is very wrong
         if (removed == null) {
-            log.error("CRITICAL Saw callback for record that was not present 
in the outstanding message set: "
-                    + "{}", record);
+            log.error("{} CRITICAL Saw callback for record that was not 
present in the outstanding message set: {}", this, record);
         } else if (flushing && outstandingMessages.isEmpty()) {
             // flush thread may be waiting on the outstanding messages to clear
             this.notifyAll();
@@ -303,7 +301,7 @@ class WorkerSourceTask extends WorkerTask {
                 try {
                     long timeoutMs = timeout - time.milliseconds();
                     if (timeoutMs <= 0) {
-                        log.error("Failed to flush {}, timed out while waiting 
for producer to flush outstanding {} messages", this, 
outstandingMessages.size());
+                        log.error("{} Failed to flush, timed out while waiting 
for producer to flush outstanding {} messages", this, 
outstandingMessages.size());
                         finishFailedFlush();
                         return false;
                     }
@@ -324,7 +322,7 @@ class WorkerSourceTask extends WorkerTask {
                 // flush time, which can be used for monitoring even if the connector doesn't record any
                 // offsets.
                 finishSuccessfulFlush();
-                log.debug("Finished {} offset commitOffsets successfully in {} 
ms",
+                log.debug("{} Finished offset commitOffsets successfully in {} 
ms",
                         this, time.milliseconds() - started);
 
                 commitSourceTask();
@@ -337,9 +335,9 @@ class WorkerSourceTask extends WorkerTask {
             @Override
             public void onCompletion(Throwable error, Void result) {
                 if (error != null) {
-                    log.error("Failed to flush {} offsets to storage: ", this, 
error);
+                    log.error("{} Failed to flush offsets to storage: ", this, 
error);
                 } else {
-                    log.trace("Finished flushing {} offsets to storage", this);
+                    log.trace("{} Finished flushing offsets to storage", this);
                 }
             }
         });
@@ -356,21 +354,21 @@ class WorkerSourceTask extends WorkerTask {
             // errors, is only wasteful in this minor edge case, and the worst 
result is that the log
             // could look a little confusing.
         } catch (InterruptedException e) {
-            log.warn("Flush of {} offsets interrupted, cancelling", this);
+            log.warn("{} Flush of offsets interrupted, cancelling", this);
             finishFailedFlush();
             return false;
         } catch (ExecutionException e) {
-            log.error("Flush of {} offsets threw an unexpected exception: ", 
this, e);
+            log.error("{} Flush of offsets threw an unexpected exception: ", 
this, e);
             finishFailedFlush();
             return false;
         } catch (TimeoutException e) {
-            log.error("Timed out waiting to flush {} offsets to storage", 
this);
+            log.error("{} Timed out waiting to flush offsets to storage", 
this);
             finishFailedFlush();
             return false;
         }
 
         finishSuccessfulFlush();
-        log.info("Finished {} commitOffsets successfully in {} ms",
+        log.info("{} Finished commitOffsets successfully in {} ms",
                 this, time.milliseconds() - started);
 
         commitSourceTask();
@@ -381,10 +379,8 @@ class WorkerSourceTask extends WorkerTask {
     private void commitSourceTask() {
         try {
             this.task.commit();
-        } catch (InterruptedException ex) {
-            log.warn("Commit interrupted", ex);
         } catch (Throwable t) {
-            log.error("Exception thrown while calling task.commit()", t);
+            log.error("{} Exception thrown while calling task.commit()", this, 
t);
         }
     }
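
The two InterruptedException catch blocks deleted above were redundant:
catch (Throwable t) already matches InterruptedException, so a single clause
now logs every failure with the task prefix. A hedged sketch of the equivalence
(method names hypothetical, not Connect code):

    class CatchCollapseSketch {
        void commitSourceTask() {
            try {
                commit();  // hypothetical stand-in for task.commit()
            } catch (Throwable t) {
                // InterruptedException is a Throwable, so this single clause
                // also covers the branch the patch removed.
                System.err.println("Exception thrown while calling task.commit(): " + t);
            }
        }

        void commit() throws InterruptedException {
            throw new InterruptedException("simulated interrupt");
        }
    }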
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/a593db6a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
index 9b233dd..3295434 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
@@ -124,7 +124,7 @@ abstract class WorkerTask implements Runnable {
         try {
             close();
         } catch (Throwable t) {
-            log.error("Task {} threw an uncaught and unrecoverable exception 
during shutdown", id, t);
+            log.error("{} Task threw an uncaught and unrecoverable exception 
during shutdown", this, t);
             throw t;
         }
     }
@@ -145,8 +145,8 @@ abstract class WorkerTask implements Runnable {
 
             execute();
         } catch (Throwable t) {
-            log.error("Task {} threw an uncaught and unrecoverable exception", 
id, t);
-            log.error("Task is being killed and will not recover until 
manually restarted");
+            log.error("{} Task threw an uncaught and unrecoverable exception", 
this, t);
+            log.error("{} Task is being killed and will not recover until 
manually restarted", this);
             throw t;
         } finally {
             doClose();

http://git-wip-us.apache.org/repos/asf/kafka/blob/a593db6a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
index eae3726..50b7503 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
@@ -461,6 +461,8 @@ public class WorkerSinkTaskTest {
 
         sinkTask.preCommit(workerCurrentOffsets);
         EasyMock.expectLastCall().andReturn(taskOffsets);
+        // Expect extra invalid topic partition to be filtered, which causes the consumer assignment to be logged
+        EasyMock.expect(consumer.assignment()).andReturn(workerCurrentOffsets.keySet());
         final Capture<OffsetCommitCallback> callback = EasyMock.newCapture();
         consumer.commitAsync(EasyMock.eq(committableOffsets), EasyMock.capture(callback));
         EasyMock.expectLastCall().andAnswer(new IAnswer<Void>() {
