jmhostalet created KAFKA-8646:
---------------------------------

             Summary: Materialized.withLoggingDisabled() does not disable the changelog topics creation
                 Key: KAFKA-8646
                 URL: https://issues.apache.org/jira/browse/KAFKA-8646
             Project: Kafka
          Issue Type: Bug
          Components: streams
    Affects Versions: 2.3.0
            Reporter: jmhostalet

I have a cluster with 3 brokers running version 0.11.

My kafka-streams app was using kafka-client 0.11.0.1, but I recently migrated it to 2.3.0.

I have not executed any migration, as my data is disposable; I therefore deleted all intermediate topics, keeping only the input and output topics.

My streams config is:
{code:java}
application.id = consumer-id-v1.00
application.server =
bootstrap.servers = [foo1:9092, foo2:9092, foo3:9092]
buffered.records.per.partition = 1000
cache.max.bytes.buffering = 524288000
client.id =
commit.interval.ms = 30000
connections.max.idle.ms = 540000
default.deserialization.exception.handler = class org.apache.kafka.streams.errors.LogAndFailExceptionHandler
default.key.serde = class org.apache.kafka.common.serialization.Serdes$StringSerde
default.production.exception.handler = class org.apache.kafka.streams.errors.DefaultProductionExceptionHandler
default.timestamp.extractor = class com.acme.stream.TimeExtractor
default.value.serde = class com.acme.serde.MyDtoSerde
max.task.idle.ms = 0
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
num.standby.replicas = 0
num.stream.threads = 25
partition.grouper = class org.apache.kafka.streams.processor.DefaultPartitionGrouper
poll.ms = 100
processing.guarantee = at_least_once
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
replication.factor = 1
request.timeout.ms = 40000
retries = 0
retry.backoff.ms = 100
rocksdb.config.setter = null
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
state.cleanup.delay.ms = 600000
state.dir = /tmp/kafka-streams
topology.optimization = none
upgrade.from = null
windowstore.changelog.additional.retention.ms = 86400000
{code}

In my stream I am using withLoggingDisabled:
{code:java}
stream.filter((key, val) -> val != null)
      .selectKey((key, val) -> getId(val))
      .groupByKey(Grouped.as("key-grouper").with(Serdes.String(), new MyDtoSerde()))
      .windowedBy(TimeWindows.of(aggregationWindowSizeDuration)
                             .grace(windowRetentionPeriodDuration))
      .aggregate(MyDto::new,
                 new MyUpdater(),
                 Materialized.as("aggregation-updater")
                             .withLoggingDisabled()
                             .with(Serdes.String(), new MyDtoSerde()))
      .toStream((k, v) -> k.key())
      .mapValues(val -> { ...
{code}

However, changelog topics (KSTREAM-AGGREGATE-STATE-STORE) are still created, whether I delete them before running the app again or change the application.id.

With a new application.id, the topics are recreated with the new prefix.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
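
One possible explanation, not confirmed anywhere in this report: in the Kafka Streams API, Materialized.with(keySerde, valueSerde) is a static factory method. When it is invoked at the end of a chain, Java evaluates and then discards the instance built by as(...).withLoggingDisabled() and returns a fresh Materialized with no store name and default logging. The auto-generated KSTREAM-AGGREGATE-STATE-STORE name seen in the changelog topics would be consistent with that. The sketch below reproduces the mechanism with a hypothetical stand-in class, StoreConfig (an assumption for illustration, not Kafka's Materialized), so that it runs without any dependency.

```java
public class StaticFactoryPitfall {

    // Hypothetical stand-in for a builder-style config class.
    // This is NOT Kafka's Materialized; it only mimics the static/instance split.
    static final class StoreConfig {
        final String name;            // null means "let the library generate a name"
        final boolean loggingEnabled; // true is the default
        final String serde;

        private StoreConfig(String name, boolean loggingEnabled, String serde) {
            this.name = name;
            this.loggingEnabled = loggingEnabled;
            this.serde = serde;
        }

        // Static factory: starts a new config with the given name.
        static StoreConfig as(String name) {
            return new StoreConfig(name, true, null);
        }

        // Static factory: starts a NEW config with defaults plus the serde.
        // It has no access to any instance it happens to be called through.
        static StoreConfig with(String serde) {
            return new StoreConfig(null, true, serde);
        }

        // Instance method: copies the current settings and disables logging.
        StoreConfig withLoggingDisabled() {
            return new StoreConfig(name, false, serde);
        }

        // Instance method: copies the current settings and sets the serde.
        StoreConfig withSerde(String serde) {
            return new StoreConfig(name, loggingEnabled, serde);
        }
    }

    // Same shape as the chain in the report: a static factory called last.
    // Legal Java (javac only warns under -Xlint:static), but the settings
    // accumulated before the static call are dropped.
    static StoreConfig viaStaticFactory() {
        return StoreConfig.as("aggregation-updater")
                          .withLoggingDisabled()
                          .with("MyDtoSerde");
    }

    // Same intent expressed with instance methods only: settings are kept.
    static StoreConfig viaInstanceMethods() {
        return StoreConfig.as("aggregation-updater")
                          .withLoggingDisabled()
                          .withSerde("MyDtoSerde");
    }

    public static void main(String[] args) {
        StoreConfig lost = viaStaticFactory();
        StoreConfig kept = viaInstanceMethods();
        System.out.println("static factory last: name=" + lost.name
                + ", loggingEnabled=" + lost.loggingEnabled);
        System.out.println("instance methods:    name=" + kept.name
                + ", loggingEnabled=" + kept.loggingEnabled);
    }
}
```

If this is indeed the cause, the instance methods withKeySerde(...) and withValueSerde(...) on Materialized would play the role of withSerde in the sketch. The same reasoning may apply to Grouped.as("key-grouper").with(...), since Grouped.with is also static and Grouped offers withKeySerde and withValueSerde as instance methods.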