C0urante commented on a change in pull request #10910:
URL: https://github.com/apache/kafka/pull/10910#discussion_r760307980



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
##########
@@ -113,6 +113,7 @@
 
     private final ConcurrentMap<String, WorkerConnector> connectors = new ConcurrentHashMap<>();
     private final ConcurrentMap<ConnectorTaskId, WorkerTask> tasks = new ConcurrentHashMap<>();
+    private final ConcurrentMap<ConnectorTaskId, ErrorHandlingMetrics> errorHandlingMetricsMap = new ConcurrentHashMap<>();

Review comment:
   Sorry, I didn't mean that `WorkerTask` should instantiate the `ErrorHandlingMetrics` object itself, but rather that it should accept it as a constructor parameter so that it can be closed during `removeMetrics`. Something like:
   
   ```java
   abstract class WorkerTask implements Runnable {
   
       private final ErrorHandlingMetrics errorMetrics; // NEW
   
       public WorkerTask(ConnectorTaskId id,
                         TaskStatus.Listener statusListener,
                         TargetState initialState,
                         ClassLoader loader,
                         ConnectMetrics connectMetrics,
                         ErrorHandlingMetrics errorMetrics, // NEW
                         RetryWithToleranceOperator retryWithToleranceOperator,
                         Time time,
                         StatusBackingStore statusBackingStore) {
           this.id = id;
           this.taskMetricsGroup = new TaskMetricsGroup(this.id, connectMetrics, statusListener);
           this.errorMetrics = errorMetrics; // NEW
           this.statusListener = taskMetricsGroup;
           this.loader = loader;
           this.targetState = initialState;
           this.stopping = false;
           this.cancelled = false;
           this.taskMetricsGroup.recordState(this.targetState);
           this.retryWithToleranceOperator = retryWithToleranceOperator;
           this.time = time;
           this.statusBackingStore = statusBackingStore;
       }
   
       public void removeMetrics() {
           // Close quietly here so that we can be sure to close everything even if one attempt fails
           Utils.closeQuietly(taskMetricsGroup::close, "Task metrics group");
           Utils.closeQuietly(errorMetrics, "Error handling metrics"); // NEW
       }
   }
   ```

   How does that look?
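
The idea is that once the task holds the metrics object, the `Worker` no longer needs a side map to find it at cleanup time. Below is a minimal, self-contained sketch of that ownership pattern. It uses hypothetical stand-ins (`SketchTask`, `SketchErrorMetrics`, and a local `closeQuietly`) instead of the real Connect classes, and it assumes the error metrics object is `AutoCloseable`:

```java
public class CloseOwnedMetricsSketch {

    // Hypothetical stand-in for ErrorHandlingMetrics; assumed to be AutoCloseable.
    static class SketchErrorMetrics implements AutoCloseable {
        boolean closed = false;

        @Override
        public void close() {
            closed = true;
        }
    }

    // Local stand-in for a "close quietly" helper: log the failure and carry on,
    // so that one failed close does not prevent the next one from running.
    static void closeQuietly(AutoCloseable closeable, String name) {
        if (closeable == null) {
            return;
        }
        try {
            closeable.close();
        } catch (Throwable t) {
            System.err.println("Failed to close " + name + ": " + t);
        }
    }

    // Hypothetical stand-in for WorkerTask: it receives the metrics object
    // through its constructor and is responsible for closing it.
    static class SketchTask {
        private final AutoCloseable taskMetrics;
        private final SketchErrorMetrics errorMetrics;

        SketchTask(AutoCloseable taskMetrics, SketchErrorMetrics errorMetrics) {
            this.taskMetrics = taskMetrics;
            this.errorMetrics = errorMetrics;
        }

        void removeMetrics() {
            closeQuietly(taskMetrics, "Task metrics group");
            closeQuietly(errorMetrics, "Error handling metrics");
        }
    }

    public static void main(String[] args) {
        SketchErrorMetrics errorMetrics = new SketchErrorMetrics();
        // The first resource deliberately fails on close to show that the
        // error metrics are still closed afterwards.
        AutoCloseable failingTaskMetrics = () -> {
            throw new IllegalStateException("simulated close failure");
        };
        SketchTask task = new SketchTask(failingTaskMetrics, errorMetrics);

        // No external map lookup needed: the task cleans up what it was given.
        task.removeMetrics();
        System.out.println("error metrics closed: " + errorMetrics.closed);
    }
}
```

Running `main` reports the simulated failure on stderr and then prints `error metrics closed: true`, which is the behavior the "close quietly" comment in `removeMetrics` is after.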





