[
https://issues.apache.org/jira/browse/KAFKA-8646?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
jmhostalet updated KAFKA-8646:
--
Description:
I have a cluster with 3 brokers running Kafka 0.11.
My kafka-streams app was using kafka-client 0.11.0.1, but I have recently migrated
to 2.3.0.
I have not executed any migration, as my data is disposable; instead I have
deleted all intermediate topics, keeping only the input and output topics.
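For completeness, a minimal sketch of how the local state gets wiped before a fresh run (buildTopology() and props are placeholders, not my actual code):
{code:java}
// Minimal sketch: since the data is disposable, wipe the local state directory
// for this application.id before starting. buildTopology() and props are
// hypothetical stand-ins for the real setup.
KafkaStreams streams = new KafkaStreams(buildTopology(), props);
streams.cleanUp(); // removes the local state store directory for this application.id
streams.start();
{code}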
My streams config is:
{code:java}
application.id = consumer-id-v1.00
application.server =
bootstrap.servers = [foo1:9092, foo2:9092, foo3:9092]
buffered.records.per.partition = 1000
cache.max.bytes.buffering = 524288000
client.id =
commit.interval.ms = 30000
connections.max.idle.ms = 540000
default.deserialization.exception.handler = class org.apache.kafka.streams.errors.LogAndFailExceptionHandler
default.key.serde = class org.apache.kafka.common.serialization.Serdes$StringSerde
default.production.exception.handler = class org.apache.kafka.streams.errors.DefaultProductionExceptionHandler
default.timestamp.extractor = class com.acme.stream.TimeExtractor
default.value.serde = class com.acme.serde.MyDtoSerde
max.task.idle.ms = 0
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
num.standby.replicas = 0
num.stream.threads = 25
partition.grouper = class org.apache.kafka.streams.processor.DefaultPartitionGrouper
poll.ms = 100
processing.guarantee = at_least_once
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
replication.factor = 1
request.timeout.ms = 40000
retries = 0
retry.backoff.ms = 100
rocksdb.config.setter = null
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
state.cleanup.delay.ms = 600000
state.dir = /tmp/kafka-streams
topology.optimization = none
upgrade.from = null
windowstore.changelog.additional.retention.ms = 86400000
{code}
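For reference, a minimal sketch of how the non-default settings above map onto a Properties object (placeholder code, not the actual app):
{code:java}
// Sketch only: the constants below are the standard StreamsConfig keys for the
// non-default values in the dump above.
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "consumer-id-v1.00");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "foo1:9092,foo2:9092,foo3:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, MyDtoSerde.class);
props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, TimeExtractor.class);
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 25);
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 524288000L);
{code}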
In my stream I am using withLoggingDisabled:
{code:java}
stream.filter((key, val) -> val != null)
      .selectKey((key, val) -> getId(val))
      .groupByKey(Grouped.as("key-grouper").with(Serdes.String(), new MyDtoSerde()))
      .windowedBy(TimeWindows.of(aggregationWindowSizeDuration)
                             .grace(windowRetentionPeriodDuration))
      .aggregate(MyDto::new,
                 new MyUpdater(),
                 Materialized.as("aggregation-updater")
                             .withLoggingDisabled()
                             .with(Serdes.String(), new MyDtoSerde()))
      .toStream((k, v) -> k.key())
      .mapValues(val -> { ...
{code}
but the changelog topics (KSTREAM-AGGREGATE-STATE-STORE) are still created, no matter
whether I delete them before running the app again or change the application.id.
With a new application.id, the topics are recreated under the new prefix.
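One thing I am not sure about (an assumption on my side, not verified): Materialized.with(Serde, Serde) is a static factory, so calling it through the instance returned by withLoggingDisabled() would build a brand-new Materialized and silently drop the logging setting. A sketch of the same store configured through the instance methods instead:
{code:java}
// Sketch (assumption, not verified): keep withLoggingDisabled() in effect by
// setting the serdes via the instance methods rather than the static with(...)
// factory. Uses org.apache.kafka.common.utils.Bytes and
// org.apache.kafka.streams.state.WindowStore.
Materialized<String, MyDto, WindowStore<Bytes, byte[]>> materialized =
        Materialized.<String, MyDto, WindowStore<Bytes, byte[]>>as("aggregation-updater")
                    .withKeySerde(Serdes.String())
                    .withValueSerde(new MyDtoSerde())
                    .withLoggingDisabled();
{code}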