Repository: kafka Updated Branches: refs/heads/trunk 1a653c813 -> a593db6a2
MINOR: Standardize logging of Worker-level messages from Tasks and Connectors This ensures all logs have the connector/task ID, whether tasks are source or sink, and formats them consistently. Author: Ewen Cheslack-Postava <m...@ewencp.org> Reviewers: Randall Hauch <rha...@gmail.com>, Konstantine Karantasis <konstant...@confluent.io>, Jason Gustafson <ja...@confluent.io> Closes #3639 from ewencp/standardize-connector-task-logging Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/a593db6a Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/a593db6a Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/a593db6a Branch: refs/heads/trunk Commit: a593db6a2b6c3511215008b6d3dd6bd77f84e8c4 Parents: 1a653c8 Author: Ewen Cheslack-Postava <m...@ewencp.org> Authored: Wed Aug 9 09:07:27 2017 -0700 Committer: Ewen Cheslack-Postava <m...@ewencp.org> Committed: Wed Aug 9 09:07:27 2017 -0700 ---------------------------------------------------------------------- .../runtime/SourceTaskOffsetCommitter.java | 6 +-- .../kafka/connect/runtime/WorkerConnector.java | 19 ++++--- .../kafka/connect/runtime/WorkerSinkTask.java | 54 ++++++++++---------- .../kafka/connect/runtime/WorkerSourceTask.java | 46 ++++++++--------- .../kafka/connect/runtime/WorkerTask.java | 6 +-- .../connect/runtime/WorkerSinkTaskTest.java | 2 + 6 files changed, 66 insertions(+), 67 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/a593db6a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitter.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitter.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitter.java index f93d7d2..c502809 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitter.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitter.java @@ -103,17 +103,17 @@ class SourceTaskOffsetCommitter { } private void commit(WorkerSourceTask workerTask) { - log.debug("Committing offsets for {}", workerTask); + log.debug("{} Committing offsets", workerTask); try { if (workerTask.commitOffsets()) { return; } - log.error("Failed to commit offsets for {}", workerTask); + log.error("{} Failed to commit offsets", workerTask); } catch (Throwable t) { // We're very careful about exceptions here since any uncaught exceptions in the commit // thread would cause the fixed interval schedule on the ExecutorService to stop running // for that task - log.error("Unhandled exception when committing {}: ", workerTask, t); + log.error("{} Unhandled exception when committing: ", workerTask, t); } } } http://git-wip-us.apache.org/repos/asf/kafka/blob/a593db6a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java index 1937429..983db92 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java @@ -68,7 +68,7 @@ public class WorkerConnector { public void initialize(ConnectorConfig connectorConfig) { try { this.config = connectorConfig.originalsStrings(); - log.debug("Initializing connector {} with config {}", connName, config); + log.debug("{} Initializing connector {} with config {}", this, connName, config); connector.initialize(new ConnectorContext() { @Override @@ -78,13 +78,13 @@ public class WorkerConnector { @Override public void raiseError(Exception e) { - log.error("Connector raised an error {}", connName, e); + log.error("{} Connector raised an error", this, e); onFailure(e); ctx.raiseError(e); } }); } catch (Throwable t) { - log.error("Error initializing connector {}", connName, t); + log.error("{} Error initializing connector", this, t); onFailure(t); } } @@ -105,7 +105,7 @@ public class WorkerConnector { throw new IllegalArgumentException("Cannot start connector in state " + state); } } catch (Throwable t) { - log.error("Error while starting connector {}", connName, t); + log.error("{} Error while starting connector", this, t); onFailure(t); return false; } @@ -149,7 +149,7 @@ public class WorkerConnector { throw new IllegalArgumentException("Cannot pause connector in state " + state); } } catch (Throwable t) { - log.error("Error while shutting down connector {}", connName, t); + log.error("{} Error while shutting down connector", this, t); statusListener.onFailure(connName, t); this.state = State.FAILED; } @@ -161,7 +161,7 @@ public class WorkerConnector { connector.stop(); this.state = State.STOPPED; } catch (Throwable t) { - log.error("Error while shutting down connector {}", connName, t); + log.error("{} Error while shutting down connector", this, t); this.state = State.FAILED; } finally { statusListener.onShutdown(connName); @@ -170,11 +170,11 @@ public class WorkerConnector { public void transitionTo(TargetState targetState) { if (state == State.FAILED) { - log.warn("Cannot transition connector {} to {} since it has failed", connName, targetState); + log.warn("{} Cannot transition connector to {} since it has failed", this, targetState); return; } - log.debug("Transition connector {} to {}", connName, targetState); + log.debug("{} Transition connector to {}", this, targetState); if (targetState == TargetState.PAUSED) { pause(); } else if (targetState == TargetState.STARTED) { @@ -198,8 +198,7 @@ public class WorkerConnector { @Override public String toString() { return "WorkerConnector{" + - "connName='" + connName + '\'' + - ", connector=" + connector + + "id=" + connName + '}'; } } http://git-wip-us.apache.org/repos/asf/kafka/blob/a593db6a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java index c4567a3..9295712 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java @@ -112,7 +112,7 @@ class WorkerSinkTask extends WorkerTask { this.consumer = createConsumer(); this.context = new WorkerSinkTaskContext(consumer); } catch (Throwable t) { - log.error("Task {} failed initialization and will not be started.", t); + log.error("{} Task failed initialization and will not be started.", this, t); onFailure(t); } } @@ -170,7 +170,7 @@ class WorkerSinkTask extends WorkerTask { // Check for timed out commits if (committing && now >= commitTimeoutMs) { - log.warn("Commit of {} offsets timed out", this); + log.warn("{} Commit of offsets timed out", this); commitFailures++; committing = false; } @@ -179,7 +179,7 @@ class WorkerSinkTask extends WorkerTask { long timeoutMs = Math.max(nextCommit - now, 0); poll(timeoutMs); } catch (WakeupException we) { - log.trace("{} consumer woken up", id); + log.trace("{} consumer woken up", this); if (isStopping()) return; @@ -197,15 +197,13 @@ class WorkerSinkTask extends WorkerTask { private void onCommitCompleted(Throwable error, long seqno) { if (commitSeqno != seqno) { - log.debug("Got callback for timed out commit {}: {}, but most recent commit is {}", - this, - seqno, commitSeqno); + log.debug("{} Got callback for timed out commit: {}, but most recent commit is {}", this, seqno, commitSeqno); } else { if (error != null) { - log.error("Commit of {} offsets threw an unexpected exception: ", this, error); + log.error("{} Commit of offsets threw an unexpected exception: ", this, error); commitFailures++; } else { - log.debug("Finished {} offset commit successfully in {} ms", + log.debug("{} Finished offset commit successfully in {} ms", this, time.milliseconds() - commitStarted); commitFailures = 0; } @@ -221,16 +219,16 @@ class WorkerSinkTask extends WorkerTask { * Initializes and starts the SinkTask. */ protected void initializeAndStart() { - log.debug("Initializing task {} ", id); + log.debug("{} Initializing task", this); String topicsStr = taskConfig.get(SinkTask.TOPICS_CONFIG); if (topicsStr == null || topicsStr.isEmpty()) throw new ConnectException("Sink tasks require a list of topics."); String[] topics = topicsStr.split(","); - log.debug("Task {} subscribing to topics {}", id, topics); + log.debug("{} Task subscribing to topics {}", this, topics); consumer.subscribe(Arrays.asList(topics), new HandleRebalance()); task.initialize(context); task.start(taskConfig); - log.info("Sink task {} finished initialization and start", this); + log.info("{} Sink task finished initialization and start", this); } /** Poll for new messages with the given timeout. Should only be invoked by the worker thread. */ @@ -242,10 +240,10 @@ class WorkerSinkTask extends WorkerTask { context.timeout(-1L); } - log.trace("{} polling consumer with timeout {} ms", id, timeoutMs); + log.trace("{} polling consumer with timeout {} ms", this, timeoutMs); ConsumerRecords<byte[], byte[]> msgs = pollConsumer(timeoutMs); assert messageBatch.isEmpty() || msgs.isEmpty(); - log.trace("{} polling returned {} messages", id, msgs.count()); + log.trace("{} polling returned {} messages", this, msgs.count()); convertMessages(msgs); deliverMessages(); @@ -305,12 +303,12 @@ class WorkerSinkTask extends WorkerTask { taskProvidedOffsets = task.preCommit(new HashMap<>(currentOffsets)); } catch (Throwable t) { if (closing) { - log.warn("{} Offset commit failed during close"); + log.warn("{} Offset commit failed during close", this); onCommitCompleted(t, commitSeqno); } else { log.error("{} Offset commit failed, rewinding to last committed offsets", this, t); for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : lastCommittedOffsets.entrySet()) { - log.debug("{} Rewinding topic partition {} to offset {}", id, entry.getKey(), entry.getValue().offset()); + log.debug("{} Rewinding topic partition {} to offset {}", this, entry.getKey(), entry.getValue().offset()); consumer.seek(entry.getKey(), entry.getValue().offset()); } currentOffsets = new HashMap<>(lastCommittedOffsets); @@ -334,13 +332,17 @@ class WorkerSinkTask extends WorkerTask { final TopicPartition partition = taskProvidedOffsetEntry.getKey(); final OffsetAndMetadata taskProvidedOffset = taskProvidedOffsetEntry.getValue(); if (commitableOffsets.containsKey(partition)) { - if (taskProvidedOffset.offset() <= currentOffsets.get(partition).offset()) { + long taskOffset = taskProvidedOffset.offset(); + long currentOffset = currentOffsets.get(partition).offset(); + if (taskOffset <= currentOffset) { commitableOffsets.put(partition, taskProvidedOffset); } else { - log.warn("Ignoring invalid task provided offset {}/{} -- not yet consumed", partition, taskProvidedOffset); + log.warn("{} Ignoring invalid task provided offset {}/{} -- not yet consumed, taskOffset={} currentOffset={}", + this, partition, taskProvidedOffset, taskOffset, currentOffset); } } else { - log.warn("Ignoring invalid task provided offset {}/{} -- partition not assigned", partition, taskProvidedOffset); + log.warn("{} Ignoring invalid task provided offset {}/{} -- partition not assigned, assignment={}", + this, partition, taskProvidedOffset, consumer.assignment()); } } @@ -402,7 +404,7 @@ class WorkerSinkTask extends WorkerTask { private void convertMessages(ConsumerRecords<byte[], byte[]> msgs) { for (ConsumerRecord<byte[], byte[]> msg : msgs) { - log.trace("Consuming message with key {}, value {}", msg.key(), msg.value()); + log.trace("{} Consuming message with key {}, value {}", this, msg.key(), msg.value()); SchemaAndValue keyAndSchema = keyConverter.toConnectData(msg.topic(), msg.key()); SchemaAndValue valueAndSchema = valueConverter.toConnectData(msg.topic(), msg.value()); SinkRecord record = new SinkRecord(msg.topic(), msg.partition(), @@ -445,16 +447,16 @@ class WorkerSinkTask extends WorkerTask { pausedForRedelivery = false; } } catch (RetriableException e) { - log.error("RetriableException from SinkTask {}:", id, e); + log.error("{} RetriableException from SinkTask:", this, e); // If we're retrying a previous batch, make sure we've paused all topic partitions so we don't get new data, // but will still be able to poll in order to handle user-requested timeouts, keep group membership, etc. pausedForRedelivery = true; pauseAll(); // Let this exit normally, the batch will be reprocessed on the next loop. } catch (Throwable t) { - log.error("Task {} threw an uncaught and unrecoverable exception", id, t); - log.error("Task is being killed and will not recover until manually restarted"); - throw new ConnectException("Exiting WorkerSinkTask due to unrecoverable exception."); + log.error("{} Task threw an uncaught and unrecoverable exception. Task is being killed and will not " + + "recover until manually restarted.", this, t); + throw new ConnectException("Exiting WorkerSinkTask due to unrecoverable exception.", t); } } @@ -467,12 +469,12 @@ class WorkerSinkTask extends WorkerTask { TopicPartition tp = entry.getKey(); Long offset = entry.getValue(); if (offset != null) { - log.trace("Rewind {} to offset {}.", tp, offset); + log.trace("{} Rewind {} to offset {}.", this, tp, offset); consumer.seek(tp, offset); lastCommittedOffsets.put(tp, new OffsetAndMetadata(offset)); currentOffsets.put(tp, new OffsetAndMetadata(offset)); } else { - log.warn("Cannot rewind {} to null offset.", tp); + log.warn("{} Cannot rewind {} to null offset.", this, tp); } } context.clearOffsets(); @@ -495,7 +497,7 @@ class WorkerSinkTask extends WorkerTask { long pos = consumer.position(tp); lastCommittedOffsets.put(tp, new OffsetAndMetadata(pos)); currentOffsets.put(tp, new OffsetAndMetadata(pos)); - log.debug("{} assigned topic partition {} with offset {}", id, tp, pos); + log.debug("{} assigned topic partition {} with offset {}", this, tp, pos); } // If we paused everything for redelivery (which is no longer relevant since we discarded the data), make http://git-wip-us.apache.org/repos/asf/kafka/blob/a593db6a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java index e676d50..0f42186 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java @@ -113,7 +113,7 @@ class WorkerSourceTask extends WorkerTask { try { this.taskConfig = taskConfig.originalsStrings(); } catch (Throwable t) { - log.error("Task {} failed initialization and will not be started.", t); + log.error("{} Task failed initialization and will not be started.", this, t); onFailure(t); } } @@ -140,7 +140,7 @@ class WorkerSourceTask extends WorkerTask { try { task.initialize(new WorkerSourceTaskContext(offsetReader)); task.start(taskConfig); - log.info("Source task {} finished initialization and start", this); + log.info("{} Source task finished initialization and start", this); synchronized (this) { if (startedShutdownBeforeStartCompleted) { task.stop(); @@ -159,12 +159,12 @@ class WorkerSourceTask extends WorkerTask { } if (toSend == null) { - log.debug("Nothing to send to Kafka. Polling source for additional records"); + log.debug("{} Nothing to send to Kafka. Polling source for additional records", this); toSend = task.poll(); } if (toSend == null) continue; - log.debug("About to send " + toSend.size() + " records to Kafka"); + log.debug("{} About to send " + toSend.size() + " records to Kafka", this); if (!sendRecords()) stopRequestedLatch.await(SEND_FAILED_BACKOFF_MS, TimeUnit.MILLISECONDS); } @@ -198,7 +198,7 @@ class WorkerSourceTask extends WorkerTask { byte[] value = valueConverter.fromConnectData(record.topic(), record.valueSchema(), record.value()); final ProducerRecord<byte[], byte[]> producerRecord = new ProducerRecord<>(record.topic(), record.kafkaPartition(), ConnectUtils.checkAndConvertTimestamp(record.timestamp()), key, value); - log.trace("Appending record with key {}, value {}", record.key(), record.value()); + log.trace("{} Appending record with key {}, value {}", this, record.key(), record.value()); // We need this queued first since the callback could happen immediately (even synchronously in some cases). // Because of this we need to be careful about handling retries -- we always save the previously attempted // record as part of toSend and need to use a flag to track whether we should actually add it to the outstanding @@ -227,10 +227,11 @@ class WorkerSourceTask extends WorkerTask { // timeouts, callbacks with exceptions should never be invoked in practice. If the // user overrode these settings, the best we can do is notify them of the failure via // logging. - log.error("{} failed to send record to {}: {}", id, topic, e); - log.debug("Failed record: {}", preTransformRecord); + log.error("{} failed to send record to {}: {}", this, topic, e); + log.debug("{} Failed record: {}", this, preTransformRecord); } else { - log.trace("Wrote record successfully: topic {} partition {} offset {}", + log.trace("{} Wrote record successfully: topic {} partition {} offset {}", + this, recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset()); commitTaskRecord(preTransformRecord); @@ -240,7 +241,7 @@ class WorkerSourceTask extends WorkerTask { }); lastSendFailed = false; } catch (RetriableException e) { - log.warn("Failed to send {}, backing off before retrying:", producerRecord, e); + log.warn("{} Failed to send {}, backing off before retrying:", this, producerRecord, e); toSend = toSend.subList(processed, toSend.size()); lastSendFailed = true; return false; @@ -256,10 +257,8 @@ class WorkerSourceTask extends WorkerTask { private void commitTaskRecord(SourceRecord record) { try { task.commitRecord(record); - } catch (InterruptedException e) { - log.error("Exception thrown", e); } catch (Throwable t) { - log.error("Exception thrown while calling task.commitRecord()", t); + log.error("{} Exception thrown while calling task.commitRecord()", this, t); } } @@ -270,8 +269,7 @@ class WorkerSourceTask extends WorkerTask { removed = outstandingMessagesBacklog.remove(record); // But if neither one had it, something is very wrong if (removed == null) { - log.error("CRITICAL Saw callback for record that was not present in the outstanding message set: " - + "{}", record); + log.error("{} CRITICAL Saw callback for record that was not present in the outstanding message set: {}", this, record); } else if (flushing && outstandingMessages.isEmpty()) { // flush thread may be waiting on the outstanding messages to clear this.notifyAll(); @@ -303,7 +301,7 @@ class WorkerSourceTask extends WorkerTask { try { long timeoutMs = timeout - time.milliseconds(); if (timeoutMs <= 0) { - log.error("Failed to flush {}, timed out while waiting for producer to flush outstanding {} messages", this, outstandingMessages.size()); + log.error("{} Failed to flush, timed out while waiting for producer to flush outstanding {} messages", this, outstandingMessages.size()); finishFailedFlush(); return false; } @@ -324,7 +322,7 @@ class WorkerSourceTask extends WorkerTask { // flush time, which can be used for monitoring even if the connector doesn't record any // offsets. finishSuccessfulFlush(); - log.debug("Finished {} offset commitOffsets successfully in {} ms", + log.debug("{} Finished offset commitOffsets successfully in {} ms", this, time.milliseconds() - started); commitSourceTask(); @@ -337,9 +335,9 @@ class WorkerSourceTask extends WorkerTask { @Override public void onCompletion(Throwable error, Void result) { if (error != null) { - log.error("Failed to flush {} offsets to storage: ", this, error); + log.error("{} Failed to flush offsets to storage: ", this, error); } else { - log.trace("Finished flushing {} offsets to storage", this); + log.trace("{} Finished flushing offsets to storage", this); } } }); @@ -356,21 +354,21 @@ class WorkerSourceTask extends WorkerTask { // errors, is only wasteful in this minor edge case, and the worst result is that the log // could look a little confusing. } catch (InterruptedException e) { - log.warn("Flush of {} offsets interrupted, cancelling", this); + log.warn("{} Flush of offsets interrupted, cancelling", this); finishFailedFlush(); return false; } catch (ExecutionException e) { - log.error("Flush of {} offsets threw an unexpected exception: ", this, e); + log.error("{} Flush of offsets threw an unexpected exception: ", this, e); finishFailedFlush(); return false; } catch (TimeoutException e) { - log.error("Timed out waiting to flush {} offsets to storage", this); + log.error("{} Timed out waiting to flush offsets to storage", this); finishFailedFlush(); return false; } finishSuccessfulFlush(); - log.info("Finished {} commitOffsets successfully in {} ms", + log.info("{} Finished commitOffsets successfully in {} ms", this, time.milliseconds() - started); commitSourceTask(); @@ -381,10 +379,8 @@ class WorkerSourceTask extends WorkerTask { private void commitSourceTask() { try { this.task.commit(); - } catch (InterruptedException ex) { - log.warn("Commit interrupted", ex); } catch (Throwable t) { - log.error("Exception thrown while calling task.commit()", t); + log.error("{} Exception thrown while calling task.commit()", this, t); } } http://git-wip-us.apache.org/repos/asf/kafka/blob/a593db6a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java index 9b233dd..3295434 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java @@ -124,7 +124,7 @@ abstract class WorkerTask implements Runnable { try { close(); } catch (Throwable t) { - log.error("Task {} threw an uncaught and unrecoverable exception during shutdown", id, t); + log.error("{} Task threw an uncaught and unrecoverable exception during shutdown", this, t); throw t; } } @@ -145,8 +145,8 @@ abstract class WorkerTask implements Runnable { execute(); } catch (Throwable t) { - log.error("Task {} threw an uncaught and unrecoverable exception", id, t); - log.error("Task is being killed and will not recover until manually restarted"); + log.error("{} Task threw an uncaught and unrecoverable exception", this, t); + log.error("{} Task is being killed and will not recover until manually restarted", this); throw t; } finally { doClose(); http://git-wip-us.apache.org/repos/asf/kafka/blob/a593db6a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java index eae3726..50b7503 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java @@ -461,6 +461,8 @@ public class WorkerSinkTaskTest { sinkTask.preCommit(workerCurrentOffsets); EasyMock.expectLastCall().andReturn(taskOffsets); + // Expect extra invalid topic partition to be filtered, which causes the consumer assignment to be logged + EasyMock.expect(consumer.assignment()).andReturn(workerCurrentOffsets.keySet()); final Capture<OffsetCommitCallback> callback = EasyMock.newCapture(); consumer.commitAsync(EasyMock.eq(committableOffsets), EasyMock.capture(callback)); EasyMock.expectLastCall().andAnswer(new IAnswer<Void>() {