jmhostalet created KAFKA-8646:
---------------------------------

             Summary: Materialized.withLoggingDisabled() does not disable the 
changelog topics creation
                 Key: KAFKA-8646
                 URL: https://issues.apache.org/jira/browse/KAFKA-8646
             Project: Kafka
          Issue Type: Bug
          Components: streams
    Affects Versions: 2.3.0
            Reporter: jmhostalet


I have a cluster with 3 brokers running version 0.11

My kafka-streams app was using kafka-client 0.11.0.1 but recently I've migrated 
to 2.3.0

I have not executed any migration, as my data is disposable. Instead, I 
deleted all intermediate topics, keeping only the input and output topics.

My streams config is:

 
{code:java}
application.id = consumer-id-v1.00
application.server =
bootstrap.servers = [foo1:9092, foo2:9092, foo3:9092]
buffered.records.per.partition = 1000
cache.max.bytes.buffering = 524288000
client.id =
commit.interval.ms = 30000
connections.max.idle.ms = 540000
default.deserialization.exception.handler = class org.apache.kafka.streams.errors.LogAndFailExceptionHandler
default.key.serde = class org.apache.kafka.common.serialization.Serdes$StringSerde
default.production.exception.handler = class org.apache.kafka.streams.errors.DefaultProductionExceptionHandler
default.timestamp.extractor = class com.acme.stream.TimeExtractor
default.value.serde = class com.acme.serde.MyDtoSerde
max.task.idle.ms = 0
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
num.standby.replicas = 0
num.stream.threads = 25
partition.grouper = class org.apache.kafka.streams.processor.DefaultPartitionGrouper
poll.ms = 100
processing.guarantee = at_least_once
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
replication.factor = 1
request.timeout.ms = 40000
retries = 0
retry.backoff.ms = 100
rocksdb.config.setter = null
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
state.cleanup.delay.ms = 600000
state.dir = /tmp/kafka-streams
topology.optimization = none
upgrade.from = null
windowstore.changelog.additional.retention.ms = 86400000
{code}
In my stream I am using withLoggingDisabled():

{code:java}
stream.filter((key, val) -> val!=null)
    .selectKey((key, val) -> getId(val))
    .groupByKey(Grouped.as("key-grouper").with(Serdes.String(), new MyDtoSerde()))
    .windowedBy(TimeWindows.of(aggregationWindowSizeDuration)
                           .grace(windowRetentionPeriodDuration))
    .aggregate(MyDto::new,
               new MyUpdater(),
               Materialized.as("aggregation-updater")
                           .withLoggingDisabled()
                           .with(Serdes.String(), new MyDtoSerde()))
    .toStream((k, v) -> k.key())
    .mapValues(val -> { ...
{code}
However, changelog topics are still created (KSTREAM-AGGREGATE-STATE-STORE), 
regardless of whether I delete them before running the app again or change 
the application.id.

With a new application.id, topics are recreated with the new prefix.
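
One possible explanation, offered as a guess rather than a confirmed diagnosis: Materialized.with(keySerde, valueSerde) is, as far as I can tell from the 2.3.0 javadoc, a static factory. Calling it at the end of an instance chain would then build a fresh Materialized and discard both the store name and the withLoggingDisabled() setting, which would also fit the changelog carrying the generated KSTREAM-AGGREGATE-STATE-STORE name instead of "aggregation-updater". The sketch below reproduces that Java behaviour with a hypothetical stand-in class (StoreSpec, withSerdes and StaticFactoryPitfall are invented for illustration and are not Kafka APIs):

```java
// Hypothetical stand-in for a builder-style config class. This is NOT Kafka's
// Materialized; it only mirrors the "static factory + instance methods" shape.
final class StoreSpec {
    final String name;            // null = no explicit name (a generated one would be used)
    final boolean loggingEnabled; // true = a changelog topic would be created
    final String serdes;          // placeholder for the key/value serdes

    private StoreSpec(String name, boolean loggingEnabled, String serdes) {
        this.name = name;
        this.loggingEnabled = loggingEnabled;
        this.serdes = serdes;
    }

    // Static factory: starts a new spec with the given name, logging on by default.
    static StoreSpec as(String name) {
        return new StoreSpec(name, true, null);
    }

    // Static factory: starts a NEW spec from scratch, with no name and logging on.
    static StoreSpec with(String serdes) {
        return new StoreSpec(null, true, serdes);
    }

    // Instance method: copies the current spec and turns logging off.
    StoreSpec withLoggingDisabled() {
        return new StoreSpec(name, false, serdes);
    }

    // Instance method: copies the current spec and sets the serdes.
    StoreSpec withSerdes(String serdes) {
        return new StoreSpec(name, loggingEnabled, serdes);
    }

    @Override
    public String toString() {
        return "StoreSpec{name=" + name + ", loggingEnabled=" + loggingEnabled
                + ", serdes=" + serdes + "}";
    }
}

class StaticFactoryPitfall {
    public static void main(String[] args) {
        // Java permits calling a static method through an instance expression.
        // The instance built by as(...).withLoggingDisabled() is simply ignored.
        StoreSpec lost = StoreSpec.as("aggregation-updater")
                                  .withLoggingDisabled()
                                  .with("string/dto");

        // Using only instance methods after as(...) keeps the earlier settings.
        StoreSpec kept = StoreSpec.as("aggregation-updater")
                                  .withLoggingDisabled()
                                  .withSerdes("string/dto");

        System.out.println(lost);
        System.out.println(kept);
    }
}
```

If the same thing is happening in the topology above, replacing the trailing .with(...) with the instance methods withKeySerde(...) and withValueSerde(...) on Materialized might be worth trying before treating this as a Streams bug.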

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
