mimaison commented on code in PR #11780:
URL: https://github.com/apache/kafka/pull/11780#discussion_r876877095
##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java:
##########
@@ -576,88 +672,42 @@ public boolean startTask(
             executor.submit(workerTask);
             if (workerTask instanceof WorkerSourceTask) {
-                sourceTaskOffsetCommitter.schedule(id, (WorkerSourceTask) workerTask);
+                sourceTaskOffsetCommitter.ifPresent(committer -> committer.schedule(id, (WorkerSourceTask) workerTask));
             }
             return true;
         }
     }
-    private WorkerTask buildWorkerTask(ClusterConfigState configState,
-                                       ConnectorConfig connConfig,
-                                       ConnectorTaskId id,
-                                       Task task,
-                                       TaskStatus.Listener statusListener,
-                                       TargetState initialState,
-                                       Converter keyConverter,
-                                       Converter valueConverter,
-                                       HeaderConverter headerConverter,
-                                       ClassLoader loader) {
-        ErrorHandlingMetrics errorHandlingMetrics = errorHandlingMetrics(id);
-        final Class<? extends Connector> connectorClass = plugins.connectorClass(
-            connConfig.getString(ConnectorConfig.CONNECTOR_CLASS_CONFIG));
-        RetryWithToleranceOperator retryWithToleranceOperator = new RetryWithToleranceOperator(connConfig.errorRetryTimeout(),
-                connConfig.errorMaxDelayInMillis(), connConfig.errorToleranceType(), Time.SYSTEM);
-        retryWithToleranceOperator.metrics(errorHandlingMetrics);
-
-        // Decide which type of worker task we need based on the type of task.
-        if (task instanceof SourceTask) {
-            SourceConnectorConfig sourceConfig = new SourceConnectorConfig(plugins,
-                    connConfig.originalsStrings(), config.topicCreationEnable());
-            retryWithToleranceOperator.reporters(sourceTaskReporters(id, sourceConfig, errorHandlingMetrics));
-            TransformationChain<SourceRecord> transformationChain = new TransformationChain<>(sourceConfig.<SourceRecord>transformations(), retryWithToleranceOperator);
-            log.info("Initializing: {}", transformationChain);
-            CloseableOffsetStorageReader offsetReader = new OffsetStorageReaderImpl(offsetBackingStore, id.connector(),
-                    internalKeyConverter, internalValueConverter);
-            OffsetStorageWriter offsetWriter = new OffsetStorageWriter(offsetBackingStore, id.connector(),
-                    internalKeyConverter, internalValueConverter);
-            Map<String, Object> producerProps = producerConfigs(id, "connector-producer-" + id, config, sourceConfig, connectorClass,
-                                                                connectorClientConfigOverridePolicy, kafkaClusterId);
-            KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps);
-            TopicAdmin admin;
-            Map<String, TopicCreationGroup> topicCreationGroups;
-            if (config.topicCreationEnable() && sourceConfig.usesTopicCreation()) {
-                Map<String, Object> adminProps = adminConfigs(id, "connector-adminclient-" + id, config,
-                        sourceConfig, connectorClass, connectorClientConfigOverridePolicy, kafkaClusterId);
-                admin = new TopicAdmin(adminProps);
-                topicCreationGroups = TopicCreationGroup.configuredGroups(sourceConfig);
-            } else {
-                admin = null;
-                topicCreationGroups = null;
-            }
-
-            // Note we pass the configState as it performs dynamic transformations under the covers
-            return new WorkerSourceTask(id, (SourceTask) task, statusListener, initialState, keyConverter, valueConverter,
-                    headerConverter, transformationChain, producer, admin, topicCreationGroups,
-                    offsetReader, offsetWriter, config, configState, metrics, loader, time, retryWithToleranceOperator, herder.statusBackingStore(), executor);
-        } else if (task instanceof SinkTask) {
-            TransformationChain<SinkRecord> transformationChain = new TransformationChain<>(connConfig.<SinkRecord>transformations(), retryWithToleranceOperator);
-            log.info("Initializing: {}", transformationChain);
-            SinkConnectorConfig sinkConfig = new SinkConnectorConfig(plugins, connConfig.originalsStrings());
-            retryWithToleranceOperator.reporters(sinkTaskReporters(id, sinkConfig, errorHandlingMetrics, connectorClass));
-            WorkerErrantRecordReporter workerErrantRecordReporter = createWorkerErrantRecordReporter(sinkConfig, retryWithToleranceOperator,
-                    keyConverter, valueConverter, headerConverter);
-
-            Map<String, Object> consumerProps = consumerConfigs(id, config, connConfig, connectorClass, connectorClientConfigOverridePolicy, kafkaClusterId);
-            KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(consumerProps);
-
-            return new WorkerSinkTask(id, (SinkTask) task, statusListener, initialState, config, configState, metrics, keyConverter,
-                    valueConverter, headerConverter, transformationChain, consumer, loader, time,
-                    retryWithToleranceOperator, workerErrantRecordReporter, herder.statusBackingStore());
-        } else {
-            log.error("Tasks must be a subclass of either SourceTask or SinkTask and current is {}", task);
-            throw new ConnectException("Tasks must be a subclass of either SourceTask or SinkTask");
-        }
+    static Map<String, Object> exactlyOnceSourceTaskProducerConfigs(ConnectorTaskId id,
+                                                                    WorkerConfig config,
+                                                                    ConnectorConfig connConfig,
+                                                                    Class<? extends Connector> connectorClass,
+                                                                    ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy,
+                                                                    String clusterId) {
+        Map<String, Object> result = baseProducerConfigs(id.connector(), "connector-producer-" + id, config, connConfig, connectorClass, connectorClientConfigOverridePolicy, clusterId);
+        ConnectUtils.ensureProperty(
+                result, ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true",
+                "for connectors when exactly-once source support is enabled",
+                false
+        );
+        String transactionalId = transactionalId(config.groupId(), id.connector(), id.task());
Review Comment:
Thanks, that makes sense. I thought we wanted to use
`DistributedConfig::transactionalProducerId` as a prefix in
`Worker::transactionalId`, but that's not necessary.
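For context, a minimal sketch of how a per-task `transactional.id` could be derived from the Connect group ID, connector name, and task number, matching the `transactionalId(config.groupId(), id.connector(), id.task())` call in the hunk above. The exact join format and the standalone helper shown here are assumptions for illustration, not necessarily what `Worker::transactionalId` does in this PR:

```java
import org.apache.kafka.clients.producer.ProducerConfig;

import java.util.HashMap;
import java.util.Map;

public class TransactionalIdSketch {

    // Hypothetical composition: one stable, unique transactional ID per task,
    // built from the Connect group ID, connector name, and task number. No
    // extra prefix (e.g. DistributedConfig::transactionalProducerId) is needed,
    // since this triple already uniquely identifies the task's producer.
    static String transactionalId(String groupId, String connector, int taskId) {
        return String.join("-", groupId, connector, Integer.toString(taskId));
    }

    public static void main(String[] args) {
        Map<String, Object> producerProps = new HashMap<>();
        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Exactly-once source support requires an idempotent, transactional producer,
        // which is what ensureProperty enforces in the hunk above.
        producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
        producerProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,
                transactionalId("connect-cluster", "my-connector", 0));

        // Prints "connect-cluster-my-connector-0" under the assumed format
        System.out.println(producerProps.get(ProducerConfig.TRANSACTIONAL_ID_CONFIG));
    }
}
```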
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]