C0urante commented on code in PR #11780:
URL: https://github.com/apache/kafka/pull/11780#discussion_r885745316
##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java:
##########
@@ -1000,6 +1090,266 @@ WorkerMetricsGroup workerMetricsGroup() {
return workerMetricsGroup;
}
+ abstract class TaskBuilder {
+
+ private final ConnectorTaskId id;
+ private final ClusterConfigState configState;
+ private final TaskStatus.Listener statusListener;
+ private final TargetState initialState;
+
+ private Task task = null;
+ private ConnectorConfig connectorConfig = null;
+ private Converter keyConverter = null;
+ private Converter valueConverter = null;
+ private HeaderConverter headerConverter = null;
+ private ClassLoader classLoader = null;
+
+ public TaskBuilder(ConnectorTaskId id,
+ ClusterConfigState configState,
+ TaskStatus.Listener statusListener,
+ TargetState initialState) {
+ this.id = id;
+ this.configState = configState;
+ this.statusListener = statusListener;
+ this.initialState = initialState;
+ }
+
+ public TaskBuilder withTask(Task task) {
+ this.task = task;
+ return this;
+ }
+
+ public TaskBuilder withConnectorConfig(ConnectorConfig
connectorConfig) {
+ this.connectorConfig = connectorConfig;
+ return this;
+ }
+
+ public TaskBuilder withKeyConverter(Converter keyConverter) {
+ this.keyConverter = keyConverter;
+ return this;
+ }
+
+ public TaskBuilder withValueConverter(Converter valueConverter) {
+ this.valueConverter = valueConverter;
+ return this;
+ }
+
+ public TaskBuilder withHeaderConverter(HeaderConverter
headerConverter) {
+ this.headerConverter = headerConverter;
+ return this;
+ }
+
+ public TaskBuilder withClassloader(ClassLoader classLoader) {
+ this.classLoader = classLoader;
+ return this;
+ }
+
+ public WorkerTask build() {
+ Objects.requireNonNull(task, "Task cannot be null");
+ Objects.requireNonNull(connectorConfig, "Connector config used by
task cannot be null");
+ Objects.requireNonNull(keyConverter, "Key converter used by task
cannot be null");
+ Objects.requireNonNull(valueConverter, "Value converter used by
task cannot be null");
+ Objects.requireNonNull(headerConverter, "Header converter used by
task cannot be null");
+ Objects.requireNonNull(classLoader, "Classloader used by task
cannot be null");
+
+ ErrorHandlingMetrics errorHandlingMetrics =
errorHandlingMetrics(id);
+ final Class<? extends Connector> connectorClass =
plugins.connectorClass(
+
connectorConfig.getString(ConnectorConfig.CONNECTOR_CLASS_CONFIG));
+ RetryWithToleranceOperator retryWithToleranceOperator = new
RetryWithToleranceOperator(connectorConfig.errorRetryTimeout(),
+ connectorConfig.errorMaxDelayInMillis(),
connectorConfig.errorToleranceType(), Time.SYSTEM);
+ retryWithToleranceOperator.metrics(errorHandlingMetrics);
+
+ return doBuild(task, id, configState, statusListener, initialState,
+ connectorConfig, keyConverter, valueConverter,
headerConverter, classLoader,
+ errorHandlingMetrics, connectorClass,
retryWithToleranceOperator);
+ }
+
+ abstract WorkerTask doBuild(Task task,
+ ConnectorTaskId id,
+ ClusterConfigState configState,
+ TaskStatus.Listener statusListener,
+ TargetState initialState,
+ ConnectorConfig connectorConfig,
+ Converter keyConverter,
+ Converter valueConverter,
+ HeaderConverter headerConverter,
+ ClassLoader classLoader,
+ ErrorHandlingMetrics errorHandlingMetrics,
+ Class<? extends Connector> connectorClass,
+ RetryWithToleranceOperator
retryWithToleranceOperator);
+
+ }
+
+ class SinkTaskBuilder extends TaskBuilder {
+ public SinkTaskBuilder(ConnectorTaskId id,
+ ClusterConfigState configState,
+ TaskStatus.Listener statusListener,
+ TargetState initialState) {
+ super(id, configState, statusListener, initialState);
+ }
+
+ @Override
+ public WorkerTask doBuild(Task task,
+ ConnectorTaskId id,
+ ClusterConfigState configState,
+ TaskStatus.Listener statusListener,
+ TargetState initialState,
+ ConnectorConfig connectorConfig,
+ Converter keyConverter,
+ Converter valueConverter,
+ HeaderConverter headerConverter,
+ ClassLoader classLoader,
+ ErrorHandlingMetrics errorHandlingMetrics,
+ Class<? extends Connector> connectorClass,
+ RetryWithToleranceOperator
retryWithToleranceOperator) {
+
+ TransformationChain<SinkRecord> transformationChain = new
TransformationChain<>(connectorConfig.<SinkRecord>transformations(),
retryWithToleranceOperator);
+ log.info("Initializing: {}", transformationChain);
+ SinkConnectorConfig sinkConfig = new SinkConnectorConfig(plugins,
connectorConfig.originalsStrings());
+ retryWithToleranceOperator.reporters(sinkTaskReporters(id,
sinkConfig, errorHandlingMetrics, connectorClass));
+ WorkerErrantRecordReporter workerErrantRecordReporter =
createWorkerErrantRecordReporter(sinkConfig, retryWithToleranceOperator,
+ keyConverter, valueConverter, headerConverter);
+
+ Map<String, Object> consumerProps = baseConsumerConfigs(
+ id.connector(), "connector-consumer-" + id, config,
connectorConfig, connectorClass,
+ connectorClientConfigOverridePolicy, kafkaClusterId,
ConnectorType.SINK);
+ KafkaConsumer<byte[], byte[]> consumer = new
KafkaConsumer<>(consumerProps);
+
+ return new WorkerSinkTask(id, (SinkTask) task, statusListener,
initialState, config, configState, metrics, keyConverter,
+ valueConverter, headerConverter, transformationChain,
consumer, classLoader, time,
+ retryWithToleranceOperator, workerErrantRecordReporter,
herder.statusBackingStore());
+ }
+ }
+
+ class SourceTaskBuilder extends TaskBuilder {
+ public SourceTaskBuilder(ConnectorTaskId id,
+ ClusterConfigState configState,
+ TaskStatus.Listener statusListener,
+ TargetState initialState) {
+ super(id, configState, statusListener, initialState);
+ }
+
+ @Override
+ public WorkerTask doBuild(Task task,
+ ConnectorTaskId id,
+ ClusterConfigState configState,
+ TaskStatus.Listener statusListener,
+ TargetState initialState,
+ ConnectorConfig connectorConfig,
+ Converter keyConverter,
+ Converter valueConverter,
+ HeaderConverter headerConverter,
+ ClassLoader classLoader,
+ ErrorHandlingMetrics errorHandlingMetrics,
+ Class<? extends Connector> connectorClass,
+ RetryWithToleranceOperator
retryWithToleranceOperator) {
+
+ SourceConnectorConfig sourceConfig = new
SourceConnectorConfig(plugins,
+ connectorConfig.originalsStrings(),
config.topicCreationEnable());
+ retryWithToleranceOperator.reporters(sourceTaskReporters(id,
sourceConfig, errorHandlingMetrics));
+ TransformationChain<SourceRecord> transformationChain = new
TransformationChain<>(sourceConfig.<SourceRecord>transformations(),
retryWithToleranceOperator);
+ log.info("Initializing: {}", transformationChain);
+
+ Map<String, Object> producerProps =
baseProducerConfigs(id.connector(), "connector-producer-" + id, config,
sourceConfig, connectorClass,
+ connectorClientConfigOverridePolicy, kafkaClusterId);
+ KafkaProducer<byte[], byte[]> producer = new
KafkaProducer<>(producerProps);
+
+ TopicAdmin topicAdmin;
+ Map<String, TopicCreationGroup> topicCreationGroups;
+ if (config.topicCreationEnable() &&
sourceConfig.usesTopicCreation()) {
+ topicCreationGroups =
TopicCreationGroup.configuredGroups(sourceConfig);
+ // Create a topic admin that the task can use for topic
creation
+ Map<String, Object> adminOverrides =
adminConfigs(id.connector(), "connector-adminclient-" + id, config,
+ sourceConfig, connectorClass,
connectorClientConfigOverridePolicy, kafkaClusterId, ConnectorType.SOURCE);
+ topicAdmin = new TopicAdmin(adminOverrides);
+ } else {
+ topicAdmin = null;
+ topicCreationGroups = null;
+ }
+
+ // Set up the offset backing store for this task instance
+ ConnectorOffsetBackingStore offsetStore = new
ConnectorOffsetBackingStore(globalOffsetBackingStore, "TODO");
Review Comment:
Ack, done.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]