[jira] [Updated] (KAFKA-8646) Materialized.withLoggingDisabled() does not disable changelog topics creation

2019-07-10 Thread jmhostalet (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8646?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

jmhostalet updated KAFKA-8646:
--
Description: 
I have a cluster with 3 brokers running version 0.11

My kafka-streams app was using kafka-client 0.11.0.1 but recently I've migrated 
to 2.3.0

I have no executed any migration as my data is disposable, therefore I have 
deleted all intermediate topics, except input and output topics.

My streams config is: 
{code:java}
application.id = consumer-id-v1.00
application.server =
bootstrap.servers = [foo1:9092, foo2:9092, foo3:9092]
buffered.records.per.partition = 1000
cache.max.bytes.buffering = 524288000
client.id =
commit.interval.ms = 3
connections.max.idle.ms = 54
default.deserialization.exception.handler = class 
org.apache.kafka.streams.errors.LogAndFailExceptionHandler
default.key.serde = class 
org.apache.kafka.common.serialization.Serdes$StringSerde
default.production.exception.handler = class 
org.apache.kafka.streams.errors.DefaultProductionExceptionHandler
default.timestamp.extractor = class com.acme.stream.TimeExtractor
default.value.serde = class com.acme.serde.MyDtoSerde
max.task.idle.ms = 0
metadata.max.age.ms = 30
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 3
num.standby.replicas = 0
num.stream.threads = 25
partition.grouper = class 
org.apache.kafka.streams.processor.DefaultPartitionGrouper
poll.ms = 100
processing.guarantee = at_least_once
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
replication.factor = 1
request.timeout.ms = 4
retries = 0
retry.backoff.ms = 100
rocksdb.config.setter = null
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
state.cleanup.delay.ms = 60
state.dir = /tmp/kafka-streams
topology.optimization = none
upgrade.from = null
windowstore.changelog.additional.retention.ms = 8640
{code}
in my stream I am using withLoggingDisabled 
{code:java}
stream.filter((key, val) -> val!=null)
.selectKey((key, val) -> getId(val))
.groupByKey(Grouped.as("key-grouper").with(Serdes.String(), new 
MyDtoSerde()))
.windowedBy(TimeWindows.of(aggregationWindowSizeDuration)
   .grace(windowRetentionPeriodDuration))
.aggregate(MyDto::new,
   new MyUpdater(),
   Materialized.as("aggregation-updater")
   .withLoggingDisabled()
   .with(Serdes.String(), new MyDtoSerde()))
.toStream((k, v) -> k.key())
.mapValues(val -> { ...
{code}
but changelog topics are created (KSTREAM-AGGREGATE-STATE-STORE), no matter if 
I delete them before running again the app or if I change the application.id

With a new application.id, topics are recreated with the new prefix.

 

  was:
I have a cluster with 3 brokers running version 0.11

My kafka-streams app was using kafka-client 0.11.0.1 but recently I've migrated 
to 2.3.0

I have no executed any migration as my data is disposable, therefore I have 
deleted all intermediate topics, except input and output topics.

My streams config is:

 
{code:java}
application.id = consumer-id-v1.00
application.server =
bootstrap.servers = [foo1:9092, foo2:9092, foo3:9092]
buffered.records.per.partition = 1000
cache.max.bytes.buffering = 524288000
client.id =
commit.interval.ms = 3
connections.max.idle.ms = 54
default.deserialization.exception.handler = class 
org.apache.kafka.streams.errors.LogAndFailExceptionHandler
default.key.serde = class 
org.apache.kafka.common.serialization.Serdes$StringSerde
default.production.exception.handler = class 
org.apache.kafka.streams.errors.DefaultProductionExceptionHandler
default.timestamp.extractor = class com.acme.stream.TimeExtractor
default.value.serde = class com.acme.serde.MyDtoSerde
max.task.idle.ms = 0
metadata.max.age.ms = 30
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 3
num.standby.replicas = 0
num.stream.threads = 25
partition.grouper = class 
org.apache.kafka.streams.processor.DefaultPartitionGrouper
poll.ms = 100
processing.guarantee = at_least_once
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
replication.factor = 1
request.timeout.ms = 4
retries = 0
retry.backoff.ms = 100
rocksdb.config.setter = null
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
state.cleanup.delay.ms = 60
state.dir = /tmp/kafka-streams
topology.optimization = none
upgrade.from = null
windowstore.changelog.additional.retention.ms = 8640
{code}
in my stream I am using withLoggingDisabled

 

 
{code:java}
stream.filter((key, val) -> val!=null)
.selectKey((key, val) -> getId(val))
.groupByKey(Grouped.as("key-grouper").with(Serdes.String(), new 
MyDtoSerde()))
.windowedBy(TimeWindows.of(aggregationWindowSizeDuration)
   

[jira] [Updated] (KAFKA-8646) Materialized.withLoggingDisabled() does not disable changelog topics creation

2019-07-10 Thread jmhostalet (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8646?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

jmhostalet updated KAFKA-8646:
--
Summary: Materialized.withLoggingDisabled() does not disable changelog 
topics creation  (was: Materialized.withLoggingDisabled() does not disable the 
changelog topics creation)

> Materialized.withLoggingDisabled() does not disable changelog topics creation
> -
>
> Key: KAFKA-8646
> URL: https://issues.apache.org/jira/browse/KAFKA-8646
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.3.0
>Reporter: jmhostalet
>Priority: Minor
>
> I have a cluster with 3 brokers running version 0.11
> My kafka-streams app was using kafka-client 0.11.0.1 but recently I've 
> migrated to 2.3.0
> I have no executed any migration as my data is disposable, therefore I have 
> deleted all intermediate topics, except input and output topics.
> My streams config is:
>  
> {code:java}
> application.id = consumer-id-v1.00
> application.server =
> bootstrap.servers = [foo1:9092, foo2:9092, foo3:9092]
> buffered.records.per.partition = 1000
> cache.max.bytes.buffering = 524288000
> client.id =
> commit.interval.ms = 3
> connections.max.idle.ms = 54
> default.deserialization.exception.handler = class 
> org.apache.kafka.streams.errors.LogAndFailExceptionHandler
> default.key.serde = class 
> org.apache.kafka.common.serialization.Serdes$StringSerde
> default.production.exception.handler = class 
> org.apache.kafka.streams.errors.DefaultProductionExceptionHandler
> default.timestamp.extractor = class com.acme.stream.TimeExtractor
> default.value.serde = class com.acme.serde.MyDtoSerde
> max.task.idle.ms = 0
> metadata.max.age.ms = 30
> metric.reporters = []
> metrics.num.samples = 2
> metrics.recording.level = INFO
> metrics.sample.window.ms = 3
> num.standby.replicas = 0
> num.stream.threads = 25
> partition.grouper = class 
> org.apache.kafka.streams.processor.DefaultPartitionGrouper
> poll.ms = 100
> processing.guarantee = at_least_once
> receive.buffer.bytes = 32768
> reconnect.backoff.max.ms = 1000
> reconnect.backoff.ms = 50
> replication.factor = 1
> request.timeout.ms = 4
> retries = 0
> retry.backoff.ms = 100
> rocksdb.config.setter = null
> security.protocol = PLAINTEXT
> send.buffer.bytes = 131072
> state.cleanup.delay.ms = 60
> state.dir = /tmp/kafka-streams
> topology.optimization = none
> upgrade.from = null
> windowstore.changelog.additional.retention.ms = 8640
> {code}
> in my stream I am using withLoggingDisabled
>  
>  
> {code:java}
> stream.filter((key, val) -> val!=null)
> .selectKey((key, val) -> getId(val))
> .groupByKey(Grouped.as("key-grouper").with(Serdes.String(), new 
> MyDtoSerde()))
> .windowedBy(TimeWindows.of(aggregationWindowSizeDuration)
>.grace(windowRetentionPeriodDuration))
> .aggregate(MyDto::new,
>new MyUpdater(),
>Materialized.as("aggregation-updater")
>.withLoggingDisabled()
>.with(Serdes.String(), new MyDtoSerde()))
> .toStream((k, v) -> k.key())
> .mapValues(val -> { ...
> {code}
> but changelog topics are created (KSTREAM-AGGREGATE-STATE-STORE), no matter 
> if I delete them before running again the app or if I change the 
> application.id
> With a new application.id, topics are recreated with the new prefix.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)