C0urante commented on a change in pull request #8844: URL: https://github.com/apache/kafka/pull/8844#discussion_r443756424
########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java ##########
@@ -56,7 +56,7 @@ private static final String THREAD_NAME_PREFIX = "task-thread-";
     protected final ConnectorTaskId id;
-    private final TaskStatus.Listener statusListener;
+    protected final TaskStatus.Listener statusListener;

Review comment:
Yeah, we definitely don't want to run connector or task code on the same thread as the `Worker` (or really, the `Herder` that's calling the `Worker`). I don't think it's that bad if startup and the recording of metrics happen asynchronously from the call to `Worker::startConnector` or `Worker::startTask`. The only potential downside I can think of is that someone might believe they've started a connector or task while the worker's startup metrics haven't been incremented yet, because the connector or task is taking a while (or is completely hung) during startup.

I was only suggesting a small refactoring, not anything that would affect the actual behavior of the framework; it's just something to make the code a little cleaner.
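To make the timing concern above concrete, here is a minimal, self-contained sketch. All names in it (`AsyncStartupSketch`, `SketchTask`, `StartupMetrics`, `startTask`) are hypothetical stand-ins, not real Kafka Connect classes: a single-thread executor plays the role of the worker's task thread, a `CountDownLatch` simulates a task whose startup is slow, and an `AtomicInteger` stands in for the worker's startup metric. It shows that `startTask` returns while the metric is still 0, and the metric only moves once initialization finishes on the task thread.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, simplified model of the timing described above. None of these
// names are real Kafka Connect classes or methods.
public class AsyncStartupSketch {

    // Stand-in for a worker-level "task startup successes" metric.
    static final class StartupMetrics {
        final AtomicInteger startupSuccesses = new AtomicInteger();
    }

    // Stand-in for a task whose run() initializes, records startup, then executes.
    static final class SketchTask implements Runnable {
        private final StartupMetrics metrics;
        private final CountDownLatch initCanFinish;

        SketchTask(StartupMetrics metrics, CountDownLatch initCanFinish) {
            this.metrics = metrics;
            this.initCanFinish = initCanFinish;
        }

        @Override
        public void run() {
            try {
                // Simulates slow initialization (think task.initialize() / task.start()).
                initCanFinish.await();
            } catch (InterruptedException e) {
                // A task interrupted (or hung forever) here never reaches the
                // startup-notification step, so the startup metric is never recorded.
                Thread.currentThread().interrupt();
                return;
            }
            // Plays the role of statusListener.onStartup(), which records the metric.
            metrics.startupSuccesses.incrementAndGet();
            // execute() would run here in a real task.
        }
    }

    // Stand-in for Worker::startTask: hand the task to another thread and return immediately.
    static void startTask(ExecutorService executor, Runnable task) {
        executor.submit(task);
    }

    // Returns {metric right after startTask returns, metric after initialization finishes}.
    public static int[] demo() {
        StartupMetrics metrics = new StartupMetrics();
        CountDownLatch initCanFinish = new CountDownLatch(1);
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            startTask(executor, new SketchTask(metrics, initCanFinish));
            // startTask has returned, but the task is still blocked in "initialization".
            int beforeInit = metrics.startupSuccesses.get();

            // Let initialization finish, then wait for the task thread to complete.
            initCanFinish.countDown();
            executor.shutdown();
            if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("task did not finish in time");
            }
            int afterInit = metrics.startupSuccesses.get();
            return new int[] {beforeInit, afterInit};
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for the task", e);
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        int[] counts = demo();
        System.out.println("right after startTask: " + counts[0]);
        System.out.println("after initialization: " + counts[1]);
    }
}
```

Running `main` prints `right after startTask: 0` followed by `after initialization: 1`. If the latch were never released, modeling a hung task, the counter would stay at 0 indefinitely, which is the window described above; the caller of `startTask` can't tell the difference without waiting on the task thread.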
In case it helps, I was thinking `WorkerTask` might look like this:

```java
abstract class WorkerTask {
    private final TaskStatus.Listener statusListener;

    protected abstract void initializeAndStart();

    protected abstract void execute();

    private void doRun() throws InterruptedException {
        try {
            synchronized (this) {
                if (stopping)
                    return;

                if (targetState == TargetState.PAUSED) {
                    onPause();
                    if (!awaitUnpause()) return;
                }
            }

            // These three lines replace the single call to execute() that's in the WorkerTask class right now
            initializeAndStart();
            statusListener.onStartup();
            execute();
        } catch (Throwable t) {
            log.error("{} Task threw an uncaught and unrecoverable exception", this, t);
            log.error("{} Task is being killed and will not recover until manually restarted", this);
            throw t;
        } finally {
            doClose();
        }
    }
}
```

`WorkerSinkTask` might look like this:

```java
class WorkerSinkTask extends WorkerTask {
    // This is already a method in the WorkerSinkTask class, but now it overrides an abstract method in the WorkerTask superclass
    @Override
    protected void initializeAndStart() {
        SinkConnectorConfig.validate(taskConfig);

        if (SinkConnectorConfig.hasTopicsConfig(taskConfig)) {
            List<String> topics = SinkConnectorConfig.parseTopicsList(taskConfig);
            consumer.subscribe(topics, new HandleRebalance());
            log.debug("{} Initializing and starting task for topics {}", this, Utils.join(topics, ", "));
        } else {
            String topicsRegexStr = taskConfig.get(SinkTask.TOPICS_REGEX_CONFIG);
            Pattern pattern = Pattern.compile(topicsRegexStr);
            consumer.subscribe(pattern, new HandleRebalance());
            log.debug("{} Initializing and starting task for topics regex {}", this, topicsRegexStr);
        }

        task.initialize(context);
        task.start(taskConfig);
        log.info("{} Sink task finished initialization and start", this);
    }

    // Remove the call to initializeAndStart() and statusListener.onStartup() here; they'll be called automatically by the superclass
    @Override
    public void execute() {
        // Make sure any uncommitted data has been committed and the task has
        // a chance to clean up its state
        try (UncheckedCloseable suppressible = this::closePartitions) {
            while (!isStopping())
                iteration();
        }
    }
}
```

And `WorkerSourceTask` might look like this:

```java
class WorkerSourceTask extends WorkerTask {
    // Technically a new method, but all code has just been cut+pasted from the existing execute() method
    @Override
    protected void initializeAndStart() {
        task.initialize(new WorkerSourceTaskContext(offsetReader, this, configState));
        task.start(taskConfig);
        log.info("{} Source task finished initialization and start", this);
    }

    // Same as the existing execute() method, except for the code removed for initializeAndStart() and the call to statusListener.onStartup
    @Override
    public void execute() {
        synchronized (this) {
            if (startedShutdownBeforeStartCompleted) {
                tryStop();
                return;
            }
            finishedStart = true;
        }

        try {
            while (!isStopping()) {
                if (shouldPause()) {
                    onPause();
                    if (awaitUnpause()) {
                        onResume();
                    }
                    continue;
                }

                maybeThrowProducerSendException();

                if (toSend == null) {
                    log.trace("{} Nothing to send to Kafka. Polling source for additional records", this);
                    long start = time.milliseconds();
                    toSend = poll();
                    if (toSend != null) {
                        recordPollReturned(toSend.size(), time.milliseconds() - start);
                    }
                }
                if (toSend == null)
                    continue;
                log.trace("{} About to send {} records to Kafka", this, toSend.size());
                if (!sendRecords())
                    stopRequestedLatch.await(SEND_FAILED_BACKOFF_MS, TimeUnit.MILLISECONDS);
            }
        } catch (InterruptedException e) {
            // Ignore and allow to exit.
        } finally {
            // It should still be safe to commit offsets since any exception would have
            // simply resulted in not getting more records but all the existing records should be ok to flush
            // and commit offsets. Worst case, task.flush() will also throw an exception causing the offset commit
            // to fail.
            commitOffsets();
        }
    }
}
```

########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java ##########
@@ -562,7 +556,6 @@ public boolean startTask(
             // statusListener
             Plugins.compareAndSwapLoaders(savedLoader);
             connectorStatusMetricsGroup.recordTaskRemoved(id);
-            workerMetricsGroup.recordTaskFailure();

Review comment:
Sounds good!

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at: us...@infra.apache.org