mimaison commented on code in PR #11780:
URL: https://github.com/apache/kafka/pull/11780#discussion_r874678140


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java:
##########
@@ -271,8 +278,10 @@ void doShutdown() {
             state = State.FAILED;
             statusListener.onFailure(connName, t);
         } finally {
-            ctx.close();
-            metrics.close();
+            Utils.closeQuietly(ctx, "Connector context for " + connName);
+            Utils.closeQuietly(metrics, "Connector metrics for " + connName);
+            Utils.closeQuietly(offsetStorageReader, "Offset reader for " + 
connName);
+            Utils.closeQuietly(offsetStore::stop, "offset backing store for " 
+ connName);

Review Comment:
   Nit: All other messages are capitalized; this one should start with "Offset backing store" for consistency.



##########
connect/runtime/src/test/java/org/apache/kafka/connect/storage/OffsetStorageWriterTest.java:
##########
@@ -229,4 +229,4 @@ private void expectStore(Map<String, Object> key, byte[] 
keySerialized,
         });
     }
 
-}
+}

Review Comment:
   Can we undo this? The hunk only changes whitespace at the end of the file.



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java:
##########
@@ -576,88 +672,42 @@ public boolean startTask(
 
             executor.submit(workerTask);
             if (workerTask instanceof WorkerSourceTask) {
-                sourceTaskOffsetCommitter.schedule(id, (WorkerSourceTask) 
workerTask);
+                sourceTaskOffsetCommitter.ifPresent(committer -> 
committer.schedule(id, (WorkerSourceTask) workerTask));
             }
             return true;
         }
     }
 
-    private WorkerTask buildWorkerTask(ClusterConfigState configState,
-                                       ConnectorConfig connConfig,
-                                       ConnectorTaskId id,
-                                       Task task,
-                                       TaskStatus.Listener statusListener,
-                                       TargetState initialState,
-                                       Converter keyConverter,
-                                       Converter valueConverter,
-                                       HeaderConverter headerConverter,
-                                       ClassLoader loader) {
-        ErrorHandlingMetrics errorHandlingMetrics = errorHandlingMetrics(id);
-        final Class<? extends Connector> connectorClass = 
plugins.connectorClass(
-            connConfig.getString(ConnectorConfig.CONNECTOR_CLASS_CONFIG));
-        RetryWithToleranceOperator retryWithToleranceOperator = new 
RetryWithToleranceOperator(connConfig.errorRetryTimeout(),
-                connConfig.errorMaxDelayInMillis(), 
connConfig.errorToleranceType(), Time.SYSTEM);
-        retryWithToleranceOperator.metrics(errorHandlingMetrics);
-
-        // Decide which type of worker task we need based on the type of task.
-        if (task instanceof SourceTask) {
-            SourceConnectorConfig sourceConfig = new 
SourceConnectorConfig(plugins,
-                    connConfig.originalsStrings(), 
config.topicCreationEnable());
-            retryWithToleranceOperator.reporters(sourceTaskReporters(id, 
sourceConfig, errorHandlingMetrics));
-            TransformationChain<SourceRecord> transformationChain = new 
TransformationChain<>(sourceConfig.<SourceRecord>transformations(), 
retryWithToleranceOperator);
-            log.info("Initializing: {}", transformationChain);
-            CloseableOffsetStorageReader offsetReader = new 
OffsetStorageReaderImpl(offsetBackingStore, id.connector(),
-                    internalKeyConverter, internalValueConverter);
-            OffsetStorageWriter offsetWriter = new 
OffsetStorageWriter(offsetBackingStore, id.connector(),
-                    internalKeyConverter, internalValueConverter);
-            Map<String, Object> producerProps = producerConfigs(id, 
"connector-producer-" + id, config, sourceConfig, connectorClass,
-                                                                
connectorClientConfigOverridePolicy, kafkaClusterId);
-            KafkaProducer<byte[], byte[]> producer = new 
KafkaProducer<>(producerProps);
-            TopicAdmin admin;
-            Map<String, TopicCreationGroup> topicCreationGroups;
-            if (config.topicCreationEnable() && 
sourceConfig.usesTopicCreation()) {
-                Map<String, Object> adminProps = adminConfigs(id, 
"connector-adminclient-" + id, config,
-                        sourceConfig, connectorClass, 
connectorClientConfigOverridePolicy, kafkaClusterId);
-                admin = new TopicAdmin(adminProps);
-                topicCreationGroups = 
TopicCreationGroup.configuredGroups(sourceConfig);
-            } else {
-                admin = null;
-                topicCreationGroups = null;
-            }
-
-            // Note we pass the configState as it performs dynamic 
transformations under the covers
-            return new WorkerSourceTask(id, (SourceTask) task, statusListener, 
initialState, keyConverter, valueConverter,
-                    headerConverter, transformationChain, producer, admin, 
topicCreationGroups,
-                    offsetReader, offsetWriter, config, configState, metrics, 
loader, time, retryWithToleranceOperator, herder.statusBackingStore(), 
executor);
-        } else if (task instanceof SinkTask) {
-            TransformationChain<SinkRecord> transformationChain = new 
TransformationChain<>(connConfig.<SinkRecord>transformations(), 
retryWithToleranceOperator);
-            log.info("Initializing: {}", transformationChain);
-            SinkConnectorConfig sinkConfig = new SinkConnectorConfig(plugins, 
connConfig.originalsStrings());
-            retryWithToleranceOperator.reporters(sinkTaskReporters(id, 
sinkConfig, errorHandlingMetrics, connectorClass));
-            WorkerErrantRecordReporter workerErrantRecordReporter = 
createWorkerErrantRecordReporter(sinkConfig, retryWithToleranceOperator,
-                    keyConverter, valueConverter, headerConverter);
-
-            Map<String, Object> consumerProps = consumerConfigs(id, config, 
connConfig, connectorClass, connectorClientConfigOverridePolicy, 
kafkaClusterId);
-            KafkaConsumer<byte[], byte[]> consumer = new 
KafkaConsumer<>(consumerProps);
-
-            return new WorkerSinkTask(id, (SinkTask) task, statusListener, 
initialState, config, configState, metrics, keyConverter,
-                                      valueConverter, headerConverter, 
transformationChain, consumer, loader, time,
-                                      retryWithToleranceOperator, 
workerErrantRecordReporter, herder.statusBackingStore());
-        } else {
-            log.error("Tasks must be a subclass of either SourceTask or 
SinkTask and current is {}", task);
-            throw new ConnectException("Tasks must be a subclass of either 
SourceTask or SinkTask");
-        }
+    static Map<String, Object> 
exactlyOnceSourceTaskProducerConfigs(ConnectorTaskId id,
+                                                              WorkerConfig 
config,
+                                                              ConnectorConfig 
connConfig,
+                                                              Class<? extends 
Connector>  connectorClass,
+                                                              
ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy,
+                                                              String 
clusterId) {
+        Map<String, Object> result = baseProducerConfigs(id.connector(), 
"connector-producer-" + id, config, connConfig, connectorClass, 
connectorClientConfigOverridePolicy, clusterId);
+        ConnectUtils.ensureProperty(
+                result, ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true",
+                "for connectors when exactly-once source support is enabled",
+                false
+        );
+        String transactionalId = transactionalId(config.groupId(), 
id.connector(), id.task());

Review Comment:
   Should we use `DistributedConfig.transactionalProducerId()` here?



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java:
##########
@@ -1000,6 +1090,266 @@ WorkerMetricsGroup workerMetricsGroup() {
         return workerMetricsGroup;
     }
 
+    abstract class TaskBuilder {
+
+        private final ConnectorTaskId id;
+        private final ClusterConfigState configState;
+        private final TaskStatus.Listener statusListener;
+        private final TargetState initialState;
+
+        private Task task = null;
+        private ConnectorConfig connectorConfig = null;
+        private Converter keyConverter = null;
+        private Converter valueConverter = null;
+        private HeaderConverter headerConverter = null;
+        private ClassLoader classLoader = null;
+
+        public TaskBuilder(ConnectorTaskId id,
+                           ClusterConfigState configState,
+                           TaskStatus.Listener statusListener,
+                           TargetState initialState) {
+            this.id = id;
+            this.configState = configState;
+            this.statusListener = statusListener;
+            this.initialState = initialState;
+        }
+
+        public TaskBuilder withTask(Task task) {
+            this.task = task;
+            return this;
+        }
+
+        public TaskBuilder withConnectorConfig(ConnectorConfig 
connectorConfig) {
+            this.connectorConfig = connectorConfig;
+            return this;
+        }
+
+        public TaskBuilder withKeyConverter(Converter keyConverter) {
+            this.keyConverter = keyConverter;
+            return this;
+        }
+
+        public TaskBuilder withValueConverter(Converter valueConverter) {
+            this.valueConverter = valueConverter;
+            return this;
+        }
+
+        public TaskBuilder withHeaderConverter(HeaderConverter 
headerConverter) {
+            this.headerConverter = headerConverter;
+            return this;
+        }
+
+        public TaskBuilder withClassloader(ClassLoader classLoader) {
+            this.classLoader = classLoader;
+            return this;
+        }
+
+        public WorkerTask build() {
+            Objects.requireNonNull(task, "Task cannot be null");
+            Objects.requireNonNull(connectorConfig, "Connector config used by 
task cannot be null");
+            Objects.requireNonNull(keyConverter, "Key converter used by task 
cannot be null");
+            Objects.requireNonNull(valueConverter, "Value converter used by 
task cannot be null");
+            Objects.requireNonNull(headerConverter, "Header converter used by 
task cannot be null");
+            Objects.requireNonNull(classLoader, "Classloader used by task 
cannot be null");
+
+            ErrorHandlingMetrics errorHandlingMetrics = 
errorHandlingMetrics(id);
+            final Class<? extends Connector> connectorClass = 
plugins.connectorClass(
+                    
connectorConfig.getString(ConnectorConfig.CONNECTOR_CLASS_CONFIG));
+            RetryWithToleranceOperator retryWithToleranceOperator = new 
RetryWithToleranceOperator(connectorConfig.errorRetryTimeout(),
+                    connectorConfig.errorMaxDelayInMillis(), 
connectorConfig.errorToleranceType(), Time.SYSTEM);
+            retryWithToleranceOperator.metrics(errorHandlingMetrics);
+
+            return doBuild(task, id, configState, statusListener, initialState,
+                    connectorConfig, keyConverter, valueConverter, 
headerConverter, classLoader,
+                    errorHandlingMetrics, connectorClass, 
retryWithToleranceOperator);
+        }
+
+        abstract WorkerTask doBuild(Task task,
+                                    ConnectorTaskId id,
+                                    ClusterConfigState configState,
+                                    TaskStatus.Listener statusListener,
+                                    TargetState initialState,
+                                    ConnectorConfig connectorConfig,
+                                    Converter keyConverter,
+                                    Converter valueConverter,
+                                    HeaderConverter headerConverter,
+                                    ClassLoader classLoader,
+                                    ErrorHandlingMetrics errorHandlingMetrics,
+                                    Class<? extends Connector> connectorClass,
+                                    RetryWithToleranceOperator 
retryWithToleranceOperator);
+
+    }
+
+    class SinkTaskBuilder extends TaskBuilder {
+        public SinkTaskBuilder(ConnectorTaskId id,
+                               ClusterConfigState configState,
+                               TaskStatus.Listener statusListener,
+                               TargetState initialState) {
+            super(id, configState, statusListener, initialState);
+        }
+
+        @Override
+        public WorkerTask doBuild(Task task,
+                           ConnectorTaskId id,
+                           ClusterConfigState configState,
+                           TaskStatus.Listener statusListener,
+                           TargetState initialState,
+                           ConnectorConfig connectorConfig,
+                           Converter keyConverter,
+                           Converter valueConverter,
+                           HeaderConverter headerConverter,
+                           ClassLoader classLoader,
+                           ErrorHandlingMetrics errorHandlingMetrics,
+                           Class<? extends Connector> connectorClass,
+                           RetryWithToleranceOperator 
retryWithToleranceOperator) {
+
+            TransformationChain<SinkRecord> transformationChain = new 
TransformationChain<>(connectorConfig.<SinkRecord>transformations(), 
retryWithToleranceOperator);
+            log.info("Initializing: {}", transformationChain);
+            SinkConnectorConfig sinkConfig = new SinkConnectorConfig(plugins, 
connectorConfig.originalsStrings());
+            retryWithToleranceOperator.reporters(sinkTaskReporters(id, 
sinkConfig, errorHandlingMetrics, connectorClass));
+            WorkerErrantRecordReporter workerErrantRecordReporter = 
createWorkerErrantRecordReporter(sinkConfig, retryWithToleranceOperator,
+                    keyConverter, valueConverter, headerConverter);
+
+            Map<String, Object> consumerProps = baseConsumerConfigs(
+                    id.connector(),  "connector-consumer-" + id, config, 
connectorConfig, connectorClass,
+                    connectorClientConfigOverridePolicy, kafkaClusterId, 
ConnectorType.SINK);
+            KafkaConsumer<byte[], byte[]> consumer = new 
KafkaConsumer<>(consumerProps);
+
+            return new WorkerSinkTask(id, (SinkTask) task, statusListener, 
initialState, config, configState, metrics, keyConverter,
+                    valueConverter, headerConverter, transformationChain, 
consumer, classLoader, time,
+                    retryWithToleranceOperator, workerErrantRecordReporter, 
herder.statusBackingStore());
+        }
+    }
+
+    class SourceTaskBuilder extends TaskBuilder {
+        public SourceTaskBuilder(ConnectorTaskId id,
+                               ClusterConfigState configState,
+                               TaskStatus.Listener statusListener,
+                               TargetState initialState) {
+            super(id, configState, statusListener, initialState);
+        }
+
+        @Override
+        public WorkerTask doBuild(Task task,
+                           ConnectorTaskId id,
+                           ClusterConfigState configState,
+                           TaskStatus.Listener statusListener,
+                           TargetState initialState,
+                           ConnectorConfig connectorConfig,
+                           Converter keyConverter,
+                           Converter valueConverter,
+                           HeaderConverter headerConverter,
+                           ClassLoader classLoader,
+                           ErrorHandlingMetrics errorHandlingMetrics,
+                           Class<? extends Connector> connectorClass,
+                           RetryWithToleranceOperator 
retryWithToleranceOperator) {
+
+            SourceConnectorConfig sourceConfig = new 
SourceConnectorConfig(plugins,
+                    connectorConfig.originalsStrings(), 
config.topicCreationEnable());
+            retryWithToleranceOperator.reporters(sourceTaskReporters(id, 
sourceConfig, errorHandlingMetrics));
+            TransformationChain<SourceRecord> transformationChain = new 
TransformationChain<>(sourceConfig.<SourceRecord>transformations(), 
retryWithToleranceOperator);
+            log.info("Initializing: {}", transformationChain);
+
+            Map<String, Object> producerProps = 
baseProducerConfigs(id.connector(), "connector-producer-" + id, config, 
sourceConfig, connectorClass,
+                    connectorClientConfigOverridePolicy, kafkaClusterId);
+            KafkaProducer<byte[], byte[]> producer = new 
KafkaProducer<>(producerProps);
+
+            TopicAdmin topicAdmin;
+            Map<String, TopicCreationGroup> topicCreationGroups;
+            if (config.topicCreationEnable() && 
sourceConfig.usesTopicCreation()) {
+                topicCreationGroups = 
TopicCreationGroup.configuredGroups(sourceConfig);
+                // Create a topic admin that the task can use for topic 
creation
+                Map<String, Object> adminOverrides = 
adminConfigs(id.connector(), "connector-adminclient-" + id, config,
+                        sourceConfig, connectorClass, 
connectorClientConfigOverridePolicy, kafkaClusterId, ConnectorType.SOURCE);
+                topicAdmin = new TopicAdmin(adminOverrides);
+            } else {
+                topicAdmin = null;
+                topicCreationGroups = null;
+            }
+
+            // Set up the offset backing store for this task instance
+            ConnectorOffsetBackingStore offsetStore = new 
ConnectorOffsetBackingStore(globalOffsetBackingStore, "TODO");

Review Comment:
   Are you planning to address this `TODO` in one of the other PRs? The same
   question applies below in `ExactlyOnceSourceTaskBuilder`.



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java:
##########
@@ -597,101 +323,4 @@ public String toString() {
                 '}';
     }
 
-    protected void recordPollReturned(int numRecordsInBatch, long duration) {
-        sourceTaskMetricsGroup.recordPoll(numRecordsInBatch, duration);
-    }
-
-    SourceTaskMetricsGroup sourceTaskMetricsGroup() {
-        return sourceTaskMetricsGroup;
-    }
-
-    static class SourceRecordWriteCounter {
-        private final SourceTaskMetricsGroup metricsGroup;
-        private final int batchSize;
-        private boolean completed = false;
-        private int counter;
-        public SourceRecordWriteCounter(int batchSize, SourceTaskMetricsGroup 
metricsGroup) {
-            assert batchSize > 0;
-            assert metricsGroup != null;
-            this.batchSize = batchSize;
-            counter = batchSize;
-            this.metricsGroup = metricsGroup;
-        }
-        public void skipRecord() {
-            if (counter > 0 && --counter == 0) {
-                finishedAllWrites();
-            }
-        }
-        public void completeRecord() {
-            if (counter > 0 && --counter == 0) {
-                finishedAllWrites();
-            }
-        }
-        public void retryRemaining() {
-            finishedAllWrites();
-        }
-        private void finishedAllWrites() {
-            if (!completed) {
-                metricsGroup.recordWrite(batchSize - counter);
-                completed = true;
-            }
-        }
-    }
-
-    static class SourceTaskMetricsGroup {
-        private final MetricGroup metricGroup;
-        private final Sensor sourceRecordPoll;
-        private final Sensor sourceRecordWrite;
-        private final Sensor sourceRecordActiveCount;
-        private final Sensor pollTime;
-        private int activeRecordCount;
-
-        public SourceTaskMetricsGroup(ConnectorTaskId id, ConnectMetrics 
connectMetrics) {
-            ConnectMetricsRegistry registry = connectMetrics.registry();
-            metricGroup = connectMetrics.group(registry.sourceTaskGroupName(),
-                    registry.connectorTagName(), id.connector(),
-                    registry.taskTagName(), Integer.toString(id.task()));
-            // remove any previously created metrics in this group to prevent 
collisions.
-            metricGroup.close();
-
-            sourceRecordPoll = metricGroup.sensor("source-record-poll");
-            
sourceRecordPoll.add(metricGroup.metricName(registry.sourceRecordPollRate), new 
Rate());
-            
sourceRecordPoll.add(metricGroup.metricName(registry.sourceRecordPollTotal), 
new CumulativeSum());
-
-            sourceRecordWrite = metricGroup.sensor("source-record-write");
-            
sourceRecordWrite.add(metricGroup.metricName(registry.sourceRecordWriteRate), 
new Rate());
-            
sourceRecordWrite.add(metricGroup.metricName(registry.sourceRecordWriteTotal), 
new CumulativeSum());
-
-            pollTime = metricGroup.sensor("poll-batch-time");
-            
pollTime.add(metricGroup.metricName(registry.sourceRecordPollBatchTimeMax), new 
Max());
-            
pollTime.add(metricGroup.metricName(registry.sourceRecordPollBatchTimeAvg), new 
Avg());
-
-            sourceRecordActiveCount = 
metricGroup.sensor("source-record-active-count");
-            
sourceRecordActiveCount.add(metricGroup.metricName(registry.sourceRecordActiveCount),
 new Value());
-            
sourceRecordActiveCount.add(metricGroup.metricName(registry.sourceRecordActiveCountMax),
 new Max());
-            
sourceRecordActiveCount.add(metricGroup.metricName(registry.sourceRecordActiveCountAvg),
 new Avg());
-        }
-
-        void close() {
-            metricGroup.close();
-        }
-
-        void recordPoll(int batchSize, long duration) {
-            sourceRecordPoll.record(batchSize);
-            pollTime.record(duration);
-            activeRecordCount += batchSize;
-            sourceRecordActiveCount.record(activeRecordCount);
-        }
-
-        void recordWrite(int recordCount) {
-            sourceRecordWrite.record(recordCount);
-            activeRecordCount -= recordCount;
-            activeRecordCount = Math.max(0, activeRecordCount);
-            sourceRecordActiveCount.record(activeRecordCount);
-        }
-
-        protected MetricGroup metricGroup() {
-            return metricGroup;
-        }
-    }
-}
+}

Review Comment:
   Nit: Let's keep the trailing newline at the end of the file.



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java:
##########
@@ -1000,6 +1090,266 @@ WorkerMetricsGroup workerMetricsGroup() {
         return workerMetricsGroup;
     }
 
+    abstract class TaskBuilder {
+
+        private final ConnectorTaskId id;
+        private final ClusterConfigState configState;
+        private final TaskStatus.Listener statusListener;
+        private final TargetState initialState;
+
+        private Task task = null;
+        private ConnectorConfig connectorConfig = null;
+        private Converter keyConverter = null;
+        private Converter valueConverter = null;
+        private HeaderConverter headerConverter = null;
+        private ClassLoader classLoader = null;
+
+        public TaskBuilder(ConnectorTaskId id,
+                           ClusterConfigState configState,
+                           TaskStatus.Listener statusListener,
+                           TargetState initialState) {
+            this.id = id;
+            this.configState = configState;
+            this.statusListener = statusListener;
+            this.initialState = initialState;
+        }
+
+        public TaskBuilder withTask(Task task) {
+            this.task = task;
+            return this;
+        }
+
+        public TaskBuilder withConnectorConfig(ConnectorConfig 
connectorConfig) {
+            this.connectorConfig = connectorConfig;
+            return this;
+        }
+
+        public TaskBuilder withKeyConverter(Converter keyConverter) {
+            this.keyConverter = keyConverter;
+            return this;
+        }
+
+        public TaskBuilder withValueConverter(Converter valueConverter) {
+            this.valueConverter = valueConverter;
+            return this;
+        }
+
+        public TaskBuilder withHeaderConverter(HeaderConverter 
headerConverter) {
+            this.headerConverter = headerConverter;
+            return this;
+        }
+
+        public TaskBuilder withClassloader(ClassLoader classLoader) {
+            this.classLoader = classLoader;
+            return this;
+        }
+
+        public WorkerTask build() {
+            Objects.requireNonNull(task, "Task cannot be null");
+            Objects.requireNonNull(connectorConfig, "Connector config used by 
task cannot be null");
+            Objects.requireNonNull(keyConverter, "Key converter used by task 
cannot be null");
+            Objects.requireNonNull(valueConverter, "Value converter used by 
task cannot be null");
+            Objects.requireNonNull(headerConverter, "Header converter used by 
task cannot be null");
+            Objects.requireNonNull(classLoader, "Classloader used by task 
cannot be null");
+
+            ErrorHandlingMetrics errorHandlingMetrics = 
errorHandlingMetrics(id);
+            final Class<? extends Connector> connectorClass = 
plugins.connectorClass(
+                    
connectorConfig.getString(ConnectorConfig.CONNECTOR_CLASS_CONFIG));
+            RetryWithToleranceOperator retryWithToleranceOperator = new 
RetryWithToleranceOperator(connectorConfig.errorRetryTimeout(),
+                    connectorConfig.errorMaxDelayInMillis(), 
connectorConfig.errorToleranceType(), Time.SYSTEM);
+            retryWithToleranceOperator.metrics(errorHandlingMetrics);
+
+            return doBuild(task, id, configState, statusListener, initialState,
+                    connectorConfig, keyConverter, valueConverter, 
headerConverter, classLoader,
+                    errorHandlingMetrics, connectorClass, 
retryWithToleranceOperator);
+        }
+
+        abstract WorkerTask doBuild(Task task,
+                                    ConnectorTaskId id,
+                                    ClusterConfigState configState,
+                                    TaskStatus.Listener statusListener,
+                                    TargetState initialState,
+                                    ConnectorConfig connectorConfig,
+                                    Converter keyConverter,
+                                    Converter valueConverter,
+                                    HeaderConverter headerConverter,
+                                    ClassLoader classLoader,
+                                    ErrorHandlingMetrics errorHandlingMetrics,
+                                    Class<? extends Connector> connectorClass,
+                                    RetryWithToleranceOperator 
retryWithToleranceOperator);
+
+    }
+
+    class SinkTaskBuilder extends TaskBuilder {
+        public SinkTaskBuilder(ConnectorTaskId id,
+                               ClusterConfigState configState,
+                               TaskStatus.Listener statusListener,
+                               TargetState initialState) {
+            super(id, configState, statusListener, initialState);
+        }
+
+        @Override
+        public WorkerTask doBuild(Task task,
+                           ConnectorTaskId id,
+                           ClusterConfigState configState,
+                           TaskStatus.Listener statusListener,
+                           TargetState initialState,
+                           ConnectorConfig connectorConfig,
+                           Converter keyConverter,
+                           Converter valueConverter,
+                           HeaderConverter headerConverter,
+                           ClassLoader classLoader,
+                           ErrorHandlingMetrics errorHandlingMetrics,
+                           Class<? extends Connector> connectorClass,
+                           RetryWithToleranceOperator 
retryWithToleranceOperator) {
+
+            TransformationChain<SinkRecord> transformationChain = new 
TransformationChain<>(connectorConfig.<SinkRecord>transformations(), 
retryWithToleranceOperator);
+            log.info("Initializing: {}", transformationChain);
+            SinkConnectorConfig sinkConfig = new SinkConnectorConfig(plugins, 
connectorConfig.originalsStrings());
+            retryWithToleranceOperator.reporters(sinkTaskReporters(id, 
sinkConfig, errorHandlingMetrics, connectorClass));
+            WorkerErrantRecordReporter workerErrantRecordReporter = 
createWorkerErrantRecordReporter(sinkConfig, retryWithToleranceOperator,
+                    keyConverter, valueConverter, headerConverter);
+
+            Map<String, Object> consumerProps = baseConsumerConfigs(
+                    id.connector(),  "connector-consumer-" + id, config, 
connectorConfig, connectorClass,
+                    connectorClientConfigOverridePolicy, kafkaClusterId, 
ConnectorType.SINK);
+            KafkaConsumer<byte[], byte[]> consumer = new 
KafkaConsumer<>(consumerProps);
+
+            return new WorkerSinkTask(id, (SinkTask) task, statusListener, 
initialState, config, configState, metrics, keyConverter,
+                    valueConverter, headerConverter, transformationChain, 
consumer, classLoader, time,
+                    retryWithToleranceOperator, workerErrantRecordReporter, 
herder.statusBackingStore());
+        }
+    }
+
+    class SourceTaskBuilder extends TaskBuilder {
+        public SourceTaskBuilder(ConnectorTaskId id,
+                               ClusterConfigState configState,
+                               TaskStatus.Listener statusListener,
+                               TargetState initialState) {
+            super(id, configState, statusListener, initialState);
+        }
+
+        @Override
+        public WorkerTask doBuild(Task task,
+                           ConnectorTaskId id,
+                           ClusterConfigState configState,
+                           TaskStatus.Listener statusListener,
+                           TargetState initialState,
+                           ConnectorConfig connectorConfig,
+                           Converter keyConverter,
+                           Converter valueConverter,
+                           HeaderConverter headerConverter,
+                           ClassLoader classLoader,
+                           ErrorHandlingMetrics errorHandlingMetrics,
+                           Class<? extends Connector> connectorClass,
+                           RetryWithToleranceOperator 
retryWithToleranceOperator) {
+
+            SourceConnectorConfig sourceConfig = new 
SourceConnectorConfig(plugins,
+                    connectorConfig.originalsStrings(), 
config.topicCreationEnable());
+            retryWithToleranceOperator.reporters(sourceTaskReporters(id, 
sourceConfig, errorHandlingMetrics));
+            TransformationChain<SourceRecord> transformationChain = new 
TransformationChain<>(sourceConfig.<SourceRecord>transformations(), 
retryWithToleranceOperator);
+            log.info("Initializing: {}", transformationChain);
+
+            Map<String, Object> producerProps = 
baseProducerConfigs(id.connector(), "connector-producer-" + id, config, 
sourceConfig, connectorClass,
+                    connectorClientConfigOverridePolicy, kafkaClusterId);
+            KafkaProducer<byte[], byte[]> producer = new 
KafkaProducer<>(producerProps);
+
+            TopicAdmin topicAdmin;
+            Map<String, TopicCreationGroup> topicCreationGroups;
+            if (config.topicCreationEnable() && 
sourceConfig.usesTopicCreation()) {
+                topicCreationGroups = 
TopicCreationGroup.configuredGroups(sourceConfig);
+                // Create a topic admin that the task can use for topic 
creation
+                Map<String, Object> adminOverrides = 
adminConfigs(id.connector(), "connector-adminclient-" + id, config,
+                        sourceConfig, connectorClass, 
connectorClientConfigOverridePolicy, kafkaClusterId, ConnectorType.SOURCE);
+                topicAdmin = new TopicAdmin(adminOverrides);
+            } else {
+                topicAdmin = null;
+                topicCreationGroups = null;
+            }
+
+            // Set up the offset backing store for this task instance
+            ConnectorOffsetBackingStore offsetStore = new 
ConnectorOffsetBackingStore(globalOffsetBackingStore, "TODO");
+            offsetStore.configure(config);
+
+            CloseableOffsetStorageReader offsetReader = new 
OffsetStorageReaderImpl(offsetStore, id.connector(), internalKeyConverter, 
internalValueConverter);
+            OffsetStorageWriter offsetWriter = new 
OffsetStorageWriter(offsetStore, id.connector(), internalKeyConverter, 
internalValueConverter);
+
+            // Note we pass the configState as it performs dynamic 
transformations under the covers
+            return new WorkerSourceTask(id, (SourceTask) task, statusListener, 
initialState, keyConverter, valueConverter,
+                    headerConverter, transformationChain, producer, 
topicAdmin, topicCreationGroups,
+                    offsetReader, offsetWriter, offsetStore, config, 
configState, metrics, classLoader, time,
+                    retryWithToleranceOperator, herder.statusBackingStore(), 
executor);
+        }
+    }
+
+    class ExactlyOnceSourceTaskBuilder extends TaskBuilder {
+        private final Runnable preProducerCheck;
+        private final Runnable postProducerCheck;
+
+        public ExactlyOnceSourceTaskBuilder(ConnectorTaskId id,
+                                            ClusterConfigState configState,
+                                            TaskStatus.Listener statusListener,
+                                            TargetState initialState,
+                                            Runnable preProducerCheck,
+                                            Runnable postProducerCheck) {
+            super(id, configState, statusListener, initialState);
+            this.preProducerCheck = preProducerCheck;
+            this.postProducerCheck = postProducerCheck;
+        }
+
+        @Override
+        public WorkerTask doBuild(Task task,
+                                  ConnectorTaskId id,
+                                  ClusterConfigState configState,
+                                  TaskStatus.Listener statusListener,
+                                  TargetState initialState,
+                                  ConnectorConfig connectorConfig,
+                                  Converter keyConverter,
+                                  Converter valueConverter,
+                                  HeaderConverter headerConverter,
+                                  ClassLoader classLoader,
+                                  ErrorHandlingMetrics errorHandlingMetrics,
+                                  Class<? extends Connector> connectorClass,
+                                  RetryWithToleranceOperator 
retryWithToleranceOperator) {
+
+            SourceConnectorConfig sourceConfig = new 
SourceConnectorConfig(plugins,
+                    connectorConfig.originalsStrings(), 
config.topicCreationEnable());
+            retryWithToleranceOperator.reporters(sourceTaskReporters(id, 
sourceConfig, errorHandlingMetrics));
+            TransformationChain<SourceRecord> transformationChain = new 
TransformationChain<>(sourceConfig.<SourceRecord>transformations(), 
retryWithToleranceOperator);
+            log.info("Initializing: {}", transformationChain);
+
+            // Create a topic admin that the task will use for its offsets 
topic and, potentially, automatic topic creation
+            Map<String, Object> adminOverrides = adminConfigs(id.connector(), 
"connector-adminclient-" + id, config,
+                    sourceConfig, connectorClass, 
connectorClientConfigOverridePolicy, kafkaClusterId, ConnectorType.SOURCE);
+            TopicAdmin topicAdmin = new TopicAdmin(adminOverrides);
+
+            Map<String, TopicCreationGroup> topicCreationGroups;
+            if (config.topicCreationEnable() && 
sourceConfig.usesTopicCreation()) {
+                topicCreationGroups = 
TopicCreationGroup.configuredGroups(sourceConfig);
+            } else {
+                topicCreationGroups = null;
+            }
+
+            Map<String, Object> producerProps = 
exactlyOnceSourceTaskProducerConfigs(

Review Comment:
   Nit: can we move this block above the admin block, so that it follows the 
same flow as `SourceTaskBuilder.doBuild()`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to