C0urante commented on code in PR #11780:
URL: https://github.com/apache/kafka/pull/11780#discussion_r885745316


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java:
##########
@@ -1000,6 +1090,266 @@ WorkerMetricsGroup workerMetricsGroup() {
         return workerMetricsGroup;
     }
 
+    abstract class TaskBuilder {
+
+        private final ConnectorTaskId id;
+        private final ClusterConfigState configState;
+        private final TaskStatus.Listener statusListener;
+        private final TargetState initialState;
+
+        private Task task = null;
+        private ConnectorConfig connectorConfig = null;
+        private Converter keyConverter = null;
+        private Converter valueConverter = null;
+        private HeaderConverter headerConverter = null;
+        private ClassLoader classLoader = null;
+
+        public TaskBuilder(ConnectorTaskId id,
+                           ClusterConfigState configState,
+                           TaskStatus.Listener statusListener,
+                           TargetState initialState) {
+            this.id = id;
+            this.configState = configState;
+            this.statusListener = statusListener;
+            this.initialState = initialState;
+        }
+
+        public TaskBuilder withTask(Task task) {
+            this.task = task;
+            return this;
+        }
+
+        public TaskBuilder withConnectorConfig(ConnectorConfig connectorConfig) {
+            this.connectorConfig = connectorConfig;
+            return this;
+        }
+
+        public TaskBuilder withKeyConverter(Converter keyConverter) {
+            this.keyConverter = keyConverter;
+            return this;
+        }
+
+        public TaskBuilder withValueConverter(Converter valueConverter) {
+            this.valueConverter = valueConverter;
+            return this;
+        }
+
+        public TaskBuilder withHeaderConverter(HeaderConverter headerConverter) {
+            this.headerConverter = headerConverter;
+            return this;
+        }
+
+        public TaskBuilder withClassloader(ClassLoader classLoader) {
+            this.classLoader = classLoader;
+            return this;
+        }
+
+        public WorkerTask build() {
+            Objects.requireNonNull(task, "Task cannot be null");
+            Objects.requireNonNull(connectorConfig, "Connector config used by task cannot be null");
+            Objects.requireNonNull(keyConverter, "Key converter used by task cannot be null");
+            Objects.requireNonNull(valueConverter, "Value converter used by task cannot be null");
+            Objects.requireNonNull(headerConverter, "Header converter used by task cannot be null");
+            Objects.requireNonNull(classLoader, "Classloader used by task cannot be null");
+
+            ErrorHandlingMetrics errorHandlingMetrics = errorHandlingMetrics(id);
+            final Class<? extends Connector> connectorClass = plugins.connectorClass(
+                    connectorConfig.getString(ConnectorConfig.CONNECTOR_CLASS_CONFIG));
+            RetryWithToleranceOperator retryWithToleranceOperator = new RetryWithToleranceOperator(connectorConfig.errorRetryTimeout(),
+                    connectorConfig.errorMaxDelayInMillis(), connectorConfig.errorToleranceType(), Time.SYSTEM);
+            retryWithToleranceOperator.metrics(errorHandlingMetrics);
+
+            return doBuild(task, id, configState, statusListener, initialState,
+                    connectorConfig, keyConverter, valueConverter, headerConverter, classLoader,
+                    errorHandlingMetrics, connectorClass, retryWithToleranceOperator);
+        }
+
+        abstract WorkerTask doBuild(Task task,
+                                    ConnectorTaskId id,
+                                    ClusterConfigState configState,
+                                    TaskStatus.Listener statusListener,
+                                    TargetState initialState,
+                                    ConnectorConfig connectorConfig,
+                                    Converter keyConverter,
+                                    Converter valueConverter,
+                                    HeaderConverter headerConverter,
+                                    ClassLoader classLoader,
+                                    ErrorHandlingMetrics errorHandlingMetrics,
+                                    Class<? extends Connector> connectorClass,
+                                    RetryWithToleranceOperator retryWithToleranceOperator);
+
+    }
+
+    class SinkTaskBuilder extends TaskBuilder {
+        public SinkTaskBuilder(ConnectorTaskId id,
+                               ClusterConfigState configState,
+                               TaskStatus.Listener statusListener,
+                               TargetState initialState) {
+            super(id, configState, statusListener, initialState);
+        }
+
+        @Override
+        public WorkerTask doBuild(Task task,
+                           ConnectorTaskId id,
+                           ClusterConfigState configState,
+                           TaskStatus.Listener statusListener,
+                           TargetState initialState,
+                           ConnectorConfig connectorConfig,
+                           Converter keyConverter,
+                           Converter valueConverter,
+                           HeaderConverter headerConverter,
+                           ClassLoader classLoader,
+                           ErrorHandlingMetrics errorHandlingMetrics,
+                           Class<? extends Connector> connectorClass,
+                           RetryWithToleranceOperator retryWithToleranceOperator) {
+
+            TransformationChain<SinkRecord> transformationChain = new TransformationChain<>(connectorConfig.<SinkRecord>transformations(), retryWithToleranceOperator);
+            log.info("Initializing: {}", transformationChain);
+            SinkConnectorConfig sinkConfig = new SinkConnectorConfig(plugins, connectorConfig.originalsStrings());
+            retryWithToleranceOperator.reporters(sinkTaskReporters(id, sinkConfig, errorHandlingMetrics, connectorClass));
+            WorkerErrantRecordReporter workerErrantRecordReporter = createWorkerErrantRecordReporter(sinkConfig, retryWithToleranceOperator,
+                    keyConverter, valueConverter, headerConverter);
+
+            Map<String, Object> consumerProps = baseConsumerConfigs(
+                    id.connector(),  "connector-consumer-" + id, config, connectorConfig, connectorClass,
+                    connectorClientConfigOverridePolicy, kafkaClusterId, ConnectorType.SINK);
+            KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(consumerProps);
+
+            return new WorkerSinkTask(id, (SinkTask) task, statusListener, initialState, config, configState, metrics, keyConverter,
+                    valueConverter, headerConverter, transformationChain, consumer, classLoader, time,
+                    retryWithToleranceOperator, workerErrantRecordReporter, herder.statusBackingStore());
+        }
+    }
+
+    class SourceTaskBuilder extends TaskBuilder {
+        public SourceTaskBuilder(ConnectorTaskId id,
+                               ClusterConfigState configState,
+                               TaskStatus.Listener statusListener,
+                               TargetState initialState) {
+            super(id, configState, statusListener, initialState);
+        }
+
+        @Override
+        public WorkerTask doBuild(Task task,
+                           ConnectorTaskId id,
+                           ClusterConfigState configState,
+                           TaskStatus.Listener statusListener,
+                           TargetState initialState,
+                           ConnectorConfig connectorConfig,
+                           Converter keyConverter,
+                           Converter valueConverter,
+                           HeaderConverter headerConverter,
+                           ClassLoader classLoader,
+                           ErrorHandlingMetrics errorHandlingMetrics,
+                           Class<? extends Connector> connectorClass,
+                           RetryWithToleranceOperator retryWithToleranceOperator) {
+
+            SourceConnectorConfig sourceConfig = new SourceConnectorConfig(plugins,
+                    connectorConfig.originalsStrings(), config.topicCreationEnable());
+            retryWithToleranceOperator.reporters(sourceTaskReporters(id, sourceConfig, errorHandlingMetrics));
+            TransformationChain<SourceRecord> transformationChain = new TransformationChain<>(sourceConfig.<SourceRecord>transformations(), retryWithToleranceOperator);
+            log.info("Initializing: {}", transformationChain);
+
+            Map<String, Object> producerProps = baseProducerConfigs(id.connector(), "connector-producer-" + id, config, sourceConfig, connectorClass,
+                    connectorClientConfigOverridePolicy, kafkaClusterId);
+            KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps);
+
+            TopicAdmin topicAdmin;
+            Map<String, TopicCreationGroup> topicCreationGroups;
+            if (config.topicCreationEnable() && sourceConfig.usesTopicCreation()) {
+                topicCreationGroups = TopicCreationGroup.configuredGroups(sourceConfig);
+                // Create a topic admin that the task can use for topic creation
+                Map<String, Object> adminOverrides = adminConfigs(id.connector(), "connector-adminclient-" + id, config,
+                        sourceConfig, connectorClass, connectorClientConfigOverridePolicy, kafkaClusterId, ConnectorType.SOURCE);
+                topicAdmin = new TopicAdmin(adminOverrides);
+            } else {
+                topicAdmin = null;
+                topicCreationGroups = null;
+            }
+
+            // Set up the offset backing store for this task instance
+            ConnectorOffsetBackingStore offsetStore = new ConnectorOffsetBackingStore(globalOffsetBackingStore, "TODO");

Review Comment:
   Ack, done.
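
   For readers following the thread without the full diff, here is a rough sketch of how a caller inside `Worker` might drive one of these builders. The builder API itself is taken from the hunk above, but the local variable names (`task`, `connectorConfig`, `connectorLoader`, and so on) are placeholders rather than code from this PR:

   ```java
   // Hypothetical call site, for illustration only; the with* setters and build()
   // come from the TaskBuilder API shown in the diff above.
   WorkerTask workerTask = new SinkTaskBuilder(id, configState, statusListener, initialState)
           .withTask(task)
           .withConnectorConfig(connectorConfig)
           .withKeyConverter(keyConverter)
           .withValueConverter(valueConverter)
           .withHeaderConverter(headerConverter)
           .withClassloader(connectorLoader)
           .build();
   ```

   The same chain would work for `SourceTaskBuilder`, since the `with*` setters and `build()` live on the shared `TaskBuilder` base class and `build()` only validates the fields before dispatching to the subclass's `doBuild`.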


