C0urante commented on a change in pull request #10910: URL: https://github.com/apache/kafka/pull/10910#discussion_r760307980
########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java ########## @@ -113,6 +113,7 @@ private final ConcurrentMap<String, WorkerConnector> connectors = new ConcurrentHashMap<>(); private final ConcurrentMap<ConnectorTaskId, WorkerTask> tasks = new ConcurrentHashMap<>(); + private final ConcurrentMap<ConnectorTaskId, ErrorHandlingMetrics> errorHandlingMetricsMap = new ConcurrentHashMap<>(); Review comment: Sorry, I didn't mean instantiating the `ErrorHandlingMetrics` object from within the `WorkerTask` class, but rather, accepting it as a constructor parameter so that it can be closed during `removeMetrics`. Something like: ```java abstract class WorkerTask implements Runnable { private final ErrorHandlingMetrics errorMetrics; // NEW public WorkerTask(ConnectorTaskId id, TaskStatus.Listener statusListener, TargetState initialState, ClassLoader loader, ConnectMetrics connectMetrics, ErrorHandlingMetrics errorMetrics, // NEW RetryWithToleranceOperator retryWithToleranceOperator, Time time, StatusBackingStore statusBackingStore) { this.id = id; this.taskMetricsGroup = new TaskMetricsGroup(this.id, connectMetrics, statusListener); this.errorMetrics = errorMetrics; // NEW this.statusListener = taskMetricsGroup; this.loader = loader; this.targetState = initialState; this.stopping = false; this.cancelled = false; this.taskMetricsGroup.recordState(this.targetState); this.retryWithToleranceOperator = retryWithToleranceOperator; this.time = time; this.statusBackingStore = statusBackingStore; } public void removeMetrics() { // Close quietly here so that we can be sure to close everything even if one attempt fails Utils.closeQuietly(taskMetricsGroup::close, "Task metrics group"); Utils.closeQuietly(errorMetrics, "Error handling metrics"); // NEW } } ``` ########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java ########## @@ -113,6 +113,7 @@ private final ConcurrentMap<String, WorkerConnector> connectors = new ConcurrentHashMap<>(); private final ConcurrentMap<ConnectorTaskId, WorkerTask> tasks = new ConcurrentHashMap<>(); + private final ConcurrentMap<ConnectorTaskId, ErrorHandlingMetrics> errorHandlingMetricsMap = new ConcurrentHashMap<>(); Review comment: Sorry, I didn't mean instantiating the `ErrorHandlingMetrics` object from within the `WorkerTask` class, but rather, accepting it as a constructor parameter so that it can be closed during `removeMetrics`. Something like: ```java abstract class WorkerTask implements Runnable { private final ErrorHandlingMetrics errorMetrics; // NEW public WorkerTask(ConnectorTaskId id, TaskStatus.Listener statusListener, TargetState initialState, ClassLoader loader, ConnectMetrics connectMetrics, ErrorHandlingMetrics errorMetrics, // NEW RetryWithToleranceOperator retryWithToleranceOperator, Time time, StatusBackingStore statusBackingStore) { this.id = id; this.taskMetricsGroup = new TaskMetricsGroup(this.id, connectMetrics, statusListener); this.errorMetrics = errorMetrics; // NEW this.statusListener = taskMetricsGroup; this.loader = loader; this.targetState = initialState; this.stopping = false; this.cancelled = false; this.taskMetricsGroup.recordState(this.targetState); this.retryWithToleranceOperator = retryWithToleranceOperator; this.time = time; this.statusBackingStore = statusBackingStore; } public void removeMetrics() { // Close quietly here so that we can be sure to close everything even if one attempt fails Utils.closeQuietly(taskMetricsGroup::close, "Task metrics group"); Utils.closeQuietly(errorMetrics, "Error handling metrics"); // NEW } } ``` How does that look? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org