mimaison commented on code in PR #11780:
URL: https://github.com/apache/kafka/pull/11780#discussion_r876877095


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java:
##########
@@ -576,88 +672,42 @@ public boolean startTask(
 
             executor.submit(workerTask);
             if (workerTask instanceof WorkerSourceTask) {
-                sourceTaskOffsetCommitter.schedule(id, (WorkerSourceTask) workerTask);
+                sourceTaskOffsetCommitter.ifPresent(committer -> committer.schedule(id, (WorkerSourceTask) workerTask));
             }
             return true;
         }
     }
 
-    private WorkerTask buildWorkerTask(ClusterConfigState configState,
-                                       ConnectorConfig connConfig,
-                                       ConnectorTaskId id,
-                                       Task task,
-                                       TaskStatus.Listener statusListener,
-                                       TargetState initialState,
-                                       Converter keyConverter,
-                                       Converter valueConverter,
-                                       HeaderConverter headerConverter,
-                                       ClassLoader loader) {
-        ErrorHandlingMetrics errorHandlingMetrics = errorHandlingMetrics(id);
-        final Class<? extends Connector> connectorClass = plugins.connectorClass(
-            connConfig.getString(ConnectorConfig.CONNECTOR_CLASS_CONFIG));
-        RetryWithToleranceOperator retryWithToleranceOperator = new RetryWithToleranceOperator(connConfig.errorRetryTimeout(),
-                connConfig.errorMaxDelayInMillis(), connConfig.errorToleranceType(), Time.SYSTEM);
-        retryWithToleranceOperator.metrics(errorHandlingMetrics);
-
-        // Decide which type of worker task we need based on the type of task.
-        if (task instanceof SourceTask) {
-            SourceConnectorConfig sourceConfig = new SourceConnectorConfig(plugins,
-                    connConfig.originalsStrings(), config.topicCreationEnable());
-            retryWithToleranceOperator.reporters(sourceTaskReporters(id, sourceConfig, errorHandlingMetrics));
-            TransformationChain<SourceRecord> transformationChain = new TransformationChain<>(sourceConfig.<SourceRecord>transformations(), retryWithToleranceOperator);
-            log.info("Initializing: {}", transformationChain);
-            CloseableOffsetStorageReader offsetReader = new OffsetStorageReaderImpl(offsetBackingStore, id.connector(),
-                    internalKeyConverter, internalValueConverter);
-            OffsetStorageWriter offsetWriter = new OffsetStorageWriter(offsetBackingStore, id.connector(),
-                    internalKeyConverter, internalValueConverter);
-            Map<String, Object> producerProps = producerConfigs(id, "connector-producer-" + id, config, sourceConfig, connectorClass,
-                                                                connectorClientConfigOverridePolicy, kafkaClusterId);
-            KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps);
-            TopicAdmin admin;
-            Map<String, TopicCreationGroup> topicCreationGroups;
-            if (config.topicCreationEnable() && sourceConfig.usesTopicCreation()) {
-                Map<String, Object> adminProps = adminConfigs(id, "connector-adminclient-" + id, config,
-                        sourceConfig, connectorClass, connectorClientConfigOverridePolicy, kafkaClusterId);
-                admin = new TopicAdmin(adminProps);
-                topicCreationGroups = TopicCreationGroup.configuredGroups(sourceConfig);
-            } else {
-                admin = null;
-                topicCreationGroups = null;
-            }
-
-            // Note we pass the configState as it performs dynamic transformations under the covers
-            return new WorkerSourceTask(id, (SourceTask) task, statusListener, initialState, keyConverter, valueConverter,
-                    headerConverter, transformationChain, producer, admin, topicCreationGroups,
-                    offsetReader, offsetWriter, config, configState, metrics, loader, time, retryWithToleranceOperator, herder.statusBackingStore(), executor);
-        } else if (task instanceof SinkTask) {
-            TransformationChain<SinkRecord> transformationChain = new TransformationChain<>(connConfig.<SinkRecord>transformations(), retryWithToleranceOperator);
-            log.info("Initializing: {}", transformationChain);
-            SinkConnectorConfig sinkConfig = new SinkConnectorConfig(plugins, connConfig.originalsStrings());
-            retryWithToleranceOperator.reporters(sinkTaskReporters(id, sinkConfig, errorHandlingMetrics, connectorClass));
-            WorkerErrantRecordReporter workerErrantRecordReporter = createWorkerErrantRecordReporter(sinkConfig, retryWithToleranceOperator,
-                    keyConverter, valueConverter, headerConverter);
-
-            Map<String, Object> consumerProps = consumerConfigs(id, config, connConfig, connectorClass, connectorClientConfigOverridePolicy, kafkaClusterId);
-            KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(consumerProps);
-
-            return new WorkerSinkTask(id, (SinkTask) task, statusListener, initialState, config, configState, metrics, keyConverter,
-                                      valueConverter, headerConverter, transformationChain, consumer, loader, time,
-                                      retryWithToleranceOperator, workerErrantRecordReporter, herder.statusBackingStore());
-        } else {
-            log.error("Tasks must be a subclass of either SourceTask or SinkTask and current is {}", task);
-            throw new ConnectException("Tasks must be a subclass of either SourceTask or SinkTask");
-        }
+    static Map<String, Object> exactlyOnceSourceTaskProducerConfigs(ConnectorTaskId id,
+                                                                    WorkerConfig config,
+                                                                    ConnectorConfig connConfig,
+                                                                    Class<? extends Connector> connectorClass,
+                                                                    ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy,
+                                                                    String clusterId) {
+        Map<String, Object> result = baseProducerConfigs(id.connector(), "connector-producer-" + id, config, connConfig, connectorClass, connectorClientConfigOverridePolicy, clusterId);
+        ConnectUtils.ensureProperty(
+                result, ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true",
+                "for connectors when exactly-once source support is enabled",
+                false
+        );
+        String transactionalId = transactionalId(config.groupId(), id.connector(), id.task());

Review Comment:
   Thanks, that makes sense. I thought we wanted to use `DistributedConfig::transactionalProducerId` as a prefix in `Worker::transactionalId`, but that's not necessary.
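   
   For context, a minimal sketch of the kind of derivation I had in mind (the class, example names, and exact format below are illustrative assumptions, not necessarily what `Worker::transactionalId` actually does):
   
   ```java
   // Illustrative sketch only: build a unique per-task transactional ID from the
   // worker group ID, connector name, and task number, so each source task gets
   // its own producer transactional ID for exactly-once support.
   public class TransactionalIdSketch {
       static String transactionalId(String groupId, String connector, int taskId) {
           return String.format("%s-%s-%d", groupId, connector, taskId);
       }
   
       public static void main(String[] args) {
           // Prints "connect-cluster-my-source-connector-0"
           System.out.println(transactionalId("connect-cluster", "my-source-connector", 0));
       }
   }
   ```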



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
