ashmeet13 commented on code in PR #12988:
URL: https://github.com/apache/kafka/pull/12988#discussion_r1720933373


##########
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##########
@@ -1500,68 +1516,71 @@ private void validateRackAwarenessConfiguration() {
         });
     }
 
+    private void overwritePropertyMap(final Map<String, Object> props, final 
String key, final Object value, final String config) {
+        final String overwritePropertyLogMessage = "Unexpected %s config: %s 
found. User setting (%s) will be ignored and the Streams default setting (%s) 
will be used";
+        
+        if (props.containsKey(key) && (!Objects.equals(props.get(key), 
value))) {
+            log.warn(String.format(overwritePropertyLogMessage, config, key, 
props.get(key), value));
+        }
+
+        props.put(key, value);
+    }
+
     private Map<String, Object> getCommonConsumerConfigs() {
         final Map<String, Object> clientProvidedProps = 
getClientPropsWithPrefix(CONSUMER_PREFIX, ConsumerConfig.configNames());
 
-        checkIfUnexpectedUserSpecifiedConsumerConfig(clientProvidedProps, 
NON_CONFIGURABLE_CONSUMER_DEFAULT_CONFIGS);
-        checkIfUnexpectedUserSpecifiedConsumerConfig(clientProvidedProps, 
NON_CONFIGURABLE_CONSUMER_EOS_CONFIGS);
-
-        final Map<String, Object> consumerProps = new HashMap<>(eosEnabled ? 
CONSUMER_EOS_OVERRIDES : CONSUMER_DEFAULT_OVERRIDES);
+        // Create a consumer config map with custom default values set by 
Kafka Streams
+        final Map<String, Object> consumerProps = new 
HashMap<>(KS_DEFAULT_CONSUMER_CONFIGS);
         if (StreamsConfigUtils.processingMode(this) == 
StreamsConfigUtils.ProcessingMode.EXACTLY_ONCE_V2) {
             
consumerProps.put("internal.throw.on.fetch.stable.offset.unsupported", true);
         }
+        
         consumerProps.putAll(getClientCustomProps());
         consumerProps.putAll(clientProvidedProps);
 
         // bootstrap.servers should be from StreamsConfig
-        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
originals().get(BOOTSTRAP_SERVERS_CONFIG));
+        overwritePropertyMap(
+            consumerProps,
+            ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
+            originals().get(BOOTSTRAP_SERVERS_CONFIG),
+            "consumer"
+        );
 
         return consumerProps;
     }
 
-    private void checkIfUnexpectedUserSpecifiedConsumerConfig(final 
Map<String, Object> clientProvidedProps,
-                                                              final String[] 
nonConfigurableConfigs) {
-        // Streams does not allow users to configure certain consumer/producer 
configurations, for example,
-        // enable.auto.commit. In cases where user tries to override such 
non-configurable
-        // consumer/producer configurations, log a warning and remove the user 
defined value from the Map.
-        // Thus the default values for these consumer/producer configurations 
that are suitable for
-        // Streams will be used instead.
-
-        final String nonConfigurableConfigMessage = "Unexpected user-specified 
%s config: %s found. %sUser setting (%s) will be ignored and the Streams 
default setting (%s) will be used ";
-        final String eosMessage = PROCESSING_GUARANTEE_CONFIG + " is set to " 
+ getString(PROCESSING_GUARANTEE_CONFIG) + ". Hence, ";
-
-        for (final String config: nonConfigurableConfigs) {
-            if (clientProvidedProps.containsKey(config)) {
-
-                if (CONSUMER_DEFAULT_OVERRIDES.containsKey(config)) {
-                    if 
(!clientProvidedProps.get(config).equals(CONSUMER_DEFAULT_OVERRIDES.get(config)))
 {
-                        log.warn(String.format(nonConfigurableConfigMessage, 
"consumer", config, "", clientProvidedProps.get(config),  
CONSUMER_DEFAULT_OVERRIDES.get(config)));
-                        clientProvidedProps.remove(config);
-                    }
-                } else if (eosEnabled) {
-                    if (CONSUMER_EOS_OVERRIDES.containsKey(config)) {
-                        if 
(!clientProvidedProps.get(config).equals(CONSUMER_EOS_OVERRIDES.get(config))) {
-                            
log.warn(String.format(nonConfigurableConfigMessage,
-                                    "consumer", config, eosMessage, 
clientProvidedProps.get(config), CONSUMER_EOS_OVERRIDES.get(config)));
-                            clientProvidedProps.remove(config);
-                        }
-                    } else if (PRODUCER_EOS_OVERRIDES.containsKey(config)) {
-                        if 
(!clientProvidedProps.get(config).equals(PRODUCER_EOS_OVERRIDES.get(config))) {
-                            
log.warn(String.format(nonConfigurableConfigMessage,
-                                    "producer", config, eosMessage, 
clientProvidedProps.get(config), PRODUCER_EOS_OVERRIDES.get(config)));
-                            clientProvidedProps.remove(config);
-                        }
-                    } else if 
(ProducerConfig.TRANSACTIONAL_ID_CONFIG.equals(config)) {
-                        log.warn(String.format(nonConfigurableConfigMessage,
-                            "producer", config, eosMessage, 
clientProvidedProps.get(config), "<appId>-<generatedSuffix>"));
-                        clientProvidedProps.remove(config);
-                    }
-                }
+    private void validateConsumerPropertyMap(final Map<String, Object> props) {
+        if (eosEnabled) {
+            // Iterate over KS_CONTROLLED_CONSUMER_CONFIGS_EOS_ENABLED and 
override values if set
+            for (final Map.Entry<String, Object> entry : 
KS_CONTROLLED_CONSUMER_CONFIGS_EOS_ENABLED.entrySet()) {
+                overwritePropertyMap(props, entry.getKey(), entry.getValue(), 
"consumer");
+            }
+            
verifyMaxInFlightRequestPerConnection(props.get(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION));
+
+        } else {
+            // Iterate over KS_CONTROLLED_CONSUMER_CONFIGS and override values 
if set
+            for (final Map.Entry<String, Object> entry : 
KS_CONTROLLED_CONSUMER_CONFIGS.entrySet()) {
+                overwritePropertyMap(props, entry.getKey(), entry.getValue(), 
"consumer");
             }
         }
+    }
 
+    private void validateProducerPropertyMap(final Map<String, Object> props) {
         if (eosEnabled) {
-            
verifyMaxInFlightRequestPerConnection(clientProvidedProps.get(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION));
+            // Iterate over KS_CONTROLLED_CONSUMER_CONFIGS_EOS_ENABLED and 
override values if set
+            for (final Map.Entry<String, Object> entry : 
KS_CONTROLLED_PRODUCER_CONFIGS_EOS_ENABLED.entrySet()) {
+                overwritePropertyMap(props, entry.getKey(), entry.getValue(), 
"producer");
+            }
+            
verifyMaxInFlightRequestPerConnection(props.get(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION));
+
+        }
+
+        if (props.containsKey(ProducerConfig.PARTITIONER_CLASS_CONFIG)) {
+            final Class<?> c = (Class<?>) 
props.get(ProducerConfig.PARTITIONER_CLASS_CONFIG);
+            if (!StreamPartitioner.class.isAssignableFrom(c)) {

Review Comment:
   This was an attempt to validate `PARTITIONER_CLASS_CONFIG`.
   However, after reading the conversation above, I believe the decision is to
   make no change to this config in this PR.
   
   I have removed this change and am resolving this comment.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to