C0urante commented on a change in pull request #8844:
URL: https://github.com/apache/kafka/pull/8844#discussion_r443756424



##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
##########
@@ -56,7 +56,7 @@
     private static final String THREAD_NAME_PREFIX = "task-thread-";
 
     protected final ConnectorTaskId id;
-    private final TaskStatus.Listener statusListener;
+    protected final TaskStatus.Listener statusListener;

Review comment:
       Yeah, we definitely don't want to run connector or task code on the same 
thread as the `Worker` (or really, the `Herder` that's calling the `Worker`). I 
don't think it's that bad if startup and the recording of metrics happen 
asynchronously from the call to `Worker::startConnector` or 
`Worker::startTask`; the only potential downside I can think of is that someone 
might believe they've started a connector/task while the worker's startup 
metrics have not yet been incremented, because the connector/task is taking a 
while (or is completely hung) during startup.
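
To make that window concrete, here is a minimal standalone sketch. The names (`AsyncStartupSketch`, `startedTasks`, `startupMayProceed`) are made up for illustration and are not Connect's actual metrics classes; the latch simply simulates a task whose startup is slow.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

class AsyncStartupSketch {
    // Returns {count seen right after the "start" call returned, count after startup finished}.
    static int[] runDemo() {
        AtomicInteger startedTasks = new AtomicInteger();          // hypothetical startup metric
        CountDownLatch startupMayProceed = new CountDownLatch(1);  // simulates a slow task start

        Thread taskThread = new Thread(() -> {
            try {
                startupMayProceed.await();       // startup is "taking a while"
                startedTasks.incrementAndGet();  // metric recorded only once startup completes
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        taskThread.start(); // analogous to a start call returning before startup completes

        int seenByCaller = startedTasks.get(); // the caller thinks the task is started; metric is still 0
        startupMayProceed.countDown();         // let the simulated startup finish
        try {
            taskThread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new int[] {seenByCaller, startedTasks.get()};
    }

    public static void main(String[] args) {
        int[] counts = runDemo();
        System.out.println(counts[0] + " then " + counts[1]); // prints "0 then 1"
    }
}
```

If the simulated startup never finished, the caller would keep seeing 0, which is the hung case mentioned above.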
   
   I was just suggesting a small refactoring though, not anything that would 
affect the actual behavior of the framework. Just something to make the code a 
little cleaner.
   
   In case it helps, I was thinking `WorkerTask` might look like this:
   
   ```java
   abstract class WorkerTask {
       private final TaskStatus.Listener statusListener;
   
       protected abstract void initializeAndStart();
   
       protected abstract void execute();
   
       private void doRun() throws InterruptedException {
           try {
               synchronized (this) {
                   if (stopping)
                       return;
   
                   if (targetState == TargetState.PAUSED) {
                       onPause();
                       if (!awaitUnpause()) return;
                   }
               }
   
               // These three lines replace the single call to execute() that's
               // in the WorkerTask class right now
               initializeAndStart();
               statusListener.onStartup();
               execute();
           } catch (Throwable t) {
               log.error("{} Task threw an uncaught and unrecoverable exception", this, t);
               log.error("{} Task is being killed and will not recover until manually restarted", this);
               throw t;
           } finally {
               doClose();
           }
       }
   }
   
   ```
   
   `WorkerSinkTask` might look like this:
   
   ```java
   class WorkerSinkTask extends WorkerTask {
   
       // This is already a method in the WorkerSinkTask class, but now it
       // overrides an abstract method in the WorkerTask superclass
       @Override
       protected void initializeAndStart() {
           SinkConnectorConfig.validate(taskConfig);
   
           if (SinkConnectorConfig.hasTopicsConfig(taskConfig)) {
               List<String> topics = SinkConnectorConfig.parseTopicsList(taskConfig);
               consumer.subscribe(topics, new HandleRebalance());
               log.debug("{} Initializing and starting task for topics {}", this, Utils.join(topics, ", "));
           } else {
               String topicsRegexStr = taskConfig.get(SinkTask.TOPICS_REGEX_CONFIG);
               Pattern pattern = Pattern.compile(topicsRegexStr);
               consumer.subscribe(pattern, new HandleRebalance());
               log.debug("{} Initializing and starting task for topics regex {}", this, topicsRegexStr);
           }
   
           task.initialize(context);
           task.start(taskConfig);
           log.info("{} Sink task finished initialization and start", this);
       }
   
       // Remove the call to initializeAndStart() and statusListener.onStartup()
       // here; they'll be called automatically by the superclass
       @Override
       public void execute() {
           // Make sure any uncommitted data has been committed and the task has
           // a chance to clean up its state
           try (UncheckedCloseable suppressible = this::closePartitions) {
               while (!isStopping())
                   iteration();
           }
       }
   }
   ```
   
   And `WorkerSourceTask` might look like this:
   
   ```java
   class WorkerSourceTask extends WorkerTask {
   
       // Technically a new method, but all code has just been cut+pasted from
       // the existing execute() method
       @Override
       protected void initializeAndStart() {
           task.initialize(new WorkerSourceTaskContext(offsetReader, this, configState));
           task.start(taskConfig);
           log.info("{} Source task finished initialization and start", this);
       }
    
       // Same as the existing execute() method, except for the code removed
       // for initializeAndStart() and the call to statusListener.onStartup
       @Override
       public void execute() {
           synchronized (this) {
               if (startedShutdownBeforeStartCompleted) {
                   tryStop();
                   return;
               }
               finishedStart = true;
           }
   
           try {
               while (!isStopping()) {
                   if (shouldPause()) {
                       onPause();
                       if (awaitUnpause()) {
                           onResume();
                       }
                       continue;
                   }
   
                   maybeThrowProducerSendException();
   
                   if (toSend == null) {
                       log.trace("{} Nothing to send to Kafka. Polling source for additional records", this);
                       long start = time.milliseconds();
                       toSend = poll();
                       if (toSend != null) {
                           recordPollReturned(toSend.size(), time.milliseconds() - start);
                       }
                   }
                   if (toSend == null)
                       continue;
                   log.trace("{} About to send {} records to Kafka", this, toSend.size());
                   if (!sendRecords())
                       stopRequestedLatch.await(SEND_FAILED_BACKOFF_MS, TimeUnit.MILLISECONDS);
               }
           } catch (InterruptedException e) {
               // Ignore and allow to exit.
           } finally {
               // It should still be safe to commit offsets since any exception would have
               // simply resulted in not getting more records but all the existing records
               // should be ok to flush and commit offsets. Worst case, task.flush() will
               // also throw an exception causing the offset commit to fail.
               commitOffsets();
           }
       }
   }
   ```
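
If it helps to see the pattern in isolation, here is a minimal standalone sketch of the same idea. Everything in it (`StartupListener`, `SketchTask`, `TemplateMethodSketch`, the `close()` hook) is a simplified stand-in made up for illustration, not the actual Connect classes. It only shows that the superclass fixes the order of `initializeAndStart()`, `onStartup()`, and `execute()`, and that `onStartup()` is skipped when startup throws.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for TaskStatus.Listener; only the one callback used here.
interface StartupListener {
    void onStartup();
}

// Simplified stand-in for WorkerTask: the superclass owns the startup sequence.
abstract class SketchTask {
    private final StartupListener statusListener;

    SketchTask(StartupListener statusListener) {
        this.statusListener = statusListener;
    }

    protected abstract void initializeAndStart();

    protected abstract void execute();

    protected abstract void close();

    // Template method: subclasses cannot reorder or forget the listener call.
    final void doRun() {
        try {
            initializeAndStart();
            statusListener.onStartup();
            execute();
        } finally {
            close();
        }
    }
}

class TemplateMethodSketch {
    // Runs one task and returns the order in which the hooks fired.
    static List<String> runDemo(boolean failOnStart) {
        List<String> events = new ArrayList<>();
        SketchTask task = new SketchTask(() -> events.add("onStartup")) {
            @Override
            protected void initializeAndStart() {
                events.add("initializeAndStart");
                if (failOnStart)
                    throw new IllegalStateException("simulated startup failure");
            }

            @Override
            protected void execute() {
                events.add("execute");
            }

            @Override
            protected void close() {
                events.add("close");
            }
        };
        try {
            task.doRun();
        } catch (IllegalStateException e) {
            events.add("failed");
        }
        return events;
    }

    public static void main(String[] args) {
        System.out.println(runDemo(false)); // [initializeAndStart, onStartup, execute, close]
        System.out.println(runDemo(true));  // [initializeAndStart, close, failed]
    }
}
```

The point of keeping the listener `private` in the superclass is visible here: the anonymous subclass never touches it, so there is no need to widen its visibility to `protected`.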

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
##########
@@ -562,7 +556,6 @@ public boolean startTask(
                 // statusListener
                 Plugins.compareAndSwapLoaders(savedLoader);
                 connectorStatusMetricsGroup.recordTaskRemoved(id);
-                workerMetricsGroup.recordTaskFailure();

Review comment:
       Sounds good!





