bbejeck commented on code in PR #17170:
URL: https://github.com/apache/kafka/pull/17170#discussion_r1761478023
##########
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##########
@@ -1775,11 +1735,7 @@ public Map<String, Object> getProducerConfigs(final
String clientId) {
props.putAll(getClientCustomProps());
props.putAll(clientProvidedProps);
- // When using EOS alpha, stream should auto-downgrade the
transactional commit protocol to be compatible with older brokers.
- if (StreamsConfigUtils.processingMode(this) ==
StreamsConfigUtils.ProcessingMode.EXACTLY_ONCE_ALPHA) {
- props.put("internal.auto.downgrade.txn.commit", true);
- }
-
+ // TODO: given that we remove this code, it seems we can remove the
corresponding tests (cf other TODO)
Review Comment:
My vote is to do the cleanup in this PR and remove
`internal.auto.downgrade.txn.commit` and associated code
##########
streams/src/main/java/org/apache/kafka/streams/internals/StreamsConfigUtils.java:
##########
@@ -42,12 +42,12 @@ public enum ProcessingMode {
this.name = name;
}
}
-
- @SuppressWarnings("deprecation")
+
+ // TODO cleanup
public static ProcessingMode processingMode(final StreamsConfig config) {
- if
(StreamsConfig.EXACTLY_ONCE.equals(config.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG)))
{
+ if
("exactly_once".equals(config.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG)))
{
return ProcessingMode.EXACTLY_ONCE_ALPHA;
Review Comment:
maybe take the same approach with `EXACTLY_ONCE_ALPHA` ? Same for the other
occurances
##########
streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java:
##########
@@ -176,13 +175,13 @@ public void createTopics() throws Exception {
}
@ParameterizedTest
- @ValueSource(strings = {StreamsConfig.AT_LEAST_ONCE,
StreamsConfig.EXACTLY_ONCE, StreamsConfig.EXACTLY_ONCE_V2})
+ @ValueSource(strings = {StreamsConfig.AT_LEAST_ONCE,
StreamsConfig.EXACTLY_ONCE_V2})
Review Comment:
We don't have to address in this PR, but why do we have `AT_LEAST_ONCE`
parameters in the EOS integration test?
##########
streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java:
##########
@@ -600,24 +562,7 @@ public void
shouldNotSetInternalAutoDowngradeTxnCommitToTrueInProducerForEosDisa
assertThat(producerConfigs.get("internal.auto.downgrade.txn.commit"),
is(nullValue()));
}
- @SuppressWarnings("deprecation")
- @Test
- public void
shouldSetInternalAutoDowngradeTxnCommitToTrueInProducerForEosAlpha() {
- props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, EXACTLY_ONCE);
- final StreamsConfig streamsConfig = new StreamsConfig(props);
- final Map<String, Object> producerConfigs =
streamsConfig.getProducerConfigs(clientId);
- assertThat(producerConfigs.get("internal.auto.downgrade.txn.commit"),
is(true));
- }
-
- @SuppressWarnings("deprecation")
- @Test
- public void
shouldNotSetInternalAutoDowngradeTxnCommitToTrueInProducerForEosBeta() {
- props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG,
EXACTLY_ONCE_BETA);
- final StreamsConfig streamsConfig = new StreamsConfig(props);
- final Map<String, Object> producerConfigs =
streamsConfig.getProducerConfigs(clientId);
- assertThat(producerConfigs.get("internal.auto.downgrade.txn.commit"),
is(nullValue()));
- }
-
+ // TODO: should we keep this test? (cf other TODO)
Review Comment:
Yes, let's remove in this PR
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]