rhauch commented on a change in pull request #8691:
URL: https://github.com/apache/kafka/pull/8691#discussion_r426971298



##########
File path: 
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java
##########
@@ -270,9 +272,15 @@ Duration adminTimeout() {
     List<MetricsReporter> metricsReporters() {
         List<MetricsReporter> reporters = getConfiguredInstances(
                 CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG, 
MetricsReporter.class);
-        JmxReporter jmxReporter = new JmxReporter("kafka.connect.mirror");
+        JmxReporter jmxReporter = new JmxReporter();
         jmxReporter.configure(this.originals());
         reporters.add(jmxReporter);
+        MetricsContext metricsContext = new 
KafkaMetricsContext("kafka.connect.mirror");

Review comment:
       I don't think the KIP mentioned this `kafka.connect.mirror` metrics 
context. It's probably worthwhile to update the KIP and then notify the vote 
thread of the minor change noticed during implementation.

##########
File path: 
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java
##########
@@ -233,17 +233,18 @@ private void addHerder(SourceAndTarget sourceAndTarget) {
         plugins.compareAndSwapWithDelegatingLoader();
         DistributedConfig distributedConfig = new 
DistributedConfig(workerProps);
         String kafkaClusterId = 
ConnectUtils.lookupKafkaClusterId(distributedConfig);
-        KafkaOffsetBackingStore offsetBackingStore = new 
KafkaOffsetBackingStore();
+        KafkaOffsetBackingStore offsetBackingStore = new 
KafkaOffsetBackingStore(kafkaClusterId);
         offsetBackingStore.configure(distributedConfig);

Review comment:
       Do we need to modify the `KafkaOffsetBackingStore()` constructor? The 
`ConnectUtils.lookupKafkaClusterId(...)` can be called with the `WorkerConfig` 
(which is the parent class of `DistributedConfig`) passed to it via the 
`configure(...)` method, so couldn't the `configure(...)` method call the 
lookup method?
   
   This may seem minor, but it follows the existing pattern for this class.

##########
File path: 
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java
##########
@@ -233,17 +233,18 @@ private void addHerder(SourceAndTarget sourceAndTarget) {
         plugins.compareAndSwapWithDelegatingLoader();
         DistributedConfig distributedConfig = new 
DistributedConfig(workerProps);
         String kafkaClusterId = 
ConnectUtils.lookupKafkaClusterId(distributedConfig);
-        KafkaOffsetBackingStore offsetBackingStore = new 
KafkaOffsetBackingStore();
+        KafkaOffsetBackingStore offsetBackingStore = new 
KafkaOffsetBackingStore(kafkaClusterId);
         offsetBackingStore.configure(distributedConfig);
-        Worker worker = new Worker(workerId, time, plugins, distributedConfig, 
offsetBackingStore, CLIENT_CONFIG_OVERRIDE_POLICY);
+        Worker worker = new Worker(workerId, time, plugins, distributedConfig, 
offsetBackingStore, CLIENT_CONFIG_OVERRIDE_POLICY, kafkaClusterId);
         WorkerConfigTransformer configTransformer = worker.configTransformer();
         Converter internalValueConverter = worker.getInternalValueConverter();
-        StatusBackingStore statusBackingStore = new 
KafkaStatusBackingStore(time, internalValueConverter);
+        StatusBackingStore statusBackingStore = new 
KafkaStatusBackingStore(time, internalValueConverter, kafkaClusterId);
         statusBackingStore.configure(distributedConfig);

Review comment:
       Could the `KafkaStatusBackingStore(...)` get the cluster ID from the 
`distributedConfig` passed into the `configure(...)` method, similar to the 
`KafkaOffsetBackingStore`?

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
##########
@@ -101,24 +101,25 @@ public Connect startConnect(Map<String, String> 
workerProps) {
         URI advertisedUrl = rest.advertisedUrl();
         String workerId = advertisedUrl.getHost() + ":" + 
advertisedUrl.getPort();
 
-        KafkaOffsetBackingStore offsetBackingStore = new 
KafkaOffsetBackingStore();
+        KafkaOffsetBackingStore offsetBackingStore = new 
KafkaOffsetBackingStore(kafkaClusterId);
         offsetBackingStore.configure(config);
 
         ConnectorClientConfigOverridePolicy 
connectorClientConfigOverridePolicy = plugins.newPlugin(
                 
config.getString(WorkerConfig.CONNECTOR_CLIENT_POLICY_CLASS_CONFIG),
                 config, ConnectorClientConfigOverridePolicy.class);
 
-        Worker worker = new Worker(workerId, time, plugins, config, 
offsetBackingStore, connectorClientConfigOverridePolicy);
+        Worker worker = new Worker(workerId, time, plugins, config, 
offsetBackingStore, connectorClientConfigOverridePolicy, kafkaClusterId);

Review comment:
       Same comment here as in `MirrorMaker`: the worker's constructor could 
get the Kafka cluster ID directly from the worker config.

##########
File path: 
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java
##########
@@ -233,17 +233,18 @@ private void addHerder(SourceAndTarget sourceAndTarget) {
         plugins.compareAndSwapWithDelegatingLoader();
         DistributedConfig distributedConfig = new 
DistributedConfig(workerProps);
         String kafkaClusterId = 
ConnectUtils.lookupKafkaClusterId(distributedConfig);
-        KafkaOffsetBackingStore offsetBackingStore = new 
KafkaOffsetBackingStore();
+        KafkaOffsetBackingStore offsetBackingStore = new 
KafkaOffsetBackingStore(kafkaClusterId);
         offsetBackingStore.configure(distributedConfig);
-        Worker worker = new Worker(workerId, time, plugins, distributedConfig, 
offsetBackingStore, CLIENT_CONFIG_OVERRIDE_POLICY);
+        Worker worker = new Worker(workerId, time, plugins, distributedConfig, 
offsetBackingStore, CLIENT_CONFIG_OVERRIDE_POLICY, kafkaClusterId);
         WorkerConfigTransformer configTransformer = worker.configTransformer();
         Converter internalValueConverter = worker.getInternalValueConverter();
-        StatusBackingStore statusBackingStore = new 
KafkaStatusBackingStore(time, internalValueConverter);
+        StatusBackingStore statusBackingStore = new 
KafkaStatusBackingStore(time, internalValueConverter, kafkaClusterId);
         statusBackingStore.configure(distributedConfig);
         ConfigBackingStore configBackingStore = new KafkaConfigBackingStore(
                 internalValueConverter,
                 distributedConfig,
-                configTransformer);
+                configTransformer,
+                kafkaClusterId);

Review comment:
       Could the `KafkaConfigBackingStore(...)` get the cluster ID from the 
`distributedConfig`? One of the reasons why we pass the whole worker config to 
the constructor is so that we don't have to always modify the constructor to 
pass in additional information that can be derived from the worker config.

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
##########
@@ -101,24 +101,25 @@ public Connect startConnect(Map<String, String> 
workerProps) {
         URI advertisedUrl = rest.advertisedUrl();
         String workerId = advertisedUrl.getHost() + ":" + 
advertisedUrl.getPort();
 
-        KafkaOffsetBackingStore offsetBackingStore = new 
KafkaOffsetBackingStore();
+        KafkaOffsetBackingStore offsetBackingStore = new 
KafkaOffsetBackingStore(kafkaClusterId);
         offsetBackingStore.configure(config);
 
         ConnectorClientConfigOverridePolicy 
connectorClientConfigOverridePolicy = plugins.newPlugin(
                 
config.getString(WorkerConfig.CONNECTOR_CLIENT_POLICY_CLASS_CONFIG),
                 config, ConnectorClientConfigOverridePolicy.class);
 
-        Worker worker = new Worker(workerId, time, plugins, config, 
offsetBackingStore, connectorClientConfigOverridePolicy);
+        Worker worker = new Worker(workerId, time, plugins, config, 
offsetBackingStore, connectorClientConfigOverridePolicy, kafkaClusterId);
         WorkerConfigTransformer configTransformer = worker.configTransformer();
 
         Converter internalValueConverter = worker.getInternalValueConverter();
-        StatusBackingStore statusBackingStore = new 
KafkaStatusBackingStore(time, internalValueConverter);
+        StatusBackingStore statusBackingStore = new 
KafkaStatusBackingStore(time, internalValueConverter, kafkaClusterId);

Review comment:
       Same comment here as in `MirrorMaker`: the backing store's constructor 
could get the Kafka cluster ID directly from the worker config.

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
##########
@@ -92,6 +93,7 @@
     private final ExecutorService executor;
     private final Time time;
     private final String workerId;
+    private final String clusterId;

Review comment:
       The Kafka cluster ID is passed into the constructor, but is this 
supposed to represent the Connect cluster ID or the Kafka cluster ID? Since 
this is in Connect code, without a context we'd assume it was the Connect 
cluster ID.

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
##########
@@ -101,24 +101,25 @@ public Connect startConnect(Map<String, String> 
workerProps) {
         URI advertisedUrl = rest.advertisedUrl();
         String workerId = advertisedUrl.getHost() + ":" + 
advertisedUrl.getPort();
 
-        KafkaOffsetBackingStore offsetBackingStore = new 
KafkaOffsetBackingStore();
+        KafkaOffsetBackingStore offsetBackingStore = new 
KafkaOffsetBackingStore(kafkaClusterId);
         offsetBackingStore.configure(config);
 
         ConnectorClientConfigOverridePolicy 
connectorClientConfigOverridePolicy = plugins.newPlugin(
                 
config.getString(WorkerConfig.CONNECTOR_CLIENT_POLICY_CLASS_CONFIG),
                 config, ConnectorClientConfigOverridePolicy.class);
 
-        Worker worker = new Worker(workerId, time, plugins, config, 
offsetBackingStore, connectorClientConfigOverridePolicy);
+        Worker worker = new Worker(workerId, time, plugins, config, 
offsetBackingStore, connectorClientConfigOverridePolicy, kafkaClusterId);
         WorkerConfigTransformer configTransformer = worker.configTransformer();
 
         Converter internalValueConverter = worker.getInternalValueConverter();
-        StatusBackingStore statusBackingStore = new 
KafkaStatusBackingStore(time, internalValueConverter);
+        StatusBackingStore statusBackingStore = new 
KafkaStatusBackingStore(time, internalValueConverter, kafkaClusterId);
         statusBackingStore.configure(config);
 
         ConfigBackingStore configBackingStore = new KafkaConfigBackingStore(
                 internalValueConverter,
                 config,
-                configTransformer);
+                configTransformer,
+                kafkaClusterId);

Review comment:
       Same comment here as in `MirrorMaker`: the backing store's constructor 
could get the Kafka cluster ID directly from the worker config.

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java
##########
@@ -93,7 +93,7 @@ public static void main(String[] args) {
                 
config.getString(WorkerConfig.CONNECTOR_CLIENT_POLICY_CLASS_CONFIG),
                 config, ConnectorClientConfigOverridePolicy.class);
             Worker worker = new Worker(workerId, time, plugins, config, new 
FileOffsetBackingStore(),
-                                       connectorClientConfigOverridePolicy);
+                                       connectorClientConfigOverridePolicy, 
kafkaClusterId);

Review comment:
       Same comment here as in `MirrorMaker`: the worker's constructor could 
get the Kafka cluster ID directly from the worker config.

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
##########
@@ -101,24 +101,25 @@ public Connect startConnect(Map<String, String> 
workerProps) {
         URI advertisedUrl = rest.advertisedUrl();
         String workerId = advertisedUrl.getHost() + ":" + 
advertisedUrl.getPort();
 
-        KafkaOffsetBackingStore offsetBackingStore = new 
KafkaOffsetBackingStore();
+        KafkaOffsetBackingStore offsetBackingStore = new 
KafkaOffsetBackingStore(kafkaClusterId);

Review comment:
       Same comment here as in `MirrorMaker`: the backing store's 
`configure(...)` method could get the Kafka cluster ID directly from the worker 
config.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to