[jira] [Commented] (KAFKA-7794) kafka.tools.GetOffsetShell does not return the offset in some cases
[ https://issues.apache.org/jira/browse/KAFKA-7794?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16766934#comment-16766934 ] Daniele Ascione commented on KAFKA-7794: [~kartikvk1996] I've used Kafka 0.10.2.1 > kafka.tools.GetOffsetShell does not return the offset in some cases > --- > > Key: KAFKA-7794 > URL: https://issues.apache.org/jira/browse/KAFKA-7794 > Project: Kafka > Issue Type: Bug > Components: tools >Affects Versions: 0.10.2.0, 0.10.2.1, 0.10.2.2 >Reporter: Daniele Ascione >Assignee: Kartik >Priority: Critical > Labels: Kafka, ShellCommands, kafka-0.10, offset, shell, > shell-script, shellscript, tools, usability > Attachments: image-2019-02-11-20-51-07-805.png, > image-2019-02-11-20-56-13-362.png, image-2019-02-11-20-57-03-579.png, > image-2019-02-12-16-19-25-170.png, image-2019-02-12-16-21-13-126.png, > image-2019-02-12-16-23-38-399.png, image-2019-02-13-11-43-24-128.png, > image-2019-02-13-11-43-28-873.png, image-2019-02-13-11-44-18-736.png, > image-2019-02-13-11-45-21-459.png > > > For some input for the timestamps (different from -1 or -2) the GetOffset is > not able to retrieve the offset. > For example, if _x_ is the timestamp in that "not working range", and you > execute: > {code:java} > bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list > $KAFKA_ADDRESS --topic $MY_TOPIC --time x > {code} > The output is: > {code:java} > MY_TOPIC:8: > MY_TOPIC:2: > MY_TOPIC:5: > MY_TOPIC:4: > MY_TOPIC:7: > MY_TOPIC:1: > MY_TOPIC:9:{code} > while after the last ":" an integer representing the offset is expected. 
> > Steps to reproduce it: > # Consume all the messages from the beginning and print the timestamp: > {code:java} > bin/kafka-simple-consumer-shell.sh --no-wait-at-logend --broker-list > $KAFKA_ADDRESS --topic $MY_TOPIC --property print.timestamp=true > > messages{code} > # Sort the messages by timestamp and get some of the oldest messages: > {code:java} > awk -F "CreateTime:" '{ print $2}' messages | sort -n > msg_sorted{code} > # Take (for example) the timestamp of the 10th oldest message, and see if > GetOffsetShell is not able to print the offset: > {code:java} > timestamp="$(sed '10q;d' msg_sorted | cut -f1)" > bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list > $KAFKA_ADDRESS --topic $MY_TOPIC --time $timestamp > # The output should be something like: > # MY_TOPIC:1: > # MY_TOPIC:2: > (repeated for every partition){code} > # Verify that the message with that timestamp is still in Kafka: > {code:java} > bin/kafka-simple-consumer-shell.sh --no-wait-at-logend --broker-list > $KAFKA_ADDRESS --topic $MY_TOPIC --property print.timestamp=true | grep > "CreateTime:$timestamp" {code} > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
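For context on why a mid-range timestamp can come back empty: in 0.10.x, GetOffsetShell resolved timestamps through the old offsets API, which works at log-segment granularity (it returns offsets of segments older than the target time) rather than per-record timestamps. A minimal sketch of that lookup logic, with invented segment data; this is an illustration of the granularity problem, not Kafka's actual implementation:

```python
# Sketch of pre-KIP-79 offset lookup semantics: each log segment is modeled
# as (base_offset, last_modified_ms), and a query returns the base offsets of
# segments whose last-modified time is strictly before the target timestamp.
# An empty result corresponds to output like "MY_TOPIC:1:" with no offset.

def offsets_before(segments, target_ms):
    """Return base offsets of segments last modified before target_ms,
    newest first; an empty list means no offset could be resolved."""
    return [base for base, mtime in sorted(segments, reverse=True)
            if mtime < target_ms]

# One partition with three segments (hypothetical timestamps in ms).
segments = [(0, 1_000), (100, 2_000), (200, 3_000)]

print(offsets_before(segments, 5_000))  # -> [200, 100, 0]: all segments qualify
print(offsets_before(segments, 500))    # -> []: timestamp predates every
                                        #    segment, so no offset is printed
```

Under these semantics any per-record timestamp that does not line up with segment boundaries can fall into a "not working range", which is consistent with the behavior reported above.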
[jira] [Comment Edited] (KAFKA-7794) kafka.tools.GetOffsetShell does not return the offset in some cases
[ https://issues.apache.org/jira/browse/KAFKA-7794?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16766934#comment-16766934 ] Daniele Ascione edited comment on KAFKA-7794 at 2/13/19 8:37 AM: - [~kartikvk1996] I've used Kafka 0.10.2.1. Tried, on the client side, with 0.10.2.1 and 0.10.2.2 was (Author: dascione): [~kartikvk1996] I've used Kafka 0.10.2.1
[jira] [Created] (KAFKA-7925) Constant 100% cpu usage by all kafka brokers
Abhi created KAFKA-7925: --- Summary: Constant 100% cpu usage by all kafka brokers Key: KAFKA-7925 URL: https://issues.apache.org/jira/browse/KAFKA-7925 Project: Kafka Issue Type: Bug Components: core Affects Versions: 2.1.0 Environment: Java 11, Kafka v2.1.0 Reporter: Abhi Attachments: threadump20190212.txt Hi, I am seeing constant 100% CPU usage on all brokers in our Kafka cluster, even without any clients connected to any broker. We have seen this bug multiple times in our Kafka setup, which is not yet open to clients, and it is becoming a blocker for our deployment. I am seeing a lot of connections to other brokers in CLOSE_WAIT state (see below). In per-thread usage, the threads 'kafka-network-thread-6-ListenerName(SASL_PLAINTEXT)-SASL_PLAINTEXT-0,kafka-network-thread-6-ListenerName(SASL_PLAINTEXT)-SASL_PLAINTEXT-1,kafka-network-thread-6-ListenerName(SASL_PLAINTEXT)-SASL_PLAINTEXT-2' take up more than 90% of the CPU time in a 60s interval. I have attached a thread dump from one of the brokers in the cluster.
*Java version:* openjdk 11.0.2 2019-01-15 OpenJDK Runtime Environment 18.9 (build 11.0.2+9) OpenJDK 64-Bit Server VM 18.9 (build 11.0.2+9, mixed mode) *Kafka verison:* v2.1.0 *connections:* java 144319 kafkagod 88u IPv4 3063266 0t0 TCP *:35395 (LISTEN) java 144319 kafkagod 89u IPv4 3063267 0t0 TCP *:9144 (LISTEN) java 144319 kafkagod 104u IPv4 3064219 0t0 TCP mwkafka-prod-02.tbd:47292->mwkafka-zk-prod-05.tbd:2181 (ESTABLISHED) java 144319 kafkagod 2003u IPv4 3055115 0t0 TCP *:9092 (LISTEN) java 144319 kafkagod 2013u IPv4 7220110 0t0 TCP mwkafka-prod-02.tbd:60724->mwkafka-zk-prod-04.dr:2181 (ESTABLISHED) java 144319 kafkagod 2020u IPv4 30012904 0t0 TCP mwkafka-prod-02.tbd:38988->mwkafka-prod-02.nyc:9092 (ESTABLISHED) java 144319 kafkagod 2021u IPv4 30012961 0t0 TCP mwkafka-prod-02.tbd:58420->mwkafka-prod-01.nyc:9092 (ESTABLISHED) java 144319 kafkagod 2027u IPv4 30015723 0t0 TCP mwkafka-prod-02.tbd:58398->mwkafka-prod-01.nyc:9092 (ESTABLISHED) java 144319 kafkagod 2028u IPv4 30015630 0t0 TCP mwkafka-prod-02.tbd:36248->mwkafka-prod-02.dr:9092 (ESTABLISHED) java 144319 kafkagod 2030u IPv4 30015726 0t0 TCP mwkafka-prod-02.tbd:39012->mwkafka-prod-02.nyc:9092 (ESTABLISHED) java 144319 kafkagod 2031u IPv4 30013619 0t0 TCP mwkafka-prod-02.tbd:38986->mwkafka-prod-02.nyc:9092 (ESTABLISHED) java 144319 kafkagod 2032u IPv4 30015604 0t0 TCP mwkafka-prod-02.tbd:36246->mwkafka-prod-02.dr:9092 (ESTABLISHED) java 144319 kafkagod 2033u IPv4 30012981 0t0 TCP mwkafka-prod-02.tbd:36924->mwkafka-prod-01.dr:9092 (ESTABLISHED) java 144319 kafkagod 2034u IPv4 30012967 0t0 TCP mwkafka-prod-02.tbd:39036->mwkafka-prod-02.nyc:9092 (ESTABLISHED) java 144319 kafkagod 2035u IPv4 30012898 0t0 TCP mwkafka-prod-02.tbd:36866->mwkafka-prod-01.dr:9092 (FIN_WAIT2) java 144319 kafkagod 2036u IPv4 30004729 0t0 TCP mwkafka-prod-02.tbd:36882->mwkafka-prod-01.dr:9092 (ESTABLISHED) java 144319 kafkagod 2037u IPv4 30004914 0t0 TCP mwkafka-prod-02.tbd:58426->mwkafka-prod-01.nyc:9092 (ESTABLISHED) java 144319 
kafkagod 2038u IPv4 30015651 0t0 TCP mwkafka-prod-02.tbd:36884->mwkafka-prod-01.dr:9092 (ESTABLISHED) java 144319 kafkagod 2039u IPv4 30012966 0t0 TCP mwkafka-prod-02.tbd:58422->mwkafka-prod-01.nyc:9092 (ESTABLISHED) java 144319 kafkagod 2040u IPv4 30005643 0t0 TCP mwkafka-prod-02.tbd:36252->mwkafka-prod-02.dr:9092 (ESTABLISHED) java 144319 kafkagod 2041u IPv4 30012944 0t0 TCP mwkafka-prod-02.tbd:36286->mwkafka-prod-02.dr:9092 (ESTABLISHED) java 144319 kafkagod 2042u IPv4 30012973 0t0 TCP mwkafka-prod-02.tbd:9092->mwkafka-prod-01.nyc:51924 (ESTABLISHED) java 144319 kafkagod 2043u sock 0,7 0t0 30012463 protocol: TCP java 144319 kafkagod 2044u IPv4 30012979 0t0 TCP mwkafka-prod-02.tbd:9092->mwkafka-prod-01.dr:39994 (ESTABLISHED) java 144319 kafkagod 2045u IPv4 30012899 0t0 TCP mwkafka-prod-02.tbd:9092->mwkafka-prod-02.nyc:34548 (ESTABLISHED) java 144319 kafkagod 2046u sock 0,7 0t0 30003437 protocol: TCP java 144319 kafkagod 2047u IPv4 30012980 0t0 TCP mwkafka-prod-02.tbd:9092->mwkafka-prod-02.dr:38120 (ESTABLISHED) java 144319 kafkagod 2048u sock 0,7 0t0 30012546 protocol: TCP java 144319 kafkagod 2049u IPv4 30005418 0t0 TCP mwkafka-prod-02.tbd:9092->mwkafka-prod-01.dr:39686 (CLOSE_WAIT) java 144319 kafkagod 2050u IPv4 30009977 0t0 TCP mwkafka-prod-02.tbd:9092->mwkafka-prod-02.nyc:34552 (ESTABLISHED) java 144319 kafkagod 2060u sock 0,7 0t0 30003439 protocol: TCP java 144319 kafkagod 2061u IPv4 30012906 0t0 TCP mwkafka-prod-02.tbd:9092->mwkafka-prod-01.nyc:51862 (ESTABLISHED) java 144319 kafkagod 2069u IPv4 30005642 0t0 TCP mwkafka-prod-02.tbd:9092->mwkafka-prod-02.nyc:34570 (ESTABLISHED) java 144319 kafkagod 2073u sock 0,7 0t0 30003440 protocol: TCP java 144319 kafkagod 2086u IPv4 30005644 0t0 TCP mwkafka-prod-02.tbd:9092->mwkafka-prod-01.nyc:5187
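To correlate the attached thread dump with the busy threads, the decimal OS thread IDs reported by tools such as `top -H -p <pid>` have to be converted to the hexadecimal `nid=0x...` field that appears in HotSpot thread dumps. A small generic helper for that mapping (the thread-dump excerpt below is hypothetical, formatted the way jstack prints it):

```python
def tid_to_nid(tid):
    """Convert a decimal OS thread id (e.g. from `top -H -p <pid>`) to the
    hexadecimal nid=0x... token used in a jstack/HotSpot thread dump."""
    return "nid=0x%x" % tid

def find_thread(dump, tid):
    """Return the dump line for the given OS thread id, or None."""
    nid = tid_to_nid(tid)
    for line in dump.splitlines():
        if nid in line:
            return line
    return None

# Hypothetical excerpt in the format a jstack thread dump uses.
dump = ('"kafka-network-thread-6-ListenerName(SASL_PLAINTEXT)-SASL_PLAINTEXT-0" '
        '#77 prio=5 os_prio=0 tid=0x00007f1e4c01f000 nid=0x233ff runnable')

print(tid_to_nid(144383))   # -> nid=0x233ff
```

With this, the three network threads consuming >90% CPU can be matched against their stacks in threadump20190212.txt.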
[jira] [Created] (KAFKA-7926) Issue with Kafka Broker - UnknownServerException
Santosh S created KAFKA-7926: Summary: Issue with Kafka Broker - UnknownServerException Key: KAFKA-7926 URL: https://issues.apache.org/jira/browse/KAFKA-7926 Project: Kafka Issue Type: Bug Components: core Environment: production Reporter: Santosh S Our application uses `springBootVersion = 2.0.4.RELEASE` along with the `compile('io.projectreactor.kafka:reactor-kafka:1.0.1.RELEASE')` dependency. The Kafka broker we have is at version `1.0.1`. Intermittently, when we send messages to Kafka by creating a `reactor.kafka.sender.SenderRecord` and then inspect `reactor.kafka.sender.SenderResult.exception()` in the response, we find `java.lang.RuntimeException: org.apache.kafka.common.errors.UnknownServerException: The server experienced an unexpected error when processing the request` populated in the exception. Upon retrying a couple of times, the messages get through successfully. On the broker logs the error below is printed multiple times without any stacktrace: `[2019-02-08 15:43:07,501] ERROR [ReplicaManager broker=3] Error processing append operation on partition price-promotions-local-event-0 (kafka.server.ReplicaManager)`, where `price-promotions-local-event` is our topic. I have looked online but there is no definitive resolution or way to triage this issue; many thanks in advance for any help.
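Since the reporter notes that retrying a couple of times lets the messages through, a bounded retry with backoff around the send is a common mitigation while the broker-side cause is investigated. A generic sketch of that pattern; the exception class and the send function here are placeholders, not the reactor-kafka or kafka-clients API:

```python
import time

class UnknownServerError(RuntimeError):
    """Stand-in for a retriable, server-side send failure."""

def send_with_retry(send, record, attempts=3, backoff_s=0.05):
    """Call send(record), retrying up to `attempts` times with a doubling
    backoff; re-raise the last error if every attempt fails."""
    delay = backoff_s
    for attempt in range(1, attempts + 1):
        try:
            return send(record)
        except UnknownServerError:
            if attempt == attempts:
                raise
            time.sleep(delay)
            delay *= 2

# Simulated sender that fails twice and then succeeds, mimicking the
# intermittent behavior described above.
calls = {"n": 0}
def flaky_send(record):
    calls["n"] += 1
    if calls["n"] < 3:
        raise UnknownServerError("The server experienced an unexpected error")
    return "ack:" + record

print(send_with_retry(flaky_send, "price-promotion"))  # -> ack:price-promotion
```

Note that blind retries can reorder or duplicate messages unless the producer is configured for idempotence, so this only papers over the broker error.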
[jira] [Commented] (KAFKA-7882) StateStores are frequently closed during the 'transform' method
[ https://issues.apache.org/jira/browse/KAFKA-7882?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16767006#comment-16767006 ] Mateusz Owczarek commented on KAFKA-7882: - [~mjsax] As I reported, I was using Kafka Streams 2.0.0 with Scala DSL API, where transform method accepts Transformer instance (not TransformerSupplier) as a parameter: {code} def transform[K1, V1](transformer: Transformer[K, V, (K1, V1)], stateStoreNames: String*): KStream[K1, V1] = { val transformerSupplierJ: TransformerSupplier[K, V, KeyValue[K1, V1]] = new TransformerSupplier[K, V, KeyValue[K1, V1]] { override def get(): Transformer[K, V, KeyValue[K1, V1]] = { new Transformer[K, V, KeyValue[K1, V1]] { override def transform(key: K, value: V): KeyValue[K1, V1] = { transformer.transform(key, value) match { case (k1, v1) => KeyValue.pair(k1, v1) case _ => null } } override def init(context: ProcessorContext): Unit = transformer.init(context) override def close(): Unit = transformer.close() } } } inner.transform(transformerSupplierJ, stateStoreNames: _*) } {code} I believe the implementation changed now in 2.1.0 and does actually accept TransformerSupplier. > StateStores are frequently closed during the 'transform' method > --- > > Key: KAFKA-7882 > URL: https://issues.apache.org/jira/browse/KAFKA-7882 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.0.0 >Reporter: Mateusz Owczarek >Priority: Major > > Hello, I have a problem with the state store being closed frequently while > transforming upcoming records. 
To ensure only one record of the same key and > the window comes to an aggregate I have created a custom Transformer (I know > something similar is going to be introduced with suppress method on KTable in > the future, but my implementation is quite simple and imo should work > correctly) with the following implementation: > {code:java} > override def transform(key: Windowed[K], value: V): (Windowed[K], V) = { > val partition = context.partition() > if (partition != -1) store.put(key.key(), (value, partition), > key.window().start()) > else logger.warn(s"-1 partition") > null //Ensuring no 1:1 forwarding, context.forward and commit logic is in the > punctuator callback > } > {code} > > What I do get is the following error: > {code:java} > Store MyStore is currently closed{code} > This problem appears only when the number of streaming threads (or input > topic partitions) is greater than 1 even if I'm just saving to the store and > turn off the punctuation. > If punctuation is present, however, I sometimes get -1 as a partition value > in the transform method. I'm familiar with the basic docs, however, I haven't > found anything that could help me here. > I build my state store like this: > {code:java} > val stateStore = Stores.windowStoreBuilder( > Stores.persistentWindowStore( > stateStoreName, > timeWindows.maintainMs() + timeWindows.sizeMs + > TimeUnit.DAYS.toMillis(1), > timeWindows.segments, > timeWindows.sizeMs, > false > ), > serde[K], > serde[(V, Int)] > ) > {code} > and include it in a DSL API like this: > {code:java} > builder.addStateStore(stateStore) > (...).transform(new MyTransformer(...), "MyStore") > {code} > INB4: I don't close any state stores manually, I gave them retention time as > long as possible for the debugging stage, I tried to hotfix that with the > retry in the transform method and the state stores reopen at the end and the > `put` method works, but this approach is questionable and I am concerned if > it actually works. 
> > Edit: > Could this be because the > {code:java}StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG{code} is set to a low > value? If I understand correctly, flushing to disk then happens more > frequently; could that cause the store to be closed?
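A common cause of "store is currently closed" with more than one stream thread is a single Transformer instance being shared by every task, so one task's lifecycle (e.g. a close during rebalance) invalidates the store reference another task is still using; the supplier contract of returning a fresh instance per `get()` avoids this. A language-neutral sketch of the difference, in Python for brevity (toy classes, not the Streams API):

```python
class Store:
    def __init__(self, name):
        self.name, self.open = name, True
    def put(self, key, value):
        if not self.open:
            raise RuntimeError("Store %s is currently closed" % self.name)

class MyTransformer:
    """Holds per-task state: the store reference handed over in init()."""
    def init(self, store):
        self.store = store
    def close(self):
        self.store.open = False

# Anti-pattern: one instance shared by two tasks. Both init() calls hit the
# same object, so closing it for task B breaks task A's next put().
shared = MyTransformer()
shared.init(Store("MyStore"))      # task A
shared.init(Store("MyStore"))      # task B overwrites task A's reference
shared.close()                     # task B closing affects the shared state
try:
    shared.store.put("k", "v")     # task A's next write fails
    failed = False
except RuntimeError:
    failed = True

# Supplier contract: a fresh instance per get(), each task owns its state.
def supplier():
    return MyTransformer()

task_a, task_b = supplier(), supplier()
task_a.init(Store("MyStore"))
task_b.init(Store("MyStore"))
task_b.close()
task_a.store.put("k", "v")         # unaffected by task B closing
print(failed)                      # -> True: the shared instance hit the error
```

This lines up with the comment above: the pre-2.1 Scala DSL accepted a Transformer instance rather than a TransformerSupplier, making the shared-instance trap easy to fall into.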
[jira] [Created] (KAFKA-7927) Read committed receives aborted events
Gabor Somogyi created KAFKA-7927: Summary: Read committed receives aborted events Key: KAFKA-7927 URL: https://issues.apache.org/jira/browse/KAFKA-7927 Project: Kafka Issue Type: Bug Components: consumer, core, producer Affects Versions: 1.0.0 Reporter: Gabor Somogyi When a Kafka client produces ~30k events and aborts the transaction at the end, a consumer can read part of the aborted messages when "isolation.level" is set to "READ_COMMITTED". Kafka client version: 2.0.0 Kafka broker version: 1.0.0 Producer: {code:java} java -jar kafka-producer/target/kafka-producer-0.0.1-SNAPSHOT-jar-with-dependencies.jar gsomogyi-cdh5144-220cloudera2-1.gce.cloudera.com:9092 src-topic {code} See attached code. Consumer: {code:java} kafka-console-consumer --zookeeper localhost:2181 --topic src-topic --from-beginning --isolation-level read_committed {code} The same behavior is seen when the consumer is re-implemented in Scala.
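For reference, the guarantee READ_COMMITTED is supposed to provide: records of an open transaction are withheld from the application and discarded when an abort marker for that producer arrives, so none of the ~30k aborted events should be delivered. A toy model of that filtering; real consumers implement it with the last stable offset and an aborted-transaction index, but the visible semantics are the same:

```python
# Toy model of read_committed delivery. The log is a list of
# ("rec", producer_id, value) entries and ("marker", producer_id, decision)
# transaction markers, where decision is "COMMIT" or "ABORT".

def read_committed(log):
    """Return the values a read_committed consumer should see."""
    pending = {}    # producer_id -> buffered values of the open transaction
    delivered = []
    for entry in log:
        if entry[0] == "rec":
            _, pid, value = entry
            pending.setdefault(pid, []).append(value)
        else:
            _, pid, decision = entry
            if decision == "COMMIT":
                delivered.extend(pending.pop(pid, []))
            else:
                pending.pop(pid, None)   # aborted: drop the whole transaction
    return delivered

log = [("rec", 1, "a"), ("rec", 1, "b"), ("marker", 1, "ABORT"),
       ("rec", 1, "c"), ("marker", 1, "COMMIT")]
print(read_committed(log))   # -> ['c']: nothing from the aborted transaction
```

Seeing any of the aborted records, as reported here, would mean this filtering is being bypassed somewhere between broker and client.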
[jira] [Updated] (KAFKA-7927) Read committed receives aborted events
[ https://issues.apache.org/jira/browse/KAFKA-7927?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gabor Somogyi updated KAFKA-7927: - Attachment: KafkaProducer.scala
[jira] [Updated] (KAFKA-7927) Read committed receives aborted events
[ https://issues.apache.org/jira/browse/KAFKA-7927?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gabor Somogyi updated KAFKA-7927: - Attachment: producer.log.gz
[jira] [Updated] (KAFKA-7927) Read committed receives aborted events
[ https://issues.apache.org/jira/browse/KAFKA-7927?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gabor Somogyi updated KAFKA-7927: - Attachment: consumer.log
[jira] [Updated] (KAFKA-7927) Read committed receives aborted events
[ https://issues.apache.org/jira/browse/KAFKA-7927?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gabor Somogyi updated KAFKA-7927: - Description: When a kafka client produces ~30k events and at the end it aborts the transaction a consumer can read part of the aborted messages when "isolation.level" set to "READ_COMMITTED". Kafka client version: 2.0.0 Kafka broker version: 1.0.0 Producer: {code:java} java -jar kafka-producer/target/kafka-producer-0.0.1-SNAPSHOT-jar-with-dependencies.jar gsomogyi-cdh5144-220cloudera2-1.gce.cloudera.com:9092 src-topic {code} See attached code. Consumer: {code:java} kafka-console-consumer --zookeeper localhost:2181 --topic src-topic --from-beginning --isolation-level read_committed {code} Same behavior seen when re-implemented the consumer in scala. The whole application can be found here: https://github.com/gaborgsomogyi/kafka-semantics-tester
[jira] [Assigned] (KAFKA-7917) Streams store cleanup: collapse layers
[ https://issues.apache.org/jira/browse/KAFKA-7917?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler reassigned KAFKA-7917: --- Assignee: John Roesler > Streams store cleanup: collapse layers > -- > > Key: KAFKA-7917 > URL: https://issues.apache.org/jira/browse/KAFKA-7917 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: John Roesler >Assignee: John Roesler >Priority: Major > > Following on KAFKA-7916, we can consider collapsing the "streams management > layers" into one. > Right now, we have: > * metering (also handles moving from pojo world to bytes world) > * change-logging > * caching > This is good compositional style, but we also have some runtime overhead of > calling through all these layers, as well as some mental overhead of > understanding how many and which layers we are going through. > Also, there are dependencies between the caching and change-logging layers. > I _think_ it would simplify the code if we collapsed these into one layer > with boolean switches to turn on or off the different aspects. (rather than > wrapping the store with the different layers or not depending on the same > boolean conditions) -- This message was sent by Atlassian JIRA (v7.6.3#76005)
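The proposal above, roughly: replace the chain of wrapper stores (metering, change-logging, caching) with a single store whose aspects are toggled by boolean switches. A schematic sketch of the two shapes, illustrative only and not the Streams code:

```python
class InnerStore:
    """The fundamental bytes store at the bottom of the stack."""
    def __init__(self):
        self.data = {}
    def put(self, key, value):
        self.data[key] = value

# Today: compositional wrappers, one call hop per layer.
class ChangeLoggingStore:
    def __init__(self, inner, changelog):
        self.inner, self.changelog = inner, changelog
    def put(self, key, value):
        self.changelog.append((key, value))
        self.inner.put(key, value)

class CachingStore:
    def __init__(self, inner):
        self.inner, self.cache = inner, {}
    def put(self, key, value):
        self.cache[key] = value
        self.inner.put(key, value)   # simplified: real caching defers the write

# Proposed: one layer with boolean switches instead of wrapping.
class CollapsedStore:
    def __init__(self, caching=False, logging=False):
        self.data, self.cache, self.changelog = {}, {}, []
        self.caching, self.logging = caching, logging
    def put(self, key, value):
        if self.caching:
            self.cache[key] = value
        if self.logging:
            self.changelog.append((key, value))
        self.data[key] = value

wrapped = CachingStore(ChangeLoggingStore(InnerStore(), changelog=[]))
collapsed = CollapsedStore(caching=True, logging=True)
wrapped.put("k", 1)
collapsed.put("k", 1)
```

Both shapes produce the same effects; the collapsed form trades compositional layering for fewer call hops and one place to read the logic, which is the trade-off the ticket discusses.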
[jira] [Commented] (KAFKA-7918) Streams store cleanup: inline byte-store generic parameters
[ https://issues.apache.org/jira/browse/KAFKA-7918?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16767386#comment-16767386 ] John Roesler commented on KAFKA-7918: - Should we also consider simplifying the StoreChangeLogger by inlining the generic types (which are again always `Bytes, byte[]`)? > Streams store cleanup: inline byte-store generic parameters > --- > > Key: KAFKA-7918 > URL: https://issues.apache.org/jira/browse/KAFKA-7918 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: John Roesler >Assignee: Sophie Blee-Goldman >Priority: Major > > Currently, the fundamental layer of stores in Streams is the "bytes store". > The easiest way to identify this is in > `org.apache.kafka.streams.state.Stores`: all the `StoreBuilder`s require a > `XXBytesStoreSupplier`. > We provide several implementations of these bytes stores, typically an > in-memory one and a persistent one (aka RocksDB). > Inside these bytes stores, the key is always `Bytes` and the value is always > `byte[]` (serialization happens at a higher level). However, the store > implementations are generically typed, just `K` and `V`. > This is good for flexibility, but it makes the code a little harder to > understand. I think that we used to do serialization at a lower level, so the > generics are a hold-over from that. > It would simplify the code if we just inlined the actual k/v types and maybe > even renamed the classes from (e.g.) `InMemoryKeyValueStore` to > `InMemoryKeyValueBytesStore`, and so forth.
[jira] [Commented] (KAFKA-7917) Streams store cleanup: collapse layers
[ https://issues.apache.org/jira/browse/KAFKA-7917?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16767389#comment-16767389 ] John Roesler commented on KAFKA-7917: - Thanks for the comments, all. The main motivation here is not a performance optimization (although there may be some), but simplifying the code base. I agree it will require some discussion, so I've picked up the ticket, and I'm working on a WIP PR so we can have a concrete discussion about whether it results in a simpler system or not. As I've been working on it, I do think that [~guozhang] is right; it seems to pave the way to needing that "root" store reference in init.
[jira] [Commented] (KAFKA-7440) Use leader epoch in consumer fetch requests
[ https://issues.apache.org/jira/browse/KAFKA-7440?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16767409#comment-16767409 ] ASF GitHub Bot commented on KAFKA-7440: --- hachikuji commented on pull request #6190: KAFKA-7440 Add leader epoch to fetch and list-offset request URL: https://github.com/apache/kafka/pull/6190 This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Use leader epoch in consumer fetch requests > --- > > Key: KAFKA-7440 > URL: https://issues.apache.org/jira/browse/KAFKA-7440 > Project: Kafka > Issue Type: Improvement >Reporter: Jason Gustafson >Assignee: David Arthur >Priority: Major > Labels: kip > > This patch adds support in the consumer to use the leader epoch obtained from > the metadata in fetch requests: > https://cwiki.apache.org/confluence/display/KAFKA/KIP-320%3A+Allow+fetchers+to+detect+and+handle+log+truncation -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-7920) Do not permit zstd use until inter.broker.protocol.version is updated to 2.1
[ https://issues.apache.org/jira/browse/KAFKA-7920?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson updated KAFKA-7920: --- Priority: Blocker (was: Major) > Do not permit zstd use until inter.broker.protocol.version is updated to 2.1 > > > Key: KAFKA-7920 > URL: https://issues.apache.org/jira/browse/KAFKA-7920 > Project: Kafka > Issue Type: Bug >Affects Versions: 2.1.0 >Reporter: Jason Gustafson >Assignee: Lee Dongjin >Priority: Blocker > Fix For: 2.2.0 > > > After brokers have been upgraded to 2.1, users can begin using zstd > compression. Regardless of the inter.broker.protocol.version, the broker will > happily accept zstd-compressed data as long as the right produce request > version is used. However, if the inter.broker.protocol.version is set to 2.0 > or below, then followers will not be able to use the minimum required fetch > version, which will result in the following error: > {code} > [2019-02-11 17:42:47,116] ERROR [ReplicaFetcher replicaId=2, leaderId=1, > fetcherId=0] Error for partition foo-0 at offset 0 > (kafka.server.ReplicaFetcherThread) > > > org.apache.kafka.common.errors.UnsupportedCompressionTypeException: The > requesting client does not support the compression type of given partition. > {code} > We should make produce request validation consistent. Until the > inter.broker.protocol.version is at 2.1 or later, we should reject produce > requests with UNSUPPORTED_COMPRESSION_TYPE. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
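The fix described above amounts to one extra validation rule on the produce path: if a batch is zstd-compressed and the inter-broker protocol version is below 2.1, reject the request with UNSUPPORTED_COMPRESSION_TYPE instead of accepting data that followers cannot fetch. A sketch of that predicate, with version handling simplified to (major, minor) tuples rather than real IBP strings:

```python
def validate_produce(compression, inter_broker_protocol):
    """Return the error a broker should give a produce request, or None.
    Versions are (major, minor) tuples; real IBP strings are richer."""
    if compression == "zstd" and inter_broker_protocol < (2, 1):
        return "UNSUPPORTED_COMPRESSION_TYPE"
    return None

print(validate_produce("zstd", (2, 0)))   # -> UNSUPPORTED_COMPRESSION_TYPE
print(validate_produce("zstd", (2, 1)))   # -> None: safe once IBP is 2.1+
print(validate_produce("lz4", (2, 0)))    # -> None: only zstd is gated
```

Failing fast at produce time moves the error from the replica fetcher (where it shows up as the ReplicaFetcherThread error quoted above) back to the producing client, which can handle it.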
[jira] [Updated] (KAFKA-7920) Do not permit zstd use until inter.broker.protocol.version is updated to 2.1
[ https://issues.apache.org/jira/browse/KAFKA-7920?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson updated KAFKA-7920: --- Fix Version/s: 2.2.0
[jira] [Updated] (KAFKA-7652) Kafka Streams Session store performance degradation from 0.10.2.2 to 0.11.0.0
[ https://issues.apache.org/jira/browse/KAFKA-7652?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-7652: --- Description: I'm creating this issue in response to [~guozhang]'s request on the mailing list: [https://lists.apache.org/thread.html/97d620f4fd76be070ca4e2c70e2fda53cafe051e8fc4505dbcca0321@%3Cusers.kafka.apache.org%3E] We are attempting to upgrade our Kafka Streams application from 0.10.2.1 but are experiencing a severe performance degradation. The highest amount of CPU time seems to be spent in retrieving from the local cache. Here's an example thread profile with 0.11.0.0: [https://i.imgur.com/l5VEsC2.png] When things are running smoothly we're gated by retrieving from the state store with acceptable performance. Here's an example thread profile with 0.10.2.1: [https://i.imgur.com/IHxC2cZ.png] Some investigation reveals that it appears we're performing about 3 orders of magnitude more lookups on the NamedCache over a comparable time period. I've attached the NamedCache flush logs for 0.10.2.1 and 0.11.0.3. We're using session windows and have the app configured for commit.interval.ms = 30 * 1000 and cache.max.bytes.buffering = 10485760. I'm happy to share more details if they would be helpful. Also happy to run tests on our data. I also found this issue, which seems like it may be related: https://issues.apache.org/jira/browse/KAFKA-4904 KIP-420: [https://cwiki.apache.org/confluence/display/KAFKA/KIP-420%3A+Add+Single+Value+Fetch+in+Session+Stores] was: I'm creating this issue in response to [~guozhang]'s request on the mailing list: [https://lists.apache.org/thread.html/97d620f4fd76be070ca4e2c70e2fda53cafe051e8fc4505dbcca0321@%3Cusers.kafka.apache.org%3E] We are attempting to upgrade our Kafka Streams application from 0.10.2.1 but are experiencing a severe performance degradation. The highest amount of CPU time seems to be spent in retrieving from the local cache.
Here's an example thread profile with 0.11.0.0: [https://i.imgur.com/l5VEsC2.png] When things are running smoothly we're gated by retrieving from the state store with acceptable performance. Here's an example thread profile with 0.10.2.1: [https://i.imgur.com/IHxC2cZ.png] Some investigation reveals that it appears we're performing about 3 orders of magnitude more lookups on the NamedCache over a comparable time period. I've attached the NamedCache flush logs for 0.10.2.1 and 0.11.0.3. We're using session windows and have the app configured for commit.interval.ms = 30 * 1000 and cache.max.bytes.buffering = 10485760. I'm happy to share more details if they would be helpful. Also happy to run tests on our data. I also found this issue, which seems like it may be related: https://issues.apache.org/jira/browse/KAFKA-4904 > Kafka Streams Session store performance degradation from 0.10.2.2 to 0.11.0.0 > - > > Key: KAFKA-7652 > URL: https://issues.apache.org/jira/browse/KAFKA-7652 > Project: Kafka > Issue Type: Bug >Affects Versions: 0.11.0.0, 0.11.0.1, 0.11.0.2, 0.11.0.3, 1.1.1, 2.0.0, > 2.0.1 >Reporter: Jonathan Gordon >Assignee: Guozhang Wang >Priority: Major > Labels: kip > Fix For: 2.2.0 > > Attachments: kafka_10_2_1_flushes.txt, kafka_11_0_3_flushes.txt > > > I'm creating this issue in response to [~guozhang]'s request on the mailing > list: > [https://lists.apache.org/thread.html/97d620f4fd76be070ca4e2c70e2fda53cafe051e8fc4505dbcca0321@%3Cusers.kafka.apache.org%3E] > We are attempting to upgrade our Kafka Streams application from 0.10.2.1 but > are experiencing a severe performance degradation. The highest amount of CPU time > seems to be spent in retrieving from the local cache. Here's an example thread > profile with 0.11.0.0: > [https://i.imgur.com/l5VEsC2.png] > When things are running smoothly we're gated by retrieving from the state > store with acceptable performance.
Here's an example thread profile with > 0.10.2.1: > [https://i.imgur.com/IHxC2cZ.png] > Some investigation reveals that it appears we're performing about 3 orders > of magnitude more lookups on the NamedCache over a comparable time period. I've > attached the NamedCache flush logs for 0.10.2.1 and 0.11.0.3. > We're using session windows and have the app configured for > commit.interval.ms = 30 * 1000 and cache.max.bytes.buffering = 10485760. > I'm happy to share more details if they would be helpful. Also happy to run > tests on our data. > I also found this issue, which seems like it may be related: > https://issues.apache.org/jira/browse/KAFKA-4904 > > KIP-420: > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-420%3A+Add+Single+Value+Fetch+in+Session+Stores] > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-7652) Kafka Streams Session store performance degradation from 0.10.2.2 to 0.11.0.0
[ https://issues.apache.org/jira/browse/KAFKA-7652?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-7652: --- Labels: kip (was: ) > Kafka Streams Session store performance degradation from 0.10.2.2 to 0.11.0.0 > - > > Key: KAFKA-7652 > URL: https://issues.apache.org/jira/browse/KAFKA-7652 > Project: Kafka > Issue Type: Bug >Affects Versions: 0.11.0.0, 0.11.0.1, 0.11.0.2, 0.11.0.3, 1.1.1, 2.0.0, > 2.0.1 >Reporter: Jonathan Gordon >Assignee: Guozhang Wang >Priority: Major > Labels: kip > Fix For: 2.2.0 > > Attachments: kafka_10_2_1_flushes.txt, kafka_11_0_3_flushes.txt > > > I'm creating this issue in response to [~guozhang]'s request on the mailing > list: > [https://lists.apache.org/thread.html/97d620f4fd76be070ca4e2c70e2fda53cafe051e8fc4505dbcca0321@%3Cusers.kafka.apache.org%3E] > We are attempting to upgrade our Kafka Streams application from 0.10.2.1 but > are experiencing a severe performance degradation. The highest amount of CPU time > seems to be spent in retrieving from the local cache. Here's an example thread > profile with 0.11.0.0: > [https://i.imgur.com/l5VEsC2.png] > When things are running smoothly we're gated by retrieving from the state > store with acceptable performance. Here's an example thread profile with > 0.10.2.1: > [https://i.imgur.com/IHxC2cZ.png] > Some investigation reveals that it appears we're performing about 3 orders > of magnitude more lookups on the NamedCache over a comparable time period. I've > attached the NamedCache flush logs for 0.10.2.1 and 0.11.0.3. > We're using session windows and have the app configured for > commit.interval.ms = 30 * 1000 and cache.max.bytes.buffering = 10485760. > I'm happy to share more details if they would be helpful. Also happy to run > tests on our data. > I also found this issue, which seems like it may be related: > https://issues.apache.org/jira/browse/KAFKA-4904 > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
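The reporter's cache/commit settings can be written out as a plain `Properties` object. A minimal sketch; with the kafka-streams jar on the classpath one would normally use the `StreamsConfig` constants instead of the literal keys, but the key names below are exactly the ones quoted in the report:

```java
import java.util.Properties;

public class ReportedStreamsConfig {
    // The cache/commit settings from the KAFKA-7652 report, as they would
    // appear in a Kafka Streams configuration.
    static Properties reportedConfig() {
        Properties props = new Properties();
        props.put("commit.interval.ms", String.valueOf(30 * 1000));        // 30 s
        props.put("cache.max.bytes.buffering", String.valueOf(10485760));  // 10 MiB
        return props;
    }
}
```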
[jira] [Commented] (KAFKA-7917) Streams store cleanup: collapse layers
[ https://issues.apache.org/jira/browse/KAFKA-7917?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16767443#comment-16767443 ] Matthias J. Sax commented on KAFKA-7917: > but simplifying the code base. Having a layered code base and nested stores seems simpler to me than thousands of if-else statements in the code... (it's very subjective of course) – Looking forward to your PR. > Streams store cleanup: collapse layers > -- > > Key: KAFKA-7917 > URL: https://issues.apache.org/jira/browse/KAFKA-7917 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: John Roesler >Assignee: John Roesler >Priority: Major > > Following on KAFKA-7916, we can consider collapsing the "streams management > layers" into one. > Right now, we have: > * metering (also handles moving from pojo world to bytes world) > * change-logging > * caching > This is good compositional style, but we also have some runtime overhead of > calling through all these layers, as well as some mental overhead of > understanding how many and which layers we are going through. > Also, there are dependencies between the caching and change-logging layers. > I _think_ it would simplify the code if we collapsed these into one layer > with boolean switches to turn on or off the different aspects. (rather than > wrapping the store with the different layers or not depending on the same > boolean conditions) -- This message was sent by Atlassian JIRA (v7.6.3#76005)
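The "one layer with boolean switches" idea can be shown in miniature. This is an illustrative contrast only, not Streams' actual store classes: instead of wrapping an inner store in optional change-logging/caching decorators, one class guards each aspect with a flag:

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch of the proposal: each wrapping layer becomes a
// flag-guarded branch in a single store class. Names are invented.
public class CollapsedStoreSketch {
    static class CollapsedStore {
        private final Map<String, String> inner = new HashMap<>();
        private final boolean changeLogging;
        private final boolean caching;
        int changelogRecords = 0;  // stands in for records sent to the changelog

        CollapsedStore(boolean changeLogging, boolean caching) {
            this.changeLogging = changeLogging;
            this.caching = caching;
        }

        void put(String key, String value) {
            // Instead of delegating through wrapper layers, each aspect is a
            // branch guarded by its switch.
            if (changeLogging) changelogRecords++;
            if (caching) { /* cache bookkeeping would go here */ }
            inner.put(key, value);
        }

        String get(String key) { return inner.get(key); }
    }
}
```

The trade-off the ticket discusses is visible even here: one less level of indirection per call, at the cost of every aspect's logic living in one class.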
[jira] [Commented] (KAFKA-7882) StateStores are frequently closed during the 'transform' method
[ https://issues.apache.org/jira/browse/KAFKA-7882?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16767449#comment-16767449 ] Matthias J. Sax commented on KAFKA-7882: Thanks for confirming. Then you are hitting https://issues.apache.org/jira/browse/KAFKA-7250 (it's fixed in 2.0.1 and 2.1.0) – and the provided transformer is indeed shared, which is the root cause of the issue. Seems we can close this as a duplicate? > StateStores are frequently closed during the 'transform' method > --- > > Key: KAFKA-7882 > URL: https://issues.apache.org/jira/browse/KAFKA-7882 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.0.0 >Reporter: Mateusz Owczarek >Priority: Major > > Hello, I have a problem with the state store being closed frequently while > transforming upcoming records. To ensure that only one record per key and > window reaches the aggregate, I have created a custom Transformer (I know > something similar is going to be introduced with the suppress method on KTable in > the future, but my implementation is quite simple and imo should work > correctly) with the following implementation: > {code:java} > override def transform(key: Windowed[K], value: V): (Windowed[K], V) = { > val partition = context.partition() > if (partition != -1) store.put(key.key(), (value, partition), > key.window().start()) > else logger.warn(s"-1 partition") > null //Ensuring no 1:1 forwarding, context.forward and commit logic is in the > punctuator callback > } > {code} > > What I do get is the following error: > {code:java} > Store MyStore is currently closed{code} > This problem appears only when the number of streaming threads (or input > topic partitions) is greater than 1, even if I'm just saving to the store and > turn off the punctuation. > If punctuation is present, however, I sometimes get -1 as a partition value > in the transform method.
I'm familiar with the basic docs; however, I haven't > found anything that could help me here. > I build my state store like this: > {code:java} > val stateStore = Stores.windowStoreBuilder( > Stores.persistentWindowStore( > stateStoreName, > timeWindows.maintainMs() + timeWindows.sizeMs + > TimeUnit.DAYS.toMillis(1), > timeWindows.segments, > timeWindows.sizeMs, > false > ), > serde[K], > serde[(V, Int)] > ) > {code} > and include it in a DSL API like this: > {code:java} > builder.addStateStore(stateStore) > (...).transform(new MyTransformer(...), "MyStore") > {code} > INB4: I don't close any state stores manually, and I gave them retention time as > long as possible for the debugging stage. I tried to hotfix this with a > retry in the transform method; the state stores do reopen eventually and the > `put` method works, but this approach is questionable and I am not sure it > actually works reliably. > Edit: > Could this be because > {code:java}StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG{code} is set to a low > value? If I understand correctly, spilling to disk would then happen more > frequently; could that cause the store to be closed? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
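The root-cause pattern behind KAFKA-7250 (a single transformer instance shared across stream tasks) can be shown in plain Java. The names below are illustrative, not the Kafka Streams API; in Streams, the `TransformerSupplier` passed to `transform()` must return a new `Transformer` on every call, because each task gets its own instance:

```java
import java.util.function.Supplier;

// Plain-Java analogue of the bug: a supplier that hands out one shared
// transformer vs. one that creates a fresh instance per call.
public class TransformerSupplierSketch {
    static class MyTransformer { /* per-task state would live here */ }

    // Broken pattern: every caller (every stream task) shares one instance.
    static Supplier<MyTransformer> sharedInstance() {
        MyTransformer single = new MyTransformer();
        return () -> single;
    }

    // Correct pattern: a fresh instance per invocation.
    static Supplier<MyTransformer> newInstancePerCall() {
        return MyTransformer::new;
    }
}
```

With the shared variant, two tasks end up mutating (and closing stores of) the same object, which matches the "store is currently closed" symptom in the report.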
[jira] [Updated] (KAFKA-7481) Consider options for safer upgrade of offset commit value schema
[ https://issues.apache.org/jira/browse/KAFKA-7481?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-7481: --- Priority: Critical (was: Blocker) > Consider options for safer upgrade of offset commit value schema > > > Key: KAFKA-7481 > URL: https://issues.apache.org/jira/browse/KAFKA-7481 > Project: Kafka > Issue Type: Bug >Reporter: Jason Gustafson >Assignee: Jason Gustafson >Priority: Critical > > KIP-211 and KIP-320 add new versions of the offset commit value schema. The > use of the new schema version is controlled by the > `inter.broker.protocol.version` configuration. Once the new inter-broker > version is in use, it is not possible to downgrade since the older brokers > will not be able to parse the new schema. > The options at the moment are the following: > 1. Do nothing. Users can try the new version and keep > `inter.broker.protocol.version` locked to the old release. Downgrade will > still be possible, but users will not be able to test new capabilities which > depend on inter-broker protocol changes. > 2. Instead of using `inter.broker.protocol.version`, we could use > `message.format.version`. This would basically extend the use of this config > to apply to all persistent formats. The advantage is that it allows users to > upgrade the broker and begin using the new inter-broker protocol while still > allowing downgrade. But features which depend on the persistent format could > not be tested. > Any other options? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-7481) Consider options for safer upgrade of offset commit value schema
[ https://issues.apache.org/jira/browse/KAFKA-7481?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-7481: --- Fix Version/s: (was: 2.2.0) > Consider options for safer upgrade of offset commit value schema > > > Key: KAFKA-7481 > URL: https://issues.apache.org/jira/browse/KAFKA-7481 > Project: Kafka > Issue Type: Bug >Reporter: Jason Gustafson >Assignee: Jason Gustafson >Priority: Blocker > > KIP-211 and KIP-320 add new versions of the offset commit value schema. The > use of the new schema version is controlled by the > `inter.broker.protocol.version` configuration. Once the new inter-broker > version is in use, it is not possible to downgrade since the older brokers > will not be able to parse the new schema. > The options at the moment are the following: > 1. Do nothing. Users can try the new version and keep > `inter.broker.protocol.version` locked to the old release. Downgrade will > still be possible, but users will not be able to test new capabilities which > depend on inter-broker protocol changes. > 2. Instead of using `inter.broker.protocol.version`, we could use > `message.format.version`. This would basically extend the use of this config > to apply to all persistent formats. The advantage is that it allows users to > upgrade the broker and begin using the new inter-broker protocol while still > allowing downgrade. But features which depend on the persistent format could > not be tested. > Any other options? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
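Both options above amount to gating the persisted schema version on a broker config version. A hypothetical sketch of that gating idea (the schema version numbers and method names here are invented for illustration, not Kafka's actual mapping):

```java
// Sketch of version-gating: the offset-commit value schema version is chosen
// from a "major.minor" config version, so older schemas keep being written
// until the operator opts in. Illustrative only.
public class SchemaGateSketch {
    // Returns the schema version to write: the new schema (here: 3) only once
    // the gating config version reaches 2.1, otherwise the old one (here: 2).
    static int offsetCommitSchemaVersion(String gatingVersion) {
        String[] parts = gatingVersion.split("\\.");
        int major = Integer.parseInt(parts[0]);
        int minor = Integer.parseInt(parts[1]);
        boolean atLeast21 = major > 2 || (major == 2 && minor >= 1);
        return atLeast21 ? 3 : 2;  // schema numbers are illustrative
    }
}
```

The downgrade hazard in the ticket follows directly: once version 3 records exist on disk, a broker that only understands version 2 cannot parse them, regardless of which config did the gating.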
[jira] [Commented] (KAFKA-7304) memory leakage in org.apache.kafka.common.network.Selector
[ https://issues.apache.org/jira/browse/KAFKA-7304?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16767458#comment-16767458 ] Matthias J. Sax commented on KAFKA-7304: Thanks [~rsivaram] and [~yuyang08]! > memory leakage in org.apache.kafka.common.network.Selector > -- > > Key: KAFKA-7304 > URL: https://issues.apache.org/jira/browse/KAFKA-7304 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 1.1.0, 1.1.1 >Reporter: Yu Yang >Priority: Major > Attachments: 7304.v4.txt, 7304.v7.txt, Screen Shot 2018-08-16 at > 11.04.16 PM.png, Screen Shot 2018-08-16 at 11.06.38 PM.png, Screen Shot > 2018-08-16 at 12.41.26 PM.png, Screen Shot 2018-08-16 at 4.26.19 PM.png, > Screen Shot 2018-08-17 at 1.03.35 AM.png, Screen Shot 2018-08-17 at 1.04.32 > AM.png, Screen Shot 2018-08-17 at 1.05.30 AM.png, Screen Shot 2018-08-28 at > 11.09.45 AM.png, Screen Shot 2018-08-29 at 10.49.03 AM.png, Screen Shot > 2018-08-29 at 10.50.47 AM.png, Screen Shot 2018-09-29 at 10.38.12 PM.png, > Screen Shot 2018-09-29 at 10.38.38 PM.png, Screen Shot 2018-09-29 at 8.34.50 > PM.png > > > We are testing secured writes to Kafka through SSL. Testing at small scale, > SSL writing to Kafka was fine. However, when we enabled SSL writing at a > larger scale (>40k clients writing concurrently), the Kafka brokers soon hit an > OutOfMemory issue with a 4G memory setting. We tried increasing the > heap size to 10Gb, but encountered the same issue. > We took a few heap dumps, and found that most of the heap memory is > referenced through org.apache.kafka.common.network.Selector objects. There > are two channel map fields in Selector. It seems that somehow the objects are > not deleted from the maps in a timely manner. > One observation is that the memory leak seems related to Kafka partition > leader changes. If there are broker restarts etc. in the cluster that cause > partition leadership changes, the brokers may hit the OOM issue faster.
> {code} > private final Map<String, KafkaChannel> channels; > private final Map<String, KafkaChannel> closingChannels; > {code} > Please see the attached images and the following link for a sample GC > analysis: > http://gceasy.io/my-gc-report.jsp?p=c2hhcmVkLzIwMTgvMDgvMTcvLS1nYy5sb2cuMC5jdXJyZW50Lmd6LS0yLTM5LTM0 > the command line for running Kafka: > {code} > java -Xms10g -Xmx10g -XX:NewSize=512m -XX:MaxNewSize=512m > -Xbootclasspath/p:/usr/local/libs/bcp -XX:MetaspaceSize=128m -XX:+UseG1GC > -XX:MaxGCPauseMillis=25 -XX:InitiatingHeapOccupancyPercent=35 > -XX:G1HeapRegionSize=16M -XX:MinMetaspaceFreeRatio=25 > -XX:MaxMetaspaceFreeRatio=75 -XX:+PrintGCDetails -XX:+PrintGCDateStamps > -XX:+PrintTenuringDistribution -Xloggc:/var/log/kafka/gc.log > -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=40 -XX:GCLogFileSize=50M > -Djava.awt.headless=true > -Dlog4j.configuration=file:/etc/kafka/log4j.properties > -Dcom.sun.management.jmxremote > -Dcom.sun.management.jmxremote.authenticate=false > -Dcom.sun.management.jmxremote.ssl=false > -Dcom.sun.management.jmxremote.port= > -Dcom.sun.management.jmxremote.rmi.port= -cp /usr/local/libs/* > kafka.Kafka /etc/kafka/server.properties > {code} > We use Java 1.8.0_102, and have applied a TLS patch reducing the > X509Factory.certCache map size from 750 to 20. > {code} > java -version > java version "1.8.0_102" > Java(TM) SE Runtime Environment (build 1.8.0_102-b14) > Java HotSpot(TM) 64-Bit Server VM (build 25.102-b14, mixed mode) > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
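The invariant the report is about can be illustrated in a few lines: every channel placed into the selector's maps must be removed again once it is fully closed, or the maps (and everything the channels reference) grow without bound. A simplified sketch, not the actual Selector code:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Miniature of the two-map lifecycle: register -> close (drain) -> remove.
// If the final remove is ever skipped, entries leak exactly as in the report.
public class ChannelRegistrySketch {
    final Map<String, Object> channels = new ConcurrentHashMap<>();
    final Map<String, Object> closingChannels = new ConcurrentHashMap<>();

    void register(String id, Object channel) {
        channels.put(id, channel);
    }

    void close(String id) {
        // Move to closingChannels while in-flight work drains...
        Object ch = channels.remove(id);
        if (ch != null) closingChannels.put(id, ch);
        // ...and crucially, remove from closingChannels once fully closed.
        closingChannels.remove(id);
    }
}
```

A leadership change closes and reopens many channels at once, which is why the report sees OOMs sooner after broker restarts if any close path skips the cleanup.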
[jira] [Commented] (KAFKA-7918) Streams store cleanup: inline byte-store generic parameters
[ https://issues.apache.org/jira/browse/KAFKA-7918?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16767446#comment-16767446 ] Matthias J. Sax commented on KAFKA-7918: SGTM. > Streams store cleanup: inline byte-store generic parameters > --- > > Key: KAFKA-7918 > URL: https://issues.apache.org/jira/browse/KAFKA-7918 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: John Roesler >Assignee: Sophie Blee-Goldman >Priority: Major > > Currently, the fundamental layer of stores in Streams is the "bytes store". > The easiest way to identify this is in > `org.apache.kafka.streams.state.Stores`, all the `StoreBuilder`s require a > `XXBytesStoreSupplier`. > We provide several implementations of these bytes stores, typically an > in-memory one and a persistent one (aka RocksDB). > Inside these bytes stores, the key is always `Bytes` and the value is always > `byte[]` (serialization happens at a higher level). However, the store > implementations are generically typed, just `K` and `V`. > This is good for flexibility, but it makes the code a little harder to > understand. I think that we used to do serialization at a lower level, so the > generics are a hold-over from that. > It would simplify the code if we just inlined the actual k/v types and maybe > even renamed the classes from (e.g.) `InMemoryKeyValueStore` to > `InMemoryKeyValueBytesStore`, and so forth. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
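The refactoring idea can be shown in miniature. This is a simplified sketch, not the actual Streams classes: a store generic in `<K, V>` that is in practice always instantiated with `(Bytes, byte[])` gets those types inlined, so the class says what it stores:

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

public class BytesStoreSketch {
    interface KeyValueStore<K, V> {
        void put(K key, V value);
        V get(K key);
    }

    // Stand-in for org.apache.kafka.common.utils.Bytes: a byte[] wrapper with
    // value-based equals/hashCode so it can key a map.
    static final class Bytes {
        final byte[] data;
        Bytes(byte[] data) { this.data = data; }
        @Override public boolean equals(Object o) {
            return o instanceof Bytes && Arrays.equals(data, ((Bytes) o).data);
        }
        @Override public int hashCode() { return Arrays.hashCode(data); }
    }

    // Inlined types, and a name reflecting them, per the ticket's suggestion.
    static class InMemoryKeyValueBytesStore implements KeyValueStore<Bytes, byte[]> {
        private final Map<Bytes, byte[]> map = new HashMap<>();
        @Override public void put(Bytes key, byte[] value) { map.put(key, value); }
        @Override public byte[] get(Bytes key) { return map.get(key); }
    }
}
```

Serialization to and from the user's `K`/`V` happens in higher layers, which is exactly why the bytes layer no longer needs the generics.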
[jira] [Commented] (KAFKA-7565) NPE in KafkaConsumer
[ https://issues.apache.org/jira/browse/KAFKA-7565?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16767474#comment-16767474 ] Rajini Sivaram commented on KAFKA-7565: --- [~hachikuji] I am not sure if this is covered by your PR for KAFKA-7831. We have a check for whether the partitions returned in a fetch response are still in the fetch session, and we return if that is not the case: https://github.com/apache/kafka/blob/b02b5b63a5d43ab552c6cec1237707b2edd1bb36/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L234 Further down in the code, we assume that the partitions do exist in the session and access the partitions without checking for null: https://github.com/apache/kafka/blob/b02b5b63a5d43ab552c6cec1237707b2edd1bb36/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L243 Presumably fetch session partitions can change between lines 234 and 243 since the response could be processed on the heartbeat thread. With the changes from your PR for KAFKA-7831, can we guarantee that the session partitions won't change?
> NPE in KafkaConsumer > > > Key: KAFKA-7565 > URL: https://issues.apache.org/jira/browse/KAFKA-7565 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 1.1.1 >Reporter: Alexey Vakhrenev >Priority: Critical > Fix For: 2.2.0 > > > The stacktrace is > {noformat} > java.lang.NullPointerException > at > org.apache.kafka.clients.consumer.internals.Fetcher$1.onSuccess(Fetcher.java:221) > at > org.apache.kafka.clients.consumer.internals.Fetcher$1.onSuccess(Fetcher.java:202) > at > org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167) > at > org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:563) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:390) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:244) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233) > at > org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1171) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1115) > {noformat} > Couldn't find minimal reproducer, but it happens quite often in our system. > We use {{pause()}} and {{wakeup()}} methods quite extensively, maybe it is > somehow related. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
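The race described in the comment above is a classic check-then-act on a map mutated by another thread. A simplified sketch, not the actual Fetcher code: the fragile shape checks membership first and reads later, while the robust shape reads once into a local and null-checks that:

```java
import java.util.concurrent.ConcurrentHashMap;

// Miniature of the suspected race: the heartbeat thread may remove a
// partition from the session between two separate map operations.
public class SessionPartitionsSketch {
    final ConcurrentHashMap<String, Long> sessionPartitions = new ConcurrentHashMap<>();

    // Fragile: the key can vanish between containsKey() and get(), so callers
    // that assume a non-null result can NPE.
    Long fetchPositionRacy(String partition) {
        if (!sessionPartitions.containsKey(partition)) return null;
        return sessionPartitions.get(partition);
    }

    // Robust: a single read, then a null-check on the local reference.
    Long fetchPositionSafe(String partition) {
        Long position = sessionPartitions.get(partition);
        if (position == null) return null;  // partition left the session; skip it
        return position;
    }
}
```

Each individual `ConcurrentHashMap` operation is atomic, but the pair in the racy version is not, which matches the window between the two Fetcher lines cited above.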
[jira] [Commented] (KAFKA-6460) Add mocks for state stores used in Streams unit testing
[ https://issues.apache.org/jira/browse/KAFKA-6460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16767479#comment-16767479 ] Guozhang Wang commented on KAFKA-6460: -- Hi [~shung], sorry for the late reply. Here are my thoughts (I'd also update the description of the ticket). The main goal for these test utils is similar to KIP-267, i.e. to give users a smooth trial-and-error experience during their development cycles, where they do not necessarily need to bring up a full-fledged RocksDB instance, for example. So the scope of this task would be: 1) Provide a mock store interface that can be used for both {{Topology#addStateStore(StoreBuilder)}} (i.e. at the PAPI layer) and {{Materialized#as(StoreSupplier)}} (the DSL layer). For this, I'd propose we add {{org.apache.kafka.streams.test.MockStoreFactory}} inside the {{streams/test-utils}} artifact, that can generate a {{KeyValueStoreBuilder}}, {{WindowStoreBuilder}}, or {{SessionStoreBuilder}} (for PAPI) as well as a {{StoreSupplier}} (for DSL) given a store name, whose {{build}} and {{get}} calls will return a mock (this mock implementation only needs to be the inner-most store, i.e. it can still be wrapped with metered / caching / logging, and it should live inside the {{org.apache.kafka.streams.test.internals}} package so that it is not part of the public APIs). 2) The mock store implementations should expect their {{init}} function to be called with a {{MockProcessorContext}}, which records all records forwarded via this context (e.g. changelogs). So the mock store implementation itself only needs to keep track of its store function calls. 3) The mock {{StoreBuilder}} or {{StoreSupplier}}'s {{build}} / {{get}} calls are supposed to be called only once, since they are expected to be used with {{TopologyTestDriver}}, which does not create multiple topologies.
Then users can access the mocked store instance via {{TopologyTestDriver#getXXStore}} and use its public APIs to query the store. 4) In addition to allowing users to query the store directly, users may also want to see which function calls were triggered -- e.g. maybe the current store returns `2` for key `k`, but we also want to make sure it was because `put(k, 1)` and `put(k, 2)` were called. This can be provided by a public API like {{MockStoreFactory#putEntries(storeName)}} that returns a list of the entries written via {{put}}. 5) For Streams' own unit tests, we can then refactor them to use this new mock store factory. For example, we can remove the internal {{org.apache.kafka.streams.state.KeyValueStoreTestDriver}} and use the above to refactor any unit tests related to this class -- one piece of logic not yet supported in {{TopologyTestDriver}} / {{MockStoreFactory}} is store restoration, i.e. the Streams library may want to pipe in some records to the corresponding changelog before starting the test driver, which will then be used to bootstrap the (possibly mocked) stores. This is not of interest to users, but Streams' own unit tests need to cover it. > Add mocks for state stores used in Streams unit testing > --- > > Key: KAFKA-6460 > URL: https://issues.apache.org/jira/browse/KAFKA-6460 > Project: Kafka > Issue Type: Improvement > Components: streams, unit tests >Reporter: Guozhang Wang >Assignee: Yishun Guan >Priority: Major > Labels: newbie++ > > We'd like to use mocks for different types of state stores: kv, window, > session that can be used to record the number of expected put / get calls > used in the DSL operator unit testing. This involves implementing the two > interfaces {{StoreSupplier}} and {{StoreBuilder}} that can return an object > created from, say, EasyMock, and the object can then be set up with the > expected calls.
> In addition, we should also add a mock record collector which can be returned > from the mock processor context so that with logging enabled store, users can > also validate if the changes have been forwarded to the changelog as well. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
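The recording behavior described in point 4 can be sketched in miniature. The `MockStoreFactory` API above is only a proposal, so the class below is a plain-Java illustration with invented names: a key-value store that both serves reads and records every `put()` so a test can assert not just the final value but the calls that produced it:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Miniature of the proposed mock store: serve reads, and keep a call log so
// tests can verify that value 2 for key "k" came from put("k", 1) followed
// by put("k", 2). Names are illustrative.
public class MockKeyValueStoreSketch {
    private final Map<String, Integer> map = new HashMap<>();
    final List<String> putCalls = new ArrayList<>();  // recorded call log

    void put(String key, int value) {
        putCalls.add("put(" + key + ", " + value + ")");
        map.put(key, value);
    }

    Integer get(String key) { return map.get(key); }
}
```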
[jira] [Commented] (KAFKA-6460) Add mocks for state stores used in Streams unit testing
[ https://issues.apache.org/jira/browse/KAFKA-6460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16767517#comment-16767517 ] Matthias J. Sax commented on KAFKA-6460: [~guozhang] I think it would also be important to provide a simple way to configure the used store factory (e.g., for DSL users) without the need to rewrite the code that should be tested. Thoughts? > Add mocks for state stores used in Streams unit testing > --- > > Key: KAFKA-6460 > URL: https://issues.apache.org/jira/browse/KAFKA-6460 > Project: Kafka > Issue Type: Improvement > Components: streams, unit tests >Reporter: Guozhang Wang >Assignee: Yishun Guan >Priority: Major > Labels: newbie++ > > We'd like to use mocks for different types of state stores: kv, window, > session that can be used to record the number of expected put / get calls > used in the DSL operator unit testing. This involves implementing the two > interfaces {{StoreSupplier}} and {{StoreBuilder}} that can return an object > created from, say, EasyMock, and the object can then be set up with the > expected calls. > In addition, we should also add a mock record collector which can be returned > from the mock processor context so that, with a logging-enabled store, users can > also validate that the changes have been forwarded to the changelog. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-7921) Instable KafkaStreamsTest
[ https://issues.apache.org/jira/browse/KAFKA-7921?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16767548#comment-16767548 ] ASF GitHub Bot commented on KAFKA-7921: --- guozhangwang commented on pull request #6262: KAFKA-7921: log at error level for missing source topic URL: https://github.com/apache/kafka/pull/6262 This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Instable KafkaStreamsTest > - > > Key: KAFKA-7921 > URL: https://issues.apache.org/jira/browse/KAFKA-7921 > Project: Kafka > Issue Type: Bug > Components: streams, unit tests >Reporter: Matthias J. Sax >Assignee: John Roesler >Priority: Major > > {{KafkaStreamsTest}} failed multiple times, eg, > {quote}java.lang.AssertionError: Condition not met within timeout 15000. > Streams never started. > at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:365) > at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:325) > at > org.apache.kafka.streams.KafkaStreamsTest.shouldThrowOnCleanupWhileRunning(KafkaStreamsTest.java:556){quote} > or > {quote}java.lang.AssertionError: Condition not met within timeout 15000. > Streams never started. 
> at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:365) > at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:325) > at > org.apache.kafka.streams.KafkaStreamsTest.testStateThreadClose(KafkaStreamsTest.java:255){quote} > > The preserved logs are as follows: > {quote}[2019-02-12 07:02:17,198] INFO Kafka version: 2.3.0-SNAPSHOT > (org.apache.kafka.common.utils.AppInfoParser:109) > [2019-02-12 07:02:17,198] INFO Kafka commitId: 08036fa4b1e5b822 > (org.apache.kafka.common.utils.AppInfoParser:110) > [2019-02-12 07:02:17,199] INFO stream-client [clientId] State transition from > CREATED to REBALANCING (org.apache.kafka.streams.KafkaStreams:263) > [2019-02-12 07:02:17,200] INFO stream-thread [clientId-StreamThread-238] > Starting (org.apache.kafka.streams.processor.internals.StreamThread:767) > [2019-02-12 07:02:17,200] INFO stream-client [clientId] State transition from > REBALANCING to PENDING_SHUTDOWN (org.apache.kafka.streams.KafkaStreams:263) > [2019-02-12 07:02:17,200] INFO stream-thread [clientId-StreamThread-239] > Starting (org.apache.kafka.streams.processor.internals.StreamThread:767) > [2019-02-12 07:02:17,200] INFO stream-thread [clientId-StreamThread-238] > State transition from CREATED to STARTING > (org.apache.kafka.streams.processor.internals.StreamThread:214) > [2019-02-12 07:02:17,200] INFO stream-thread [clientId-StreamThread-239] > State transition from CREATED to STARTING > (org.apache.kafka.streams.processor.internals.StreamThread:214) > [2019-02-12 07:02:17,200] INFO stream-thread [clientId-StreamThread-238] > Informed to shut down > (org.apache.kafka.streams.processor.internals.StreamThread:1192) > [2019-02-12 07:02:17,201] INFO stream-thread [clientId-StreamThread-238] > State transition from STARTING to PENDING_SHUTDOWN > (org.apache.kafka.streams.processor.internals.StreamThread:214) > [2019-02-12 07:02:17,201] INFO stream-thread [clientId-StreamThread-239] > Informed to shut down > 
(org.apache.kafka.streams.processor.internals.StreamThread:1192) > [2019-02-12 07:02:17,201] INFO stream-thread [clientId-StreamThread-239] > State transition from STARTING to PENDING_SHUTDOWN > (org.apache.kafka.streams.processor.internals.StreamThread:214) > [2019-02-12 07:02:17,205] INFO Cluster ID: J8uJhiTKQx-Y_i9LzT0iLg > (org.apache.kafka.clients.Metadata:365) > [2019-02-12 07:02:17,205] INFO Cluster ID: J8uJhiTKQx-Y_i9LzT0iLg > (org.apache.kafka.clients.Metadata:365) > [2019-02-12 07:02:17,205] INFO [Consumer > clientId=clientId-StreamThread-238-consumer, groupId=appId] Discovered group > coordinator localhost:36122 (id: 2147483647 rack: null) > (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:675) > [2019-02-12 07:02:17,205] INFO [Consumer > clientId=clientId-StreamThread-239-consumer, groupId=appId] Discovered group > coordinator localhost:36122 (id: 2147483647 rack: null) > (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:675) > [2019-02-12 07:02:17,206] INFO [Consumer > clientId=clientId-StreamThread-238-consumer, groupId=appId] Revoking > previously assigned partitions [] > (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:458) > [2019-02-12 07:02:17,206] INFO [Consumer > clientId=clientId-StreamThread-239-consumer, groupId=appId] Revoking > previously assigned partitions [] > (org.apache.k
[jira] [Commented] (KAFKA-7917) Streams store cleanup: collapse layers
[ https://issues.apache.org/jira/browse/KAFKA-7917?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16767574#comment-16767574 ] John Roesler commented on KAFKA-7917: - After messing with the code for a while, I'm not happy with the trade-offs. Some parts combine cleanly, but others are tightly coupled with different components of Streams, and smashing them all together creates more of a mess. I'm going to close this ticket, and we'll just take this one step at a time by seeing how the codebase looks after https://issues.apache.org/jira/browse/KAFKA-7918 > Streams store cleanup: collapse layers > -- > > Key: KAFKA-7917 > URL: https://issues.apache.org/jira/browse/KAFKA-7917 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: John Roesler >Assignee: John Roesler >Priority: Major > > Following on KAFKA-7916, we can consider collapsing the "streams management > layers" into one. > Right now, we have: > * metering (also handles moving from pojo world to bytes world) > * change-logging > * caching > This is good compositional style, but we also have some runtime overhead of > calling through all these layers, as well as some mental overhead of > understanding how many and which layers we are going through. > Also, there are dependencies between the caching and change-logging layers. > I _think_ it would simplify the code if we collapsed these into one layer > with boolean switches to turn on or off the different aspects. (rather than > wrapping the store with the different layers or not depending on the same > boolean conditions) -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Closed] (KAFKA-7917) Streams store cleanup: collapse layers
[ https://issues.apache.org/jira/browse/KAFKA-7917?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler closed KAFKA-7917. --- > Streams store cleanup: collapse layers > -- > > Key: KAFKA-7917 > URL: https://issues.apache.org/jira/browse/KAFKA-7917 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: John Roesler >Assignee: John Roesler >Priority: Major > > Following on KAFKA-7916, we can consider collapsing the "streams management > layers" into one. > Right now, we have: > * metering (also handles moving from pojo world to bytes world) > * change-logging > * caching > This is good compositional style, but we also have some runtime overhead of > calling through all these layers, as well as some mental overhead of > understanding how many and which layers we are going through. > Also, there are dependencies between the caching and change-logging layers. > I _think_ it would simplify the code if we collapsed these into one layer > with boolean switches to turn on or off the different aspects. (rather than > wrapping the store with the different layers or not depending on the same > boolean conditions) -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (KAFKA-7917) Streams store cleanup: collapse layers
[ https://issues.apache.org/jira/browse/KAFKA-7917?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler resolved KAFKA-7917. - Resolution: Won't Fix > Streams store cleanup: collapse layers > -- > > Key: KAFKA-7917 > URL: https://issues.apache.org/jira/browse/KAFKA-7917 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: John Roesler >Assignee: John Roesler >Priority: Major > > Following on KAFKA-7916, we can consider collapsing the "streams management > layers" into one. > Right now, we have: > * metering (also handles moving from pojo world to bytes world) > * change-logging > * caching > This is good compositional style, but we also have some runtime overhead of > calling through all these layers, as well as some mental overhead of > understanding how many and which layers we are going through. > Also, there are dependencies between the caching and change-logging layers. > I _think_ it would simplify the code if we collapsed these into one layer > with boolean switches to turn on or off the different aspects. (rather than > wrapping the store with the different layers or not depending on the same > boolean conditions) -- This message was sent by Atlassian JIRA (v7.6.3#76005)
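The wrapped-layers vs. boolean-switches trade-off the quoted description raises can be sketched roughly as follows. This is a toy illustration under assumed names, not the Streams internals:

```java
import java.util.HashMap;
import java.util.Map;

// Toy sketch of the "collapse layers" idea: one store class with boolean
// switches for each aspect, instead of wrapping an inner store in separate
// metering, change-logging, and caching decorators. All names are made up.
public class CollapsedStoreSketch {
    static class UnifiedStore {
        private final boolean metered, changeLogged, cached;
        private final Map<String, String> inner = new HashMap<>();
        private final Map<String, String> changelog = new HashMap<>();
        private final Map<String, String> cache = new HashMap<>();
        private long putCount = 0;

        UnifiedStore(boolean metered, boolean changeLogged, boolean cached) {
            this.metered = metered;
            this.changeLogged = changeLogged;
            this.cached = cached;
        }

        // One call path; each aspect is a branch rather than a separate
        // wrapper object, avoiding one virtual call per layer.
        void put(String key, String value) {
            if (metered) putCount++;                     // metering aspect
            if (cached) cache.put(key, value);           // caching aspect
            if (changeLogged) changelog.put(key, value); // change-logging aspect
            inner.put(key, value);
        }

        long putCount() { return putCount; }
        boolean logged(String key) { return changelog.containsKey(key); }
    }
}
```

As the comment above notes, the catch is that the caching and change-logging aspects are coupled to other Streams components, which is why the ticket was closed as Won't Fix.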
[jira] [Commented] (KAFKA-7918) Streams store cleanup: inline byte-store generic parameters
[ https://issues.apache.org/jira/browse/KAFKA-7918?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16767580#comment-16767580 ] Guozhang Wang commented on KAFKA-7918: -- +1. [~ableegoldman] Thanks for looking into it. > Streams store cleanup: inline byte-store generic parameters > --- > > Key: KAFKA-7918 > URL: https://issues.apache.org/jira/browse/KAFKA-7918 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: John Roesler >Assignee: Sophie Blee-Goldman >Priority: Major > > Currently, the fundamental layer of stores in Streams is the "bytes store". > The easiest way to identify this is in > `org.apache.kafka.streams.state.Stores`, all the `StoreBuilder`s require a > `XXBytesStoreSupplier`. > We provide several implementations of these bytes stores, typically an > in-memory one and a persistent one (aka RocksDB). > Inside these bytes stores, the key is always `Bytes` and the value is always > `byte[]` (serialization happens at a higher level). However, the store > implementations are generically typed, just `K` and `V`. > This is good for flexibility, but it makes the code a little harder to > understand. I think that we used to do serialization at a lower level, so the > generics are a hold-over from that. > It would simplify the code if we just inlined the actual k/v types and maybe > even renamed the classes from (e.g.) `InMemoryKeyValueStore` to > `InMemoryKeyValueBytesStore`, and so forth. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
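The inlining the ticket describes can be sketched as follows; `Bytes` here is a minimal stand-in for Kafka's wrapper of the same name, and the store classes are toys for illustration, not the actual Streams implementations:

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

// Sketch of the generic-parameter inlining proposed in the ticket.
public class BytesStoreSketch {
    // Minimal stand-in for Kafka's Bytes wrapper: a byte[] with content-based
    // equals/hashCode so it can serve as a map key.
    static final class Bytes {
        final byte[] raw;
        Bytes(byte[] raw) { this.raw = raw; }
        @Override public boolean equals(Object o) {
            return o instanceof Bytes && Arrays.equals(raw, ((Bytes) o).raw);
        }
        @Override public int hashCode() { return Arrays.hashCode(raw); }
    }

    // Before: generically typed, even though every concrete bytes store is
    // instantiated as <Bytes, byte[]>.
    interface KeyValueStore<K, V> {
        void put(K key, V value);
        V get(K key);
    }

    // After: the byte types are inlined and the name says so, along the lines
    // of the proposed InMemoryKeyValueStore -> InMemoryKeyValueBytesStore rename.
    static class InMemoryKeyValueBytesStore implements KeyValueStore<Bytes, byte[]> {
        private final Map<Bytes, byte[]> map = new HashMap<>();
        @Override public void put(Bytes key, byte[] value) { map.put(key, value); }
        @Override public byte[] get(Bytes key) { return map.get(key); }
    }
}
```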
[jira] [Commented] (KAFKA-7758) When Naming a Repartition Topic with Aggregations Reuse Repartition Graph Node for Multiple Operations
[ https://issues.apache.org/jira/browse/KAFKA-7758?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16767603#comment-16767603 ] ASF GitHub Bot commented on KAFKA-7758: --- bbejeck commented on pull request #6265: KAFKA-7758: Reuse KGroupedStream/KGroupedTable with named repartition topics URL: https://github.com/apache/kafka/pull/6265 This PR adds support for re-using a `KGroupedStream` or `KGroupedTable` object after executing an aggregation operation with a named repartition topic. `KGroupedStream` example ```java final KGroupedStream<String, String> kGroupedStream = builder.<String, String>stream("topic").selectKey((k, v) -> k).groupByKey(Grouped.as("grouping")); kGroupedStream.windowedBy(TimeWindows.of(Duration.ofMillis(10L))).count().toStream().to("output-one"); kGroupedStream.windowedBy(TimeWindows.of(Duration.ofMillis(30L))).count().toStream().to("output-two"); ``` `KGroupedTable` example ```java final KGroupedTable<String, String> kGroupedTable = builder.<String, String>table("topic").groupBy(KeyValue::pair, Grouped.as("grouping")); kGroupedTable.count().toStream().to("output-count"); kGroupedTable.reduce((v, v2) -> v2, (v, v2) -> v2).toStream().to("output-reduce"); ``` This approach will not cause any compatibility issues, for two reasons: 1. Aggregations requiring repartitioning without naming the repartition topic maintain the same topology structure, which is the default mode today. So by not reusing the repartition graph node, the numbering and structure of the topology remain the same. 2. For aggregations where the repartition topic _*is*_ named, it is not currently possible to re-use either the `KGroupedStream` or `KGroupedTable` object, as Kafka Streams throws an `InvalidTopologyException` when building the topology. Hence you can't even deploy the application. I've added unit tests for each case and ran our existing suite of streams tests. 
### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > When Naming a Repartition Topic with Aggregations Reuse Repartition Graph > Node for Multiple Operations > -- > > Key: KAFKA-7758 > URL: https://issues.apache.org/jira/browse/KAFKA-7758 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 2.1.0 >Reporter: Bill Bejeck >Assignee: Bill Bejeck >Priority: Major > Fix For: 2.2.0 > > > When performing aggregations that require repartitioning and the repartition > topic name is specified, and using the resulting {{KGroupedStream}} for > multiple operations i.e. > > {code:java} > final KGroupedStream kGroupedStream = builder.<String, > String>stream("topic").selectKey((k, v) -> > k).groupByKey(Grouped.as("grouping")); > kGroupedStream.windowedBy(TimeWindows.of(Duration.ofMillis(10L))).count(); > kGroupedStream.windowedBy(TimeWindows.of(Duration.ofMillis(30L))).count(); > {code} > If optimizations aren't enabled, Streams will attempt to build two > repartition topics of the same name resulting in a failure creating the > topology. > > However, we have enough information to re-use the existing repartition node > via graph nodes used for building the intermediate representation of the > topology. This ticket will make the > behavior of reusing a {{KGroupedStream}} consistent regardless if > optimizations are turned on or not. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-7918) Streams store cleanup: inline byte-store generic parameters
[ https://issues.apache.org/jira/browse/KAFKA-7918?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16767611#comment-16767611 ] Sophie Blee-Goldman commented on KAFKA-7918: In light of the decision to close KAFKA-7917 should we include the three CachingXXStore layers in this as well? > Streams store cleanup: inline byte-store generic parameters > --- > > Key: KAFKA-7918 > URL: https://issues.apache.org/jira/browse/KAFKA-7918 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: John Roesler >Assignee: Sophie Blee-Goldman >Priority: Major > > Currently, the fundamental layer of stores in Streams is the "bytes store". > The easiest way to identify this is in > `org.apache.kafka.streams.state.Stores`, all the `StoreBuilder`s require a > `XXBytesStoreSupplier`. > We provide several implementations of these bytes stores, typically an > in-memory one and a persistent one (aka RocksDB). > Inside these bytes stores, the key is always `Bytes` and the value is always > `byte[]` (serialization happens at a higher level). However, the store > implementations are generically typed, just `K` and `V`. > This is good for flexibility, but it makes the code a little harder to > understand. I think that we used to do serialization at a lower level, so the > generics are a hold-over from that. > It would simplify the code if we just inlined the actual k/v types and maybe > even renamed the classes from (e.g.) `InMemoryKeyValueStore` to > `InMemoryKeyValueBytesStore`, and so forth. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-7918) Streams store cleanup: inline byte-store generic parameters
[ https://issues.apache.org/jira/browse/KAFKA-7918?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16767623#comment-16767623 ] Matthias J. Sax commented on KAFKA-7918: In general, I think yes. However, it implies a somewhat bigger refactoring. At the moment, the caching layer is responsible for executing the callback on eviction, which we effectively use to forward messages to downstream processors (that's why the caching layer deserializes the bytes: downstream processors expect objects, not bytes – the callback is defined on types, not bytes). However, the current behavior is not a clean separation of concerns: because we put bytes into the caching store, it seems reasonable to get bytes back on the caching callback (which is what we would need to do to refactor the stores and get rid of the generics). This implies that the metered store would need to take a custom callback that expects objects and wrap it with a byte-based callback that is registered on the caching store. The deserialization would then happen in the wrapper, so the public API is not broken (i.e., we move deserialization from the caching layer up to the metered store for the callbacks). Does this make sense? If you want to tackle this, feel free to do so. In any case, it's a lot of refactoring, so please do multiple smaller PRs instead of one gigantic PR to simplify reviewing :) Thanks. > Streams store cleanup: inline byte-store generic parameters > --- > > Key: KAFKA-7918 > URL: https://issues.apache.org/jira/browse/KAFKA-7918 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: John Roesler >Assignee: Sophie Blee-Goldman >Priority: Major > > Currently, the fundamental layer of stores in Streams is the "bytes store". > The easiest way to identify this is in > `org.apache.kafka.streams.state.Stores`, all the `StoreBuilder`s require a > `XXBytesStoreSupplier`. 
> We provide several implementations of these bytes stores, typically an > in-memory one and a persistent one (aka RocksDB). > Inside these bytes stores, the key is always `Bytes` and the value is always > `byte[]` (serialization happens at a higher level). However, the store > implementations are generically typed, just `K` and `V`. > This is good for flexibility, but it makes the code a little harder to > understand. I think that we used to do serialization at a lower level, so the > generics are a hold-over from that. > It would simplify the code if we just inlined the actual k/v types and maybe > even renamed the classes from (e.g.) `InMemoryKeyValueStore` to > `InMemoryKeyValueBytesStore`, and so forth. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
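The callback refactoring described in the comment above can be sketched as follows: the caching layer would fire a bytes-based eviction callback, and the metered layer would wrap the user's typed callback, taking over deserialization. All names here are hypothetical, not the Streams API:

```java
import java.nio.charset.StandardCharsets;
import java.util.function.BiConsumer;

// Sketch: adapt a typed eviction callback to the bytes-based one the caching
// store would register after the refactor, moving deserialization up from the
// caching layer to the metered layer without breaking the public typed API.
public class EvictionCallbackSketch {
    // Hypothetical stand-in for a serde's deserializer.
    interface Deserializer<T> { T deserialize(byte[] raw); }

    // The metered layer builds this wrapper: bytes in, typed callback invoked.
    static <K, V> BiConsumer<byte[], byte[]> wrap(BiConsumer<K, V> typed,
                                                  Deserializer<K> keyDes,
                                                  Deserializer<V> valueDes) {
        return (keyBytes, valueBytes) ->
                typed.accept(keyDes.deserialize(keyBytes), valueDes.deserialize(valueBytes));
    }

    public static void main(String[] args) {
        Deserializer<String> utf8 = raw -> new String(raw, StandardCharsets.UTF_8);
        StringBuilder seen = new StringBuilder();
        BiConsumer<byte[], byte[]> bytesCallback =
                wrap((k, v) -> seen.append(k).append("=").append(v), utf8, utf8);
        // The caching layer would invoke this on eviction with raw bytes.
        bytesCallback.accept("key".getBytes(StandardCharsets.UTF_8),
                             "value".getBytes(StandardCharsets.UTF_8));
        System.out.println(seen); // key=value
    }
}
```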
[jira] [Created] (KAFKA-7928) Deprecate WindowStore.put(key, value)
John Roesler created KAFKA-7928: --- Summary: Deprecate WindowStore.put(key, value) Key: KAFKA-7928 URL: https://issues.apache.org/jira/browse/KAFKA-7928 Project: Kafka Issue Type: Improvement Reporter: John Roesler Specifically, `org.apache.kafka.streams.state.WindowStore#put(K, V)` This method is strange... A window store needs to have a timestamp associated with the key, so if you do a put without a timestamp, it's up to the store to just make one up. Even the javadoc on the method recommends not to use it, due to this confusing behavior. We should just deprecate it. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
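The confusion the ticket describes can be shown with a toy store (this is an illustration, not the Streams `WindowStore` API): the two-argument overload has to invent a timestamp, because a window store must bind every entry to one.

```java
import java.util.HashMap;
import java.util.Map;

// Toy illustration of why put(key, value) on a windowed store is awkward.
public class WindowPutSketch {
    static class ToyWindowStore {
        final Map<String, String> entries = new HashMap<>();

        // Explicit timestamp: the caller states which window the entry belongs to.
        void put(String key, String value, long windowStartMs) {
            entries.put(key + "@" + windowStartMs, value);
        }

        // The kind of overload KAFKA-7928 proposes deprecating: the store makes
        // a timestamp up (wall-clock time here), so reads are hard to predict.
        @Deprecated
        void put(String key, String value) {
            put(key, value, System.currentTimeMillis());
        }
    }

    public static void main(String[] args) {
        ToyWindowStore store = new ToyWindowStore();
        store.put("clicks", "42", 1_000L); // window start explicit and predictable
        store.put("clicks", "43");         // timestamp invented by the store
        System.out.println(store.entries.keySet());
    }
}
```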
[jira] [Commented] (KAFKA-7918) Streams store cleanup: inline byte-store generic parameters
[ https://issues.apache.org/jira/browse/KAFKA-7918?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16767663#comment-16767663 ] Sophie Blee-Goldman commented on KAFKA-7918: Makes sense. I will consider the caching layer to be outside the immediate scope for now. Does it make sense to break up the PR into one for each type of store (i.e. kv, window, session)? The implementation changes will be rather minor, but the testing framework requires some overhaul since it is hardcoded into all the unit tests... (still relatively minor, but certainly many, many lines of code) > Streams store cleanup: inline byte-store generic parameters > --- > > Key: KAFKA-7918 > URL: https://issues.apache.org/jira/browse/KAFKA-7918 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: John Roesler >Assignee: Sophie Blee-Goldman >Priority: Major > > Currently, the fundamental layer of stores in Streams is the "bytes store". > The easiest way to identify this is in > `org.apache.kafka.streams.state.Stores`, all the `StoreBuilder`s require a > `XXBytesStoreSupplier`. > We provide several implementations of these bytes stores, typically an > in-memory one and a persistent one (aka RocksDB). > Inside these bytes stores, the key is always `Bytes` and the value is always > `byte[]` (serialization happens at a higher level). However, the store > implementations are generically typed, just `K` and `V`. > This is good for flexibility, but it makes the code a little harder to > understand. I think that we used to do serialization at a lower level, so the > generics are a hold-over from that. > It would simplify the code if we just inlined the actual k/v types and maybe > even renamed the classes from (e.g.) `InMemoryKeyValueStore` to > `InMemoryKeyValueBytesStore`, and so forth. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-7929) RocksDB Window Store allows segments to expire out from under iterator
Sophie Blee-Goldman created KAFKA-7929: -- Summary: RocksDB Window Store allows segments to expire out from under iterator Key: KAFKA-7929 URL: https://issues.apache.org/jira/browse/KAFKA-7929 Project: Kafka Issue Type: Bug Reporter: Sophie Blee-Goldman While we provide no guarantees about returning a snapshot when fetching from persistent window stores, we should at least not allow old segments to expire while an iterator over them remains open. This can result in unexpected behavior, as the number of records returned depends on how quickly the results are read from the iterator, and you might even end up reading records with a gap in the middle. For example, you might fetch records between t1 and t3, then immediately read the first record (t1) and do some processing. If enough time has advanced by the time you read the second record from the iterator, record t2 may have expired, so the next record you would read is t3. You would therefore conclude there were records at t1 and t3 but nothing at t2, which is incorrect. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
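The race can be modeled with a few lines of plain Java (a toy model, not the actual RocksDB segmented store): each read resolves the store at call time, the way a lazy segment iterator reopens segments on demand, so a slice that expires mid-read is silently skipped.

```java
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Toy model of the t1/t2/t3 scenario described in the ticket.
public class SegmentExpirySketch {
    // Returns the first live record with timestamp in [fromTs, toTs], or null,
    // resolving the store at call time like a lazy segment iterator.
    static Map.Entry<Long, String> readNext(NavigableMap<Long, String> segments,
                                            long fromTs, long toTs) {
        Map.Entry<Long, String> e = segments.ceilingEntry(fromTs);
        return (e == null || e.getKey() > toTs) ? null : e;
    }

    public static void main(String[] args) {
        NavigableMap<Long, String> segments = new TreeMap<>();
        segments.put(1L, "t1");
        segments.put(2L, "t2");
        segments.put(3L, "t3");

        // Fetch [1, 3] and read the first record.
        Map.Entry<Long, String> first = readNext(segments, 1L, 3L);

        // Retention expires t2 while the caller is still consuming the result.
        segments.remove(2L);

        // The next read jumps straight from t1 to t3: the caller wrongly
        // concludes there was never a record at t2.
        Map.Entry<Long, String> second = readNext(segments, first.getKey() + 1, 3L);
        System.out.println(first.getValue() + " then " + second.getValue()); // t1 then t3
    }
}
```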
[jira] [Created] (KAFKA-7930) StreamsResetter makes "changelog" topic naming assumptions
Murad M created KAFKA-7930: -- Summary: StreamsResetter makes "changelog" topic naming assumptions Key: KAFKA-7930 URL: https://issues.apache.org/jira/browse/KAFKA-7930 Project: Kafka Issue Type: Improvement Components: streams, tools Affects Versions: 2.1.0 Reporter: Murad M StreamsResetter deletes the topics it considers internal. Currently it just checks the naming as per [code|https://github.com/apache/kafka/blob/1aae604861068bb7337d4972c9dcc0c0a99c374d/core/src/main/scala/kafka/tools/StreamsResetter.java#L660]. If the assumption is wrong (either the topic prefix or the suffix), the tool becomes useless if you are aware of this, and even dangerous if you are not. It would probably be better if either: * the naming assumption were optional, with internal topics supplied via an argument (--internal-topics) * deletion were optional (--no-delete-internal) -- This message was sent by Atlassian JIRA (v7.6.3#76005)
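The naming heuristic the description refers to boils down to a prefix/suffix check. A rough reconstruction (the method and class names are invented here, and the suffix list is assumed from the linked code, not quoted from it):

```java
// Sketch of the naming heuristic the resetter applies when deciding which
// topics are "internal" and therefore safe to delete.
public class InternalTopicCheck {
    // A topic is treated as internal when it is prefixed with the application
    // id and carries one of the Streams-generated suffixes.
    static boolean looksInternal(String topic, String applicationId) {
        return topic.startsWith(applicationId + "-")
                && (topic.endsWith("-changelog") || topic.endsWith("-repartition"));
    }

    public static void main(String[] args) {
        // A GlobalKTable source topic that happens to be named like a changelog
        // matches the heuristic and would be deleted - the problem this ticket raises.
        System.out.println(looksInternal("my-app-orders-changelog", "my-app")); // true
        System.out.println(looksInternal("my-app-input", "my-app"));            // false
    }
}
```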
[jira] [Updated] (KAFKA-7930) StreamsResetter makes "changelog" topic naming assumptions
[ https://issues.apache.org/jira/browse/KAFKA-7930?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Murad M updated KAFKA-7930: --- Description: StreamsResetter deletes the topics considered internal. Currently it just checks the naming as per [code|https://github.com/apache/kafka/blob/1aae604861068bb7337d4972c9dcc0c0a99c374d/core/src/main/scala/kafka/tools/StreamsResetter.java#L660]. If assumption is wrong (either topic prefix or suffix), tool becomes useless if aware even dangerous if not. Probably better either: * naming assumption should be optional and supply internal topics with argument (--internal-topics) * deletion could be optional (--no-delete-internal) Faced this, when was trying to reset applications with GlobalKTable topics named as *-changelog. Such topics sometimes are not desirable for deletion. was: StreamsResetter deletes the topics considered internal. Currently it just checks the naming as per [code|https://github.com/apache/kafka/blob/1aae604861068bb7337d4972c9dcc0c0a99c374d/core/src/main/scala/kafka/tools/StreamsResetter.java#L660]. If assumption is wrong (either topic prefix or suffix), tool becomes useless if aware even dangerous if not. Probably better either: * naming assumption should be optional and supply internal topics with argument (--internal-topics) * deletion could be optional (--no-delete-internal) > StreamsResetter makes "changelog" topic naming assumptions > -- > > Key: KAFKA-7930 > URL: https://issues.apache.org/jira/browse/KAFKA-7930 > Project: Kafka > Issue Type: Improvement > Components: streams, tools >Affects Versions: 2.1.0 >Reporter: Murad M >Priority: Major > Labels: features, usability > > StreamsResetter deletes the topics considered internal. Currently it just > checks the naming as per > [code|https://github.com/apache/kafka/blob/1aae604861068bb7337d4972c9dcc0c0a99c374d/core/src/main/scala/kafka/tools/StreamsResetter.java#L660]. 
> If assumption is wrong (either topic prefix or suffix), tool becomes useless > if aware even dangerous if not. Probably better either: > * naming assumption should be optional and supply internal topics with > argument (--internal-topics) > * deletion could be optional (--no-delete-internal) > Faced this, when was trying to reset applications with GlobalKTable topics > named as *-changelog. Such topics sometimes are not desirable for deletion. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-7930) StreamsResetter makes "changelog" topic naming assumptions
[ https://issues.apache.org/jira/browse/KAFKA-7930?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Murad M updated KAFKA-7930: --- Description: StreamsResetter deletes the topics considered internal. Currently it just checks the naming as per [code|https://github.com/apache/kafka/blob/1aae604861068bb7337d4972c9dcc0c0a99c374d/core/src/main/scala/kafka/tools/StreamsResetter.java#L660]. If assumption is wrong (either topic prefix or suffix), tool becomes useless if aware even dangerous if not. Probably better either: * naming assumption should be optional and supply internal topics with argument (--internal-topics) * deletion could be optional (--no-delete-internal) * ignore topics which are included in list of --input-topics Faced this, when was trying to reset applications with GlobalKTable topics named as *-changelog. Such topics sometimes are not desirable for deletion. was: StreamsResetter deletes the topics considered internal. Currently it just checks the naming as per [code|https://github.com/apache/kafka/blob/1aae604861068bb7337d4972c9dcc0c0a99c374d/core/src/main/scala/kafka/tools/StreamsResetter.java#L660]. If assumption is wrong (either topic prefix or suffix), tool becomes useless if aware even dangerous if not. Probably better either: * naming assumption should be optional and supply internal topics with argument (--internal-topics) * deletion could be optional (--no-delete-internal) Faced this, when was trying to reset applications with GlobalKTable topics named as *-changelog. Such topics sometimes are not desirable for deletion. > StreamsResetter makes "changelog" topic naming assumptions > -- > > Key: KAFKA-7930 > URL: https://issues.apache.org/jira/browse/KAFKA-7930 > Project: Kafka > Issue Type: Improvement > Components: streams, tools >Affects Versions: 2.1.0 >Reporter: Murad M >Priority: Major > Labels: features, usability > > StreamsResetter deletes the topics considered internal. 
Currently it just > checks the naming as per > [code|https://github.com/apache/kafka/blob/1aae604861068bb7337d4972c9dcc0c0a99c374d/core/src/main/scala/kafka/tools/StreamsResetter.java#L660]. > If assumption is wrong (either topic prefix or suffix), tool becomes useless > if aware even dangerous if not. Probably better either: > * naming assumption should be optional and supply internal topics with > argument (--internal-topics) > * deletion could be optional (--no-delete-internal) > * ignore topics which are included in list of --input-topics > Faced this, when was trying to reset applications with GlobalKTable topics > named as *-changelog. Such topics sometimes are not desirable for deletion. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-7930) StreamsResetter makes "changelog" topic naming assumptions
[ https://issues.apache.org/jira/browse/KAFKA-7930?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16767751#comment-16767751 ] Murad M commented on KAFKA-7930: Patch provided: https://github.com/muradm/kafka/pull/1 > StreamsResetter makes "changelog" topic naming assumptions > -- > > Key: KAFKA-7930 > URL: https://issues.apache.org/jira/browse/KAFKA-7930 > Project: Kafka > Issue Type: Improvement > Components: streams, tools >Affects Versions: 2.1.0 >Reporter: Murad M >Priority: Major > Labels: features, usability > > StreamsResetter deletes the topics considered internal. Currently it just > checks the naming as per > [code|https://github.com/apache/kafka/blob/1aae604861068bb7337d4972c9dcc0c0a99c374d/core/src/main/scala/kafka/tools/StreamsResetter.java#L660]. > If assumption is wrong (either topic prefix or suffix), tool becomes useless > if aware even dangerous if not. Probably better either: > * naming assumption should be optional and supply internal topics with > argument (--internal-topics) > * deletion could be optional (--no-delete-internal) > * ignore topics which are included in list of --input-topics > Faced this, when was trying to reset applications with GlobalKTable topics > named as *-changelog. Such topics sometimes are not desirable for deletion. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-7930) StreamsResetter makes "changelog" topic naming assumptions
[ https://issues.apache.org/jira/browse/KAFKA-7930?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Murad M updated KAFKA-7930: --- Labels: features patch-available usability (was: features usability) > StreamsResetter makes "changelog" topic naming assumptions > -- > > Key: KAFKA-7930 > URL: https://issues.apache.org/jira/browse/KAFKA-7930 > Project: Kafka > Issue Type: Improvement > Components: streams, tools >Affects Versions: 2.1.0 >Reporter: Murad M >Priority: Major > Labels: features, patch-available, usability > > StreamsResetter deletes the topics considered internal. Currently it just > checks the naming as per > [code|https://github.com/apache/kafka/blob/1aae604861068bb7337d4972c9dcc0c0a99c374d/core/src/main/scala/kafka/tools/StreamsResetter.java#L660]. > If assumption is wrong (either topic prefix or suffix), tool becomes useless > if aware even dangerous if not. Probably better either: > * naming assumption should be optional and supply internal topics with > argument (--internal-topics) > * deletion could be optional (--no-delete-internal) > * ignore topics which are included in list of --input-topics > Faced this, when was trying to reset applications with GlobalKTable topics > named as *-changelog. Such topics sometimes are not desirable for deletion. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-7928) Deprecate WindowStore.put(key, value)
[ https://issues.apache.org/jira/browse/KAFKA-7928?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-7928: --- Labels: needs-kip (was: ) > Deprecate WindowStore.put(key, value) > - > > Key: KAFKA-7928 > URL: https://issues.apache.org/jira/browse/KAFKA-7928 > Project: Kafka > Issue Type: Improvement >Reporter: John Roesler >Priority: Major > Labels: needs-kip > > Specifically, `org.apache.kafka.streams.state.WindowStore#put(K, V)` > This method is strange... A window store needs to have a timestamp associated > with the key, so if you do a put without a timestamp, it's up to the store to > just make one up. > Even the javadoc on the method recommends not to use it, due to this > confusing behavior. > We should just deprecate it. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-7928) Deprecate WindowStore.put(key, value)
[ https://issues.apache.org/jira/browse/KAFKA-7928?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16767754#comment-16767754 ] Matthias J. Sax commented on KAFKA-7928: +1 > Deprecate WindowStore.put(key, value) > - > > Key: KAFKA-7928 > URL: https://issues.apache.org/jira/browse/KAFKA-7928 > Project: Kafka > Issue Type: Improvement >Reporter: John Roesler >Priority: Major > Labels: needs-kip > > Specifically, `org.apache.kafka.streams.state.WindowStore#put(K, V)` > This method is strange... A window store needs to have a timestamp associated > with the key, so if you do a put without a timestamp, it's up to the store to > just make one up. > Even the javadoc on the method recommends not to use it, due to this > confusing behavior. > We should just deprecate it. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-7928) Deprecate WindowStore.put(key, value)
[ https://issues.apache.org/jira/browse/KAFKA-7928?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-7928: --- Labels: beginner easy-fix needs-kip newbie (was: needs-kip) > Deprecate WindowStore.put(key, value) > - > > Key: KAFKA-7928 > URL: https://issues.apache.org/jira/browse/KAFKA-7928 > Project: Kafka > Issue Type: Improvement >Reporter: John Roesler >Priority: Major > Labels: beginner, easy-fix, needs-kip, newbie > > Specifically, `org.apache.kafka.streams.state.WindowStore#put(K, V)` > This method is strange... A window store needs to have a timestamp associated > with the key, so if you do a put without a timestamp, it's up to the store to > just make one up. > Even the javadoc on the method recommends not to use it, due to this > confusing behavior. > We should just deprecate it. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-7930) StreamsResetter makes "changelog" topic naming assumptions
[ https://issues.apache.org/jira/browse/KAFKA-7930?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-7930: --- Labels: features needs-kip patch-available usability (was: features patch-available usability) > StreamsResetter makes "changelog" topic naming assumptions > -- > > Key: KAFKA-7930 > URL: https://issues.apache.org/jira/browse/KAFKA-7930 > Project: Kafka > Issue Type: Improvement > Components: streams, tools >Affects Versions: 2.1.0 >Reporter: Murad M >Priority: Major > Labels: features, needs-kip, patch-available, usability > > StreamsResetter deletes the topics considered internal. Currently it just > checks the naming as per > [code|https://github.com/apache/kafka/blob/1aae604861068bb7337d4972c9dcc0c0a99c374d/core/src/main/scala/kafka/tools/StreamsResetter.java#L660]. > If assumption is wrong (either topic prefix or suffix), tool becomes useless > if aware even dangerous if not. Probably better either: > * naming assumption should be optional and supply internal topics with > argument (--internal-topics) > * deletion could be optional (--no-delete-internal) > * ignore topics which are included in list of --input-topics > Faced this, when was trying to reset applications with GlobalKTable topics > named as *-changelog. Such topics sometimes are not desirable for deletion. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-7930) StreamsResetter makes "changelog" topic naming assumptions
[ https://issues.apache.org/jira/browse/KAFKA-7930?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16767760#comment-16767760 ] Matthias J. Sax commented on KAFKA-7930: Thanks for creating this ticket. I agree that the naming assumptions could be optional. However, why would you want to make topic deletion optional? The whole purpose of the tool is to delete internal topics. For the last point: I think it kind of makes sense; however, why would you have an input topic with a name pattern like `-someName-repartition` or `-someName-changelog`? {quote}Faced this, when was trying to reset applications with GlobalKTable topics named as *-changelog. Such topics sometimes are not desirable for deletion. {quote} If it does not have the `-` prefix, it won't be deleted. Can you elaborate? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-7918) Streams store cleanup: inline byte-store generic parameters
[ https://issues.apache.org/jira/browse/KAFKA-7918?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16767765#comment-16767765 ] Matthias J. Sax commented on KAFKA-7918: In the end it's a judgment call how much you put together into one PR; PRs with more than 500 LOC get hard to review, and the turnaround time increases. > Streams store cleanup: inline byte-store generic parameters > --- > > Key: KAFKA-7918 > URL: https://issues.apache.org/jira/browse/KAFKA-7918 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: John Roesler >Assignee: Sophie Blee-Goldman >Priority: Major > > Currently, the fundamental layer of stores in Streams is the "bytes store". > The easiest way to identify this is in > `org.apache.kafka.streams.state.Stores`: all the `StoreBuilder`s require a > `XXBytesStoreSupplier`. > We provide several implementations of these bytes stores, typically an > in-memory one and a persistent one (aka RocksDB). > Inside these bytes stores, the key is always `Bytes` and the value is always > `byte[]` (serialization happens at a higher level). However, the store > implementations are generically typed, just `K` and `V`. > This is good for flexibility, but it makes the code a little harder to > understand. I think that we used to do serialization at a lower level, so the > generics are a hold-over from that. > It would simplify the code if we just inlined the actual k/v types and maybe > even renamed the classes from (e.g.) `InMemoryKeyValueStore` to > `InMemoryKeyValueBytesStore`, and so forth. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
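An illustrative before/after sketch of the proposed cleanup (toy classes, not Kafka's actual store implementations): a store that is generically typed `<K, V>` but in practice always instantiated with byte types, versus one with the types inlined and a name that says what it is.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

// Before: generically typed, even though serialization happens at a higher
// level, so in practice K/V are always byte types.
class InMemoryKeyValueStore<K, V> {
    private final Map<K, V> map = new HashMap<>();
    void put(K key, V value) { map.put(key, value); }
    V get(K key) { return map.get(key); }
}

// After: byte types inlined and the class renamed accordingly. A String key
// stands in here for Kafka's Bytes wrapper (which provides equals/hashCode
// over byte[], something a raw byte[] map key would not).
class InMemoryKeyValueBytesStore {
    private final Map<String, byte[]> map = new HashMap<>();
    void put(byte[] key, byte[] value) {
        map.put(new String(key, StandardCharsets.UTF_8), value);
    }
    byte[] get(byte[] key) {
        return map.get(new String(key, StandardCharsets.UTF_8));
    }
}

public class Main {
    public static void main(String[] args) {
        InMemoryKeyValueBytesStore store = new InMemoryKeyValueBytesStore();
        byte[] k = "key".getBytes(StandardCharsets.UTF_8);
        store.put(k, "value".getBytes(StandardCharsets.UTF_8));
        System.out.println(new String(store.get(k), StandardCharsets.UTF_8)); // value
    }
}
```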
[jira] [Comment Edited] (KAFKA-6460) Add mocks for state stores used in Streams unit testing
[ https://issues.apache.org/jira/browse/KAFKA-6460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16767479#comment-16767479 ] Guozhang Wang edited comment on KAFKA-6460 at 2/14/19 1:16 AM: --- Hi [~shung], sorry for the late replies. Here are my thoughts (I'd also update the description of the ticket as well). The main goal for these test utils is similar to KIP-267, i.e. to give users a smooth trial-and-error experience during their development cycles, where they do not necessarily need to bring up a full-fledged RocksDB instance, for example. So the scope of this task would be: 1) Provide a mock store interface that can be used for both {{Topology#addStateStore(StoreBuilder)}} (i.e. at the PAPI layer) and {{Materialized#as(StoreSupplier>)}} (the DSL layer). For this, I'd propose we add {{org.apache.kafka.streams.test.MockStoreFactory}} inside the {{streams/test-utils}} artifact, which can generate a {{KeyValueStoreBuilder}}, {{WindowStoreBuilder}}, or {{SessionStoreBuilder}} (for PAPI) as well as a {{StoreSupplier>}} (for DSL) given a store name, and whose {{build}} and {{get}} calls will return a mock (this mock implementation only needs to be the inner-most store, i.e. it can still be wrapped with metered / caching / logging, and it should live inside the {{org.apache.kafka.streams.test.internals}} package so that it is not part of the public APIs). 2) The mock store implementations should expect their {{init}} function to be called with a {{MockProcessorContext}}, which records all records forwarded via this context (e.g. changelogs). So the mock store implementation itself only needs to keep track of its store function calls. 3) The mock {{StoreBuilder}}'s or {{StoreSupplier}}'s {{build}} / {{get}} calls are supposed to be called only once, since they are expected to be used with {{TopologyTestDriver}}, which does not create multiple topologies. Then users can access the mocked store instance via {{TopologyTestDriver#getXXStore}} and use its public APIs to query the store. 4) In addition to allowing users to query the store directly, users may also want to know how many function calls were triggered -- e.g. maybe the current store returns `2` for key `k`, but we also want to make sure it was because `put(k, 1)` and `put(k, 2)` were called. This can be provided by a public API like {{MockStoreFactory#putEntries(storeName)}} that returns the list of entries passed via {{put}}. 5) For Streams' own unit tests, we can then refactor them to use this new mock store factory. For example, we can remove the internal {{org.apache.kafka.streams.state.KeyValueStoreTestDriver}} and use the above to refactor any unit tests related to this class -- one piece of logic that is not yet supported in {{TopologyTestDriver}} / {{MockStoreFactory}} is store restoration, i.e. the Streams library may want to pipe some records into the corresponding changelog before starting the test driver, which will then be used to bootstrap the (possibly mocked) stores. This is not of interest to users, but Streams' own unit testing needs to cover it.
[jira] [Comment Edited] (KAFKA-6460) Add mocks for state stores used in Streams unit testing
[ https://issues.apache.org/jira/browse/KAFKA-6460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16767479#comment-16767479 ] Guozhang Wang edited comment on KAFKA-6460 at 2/14/19 1:21 AM: --- Hi [~shung], sorry for the late replies. Here are my thoughts (I'd also update the description of the ticket as well). The main goal for these test utils is similar to KIP-267, i.e. to give users a smooth trial-and-error experience during their development cycles, where they do not necessarily need to bring up a full-fledged RocksDB instance, for example. So the scope of this task would be: 1) Provide a mock store interface that can be used for both {{Topology#addStateStore(StoreBuilder)}} (i.e. at the PAPI layer) and {{Materialized#as(StoreSupplier>)}} (the DSL layer). For this, I'd propose we add {{org.apache.kafka.streams.test.MockStoreFactory}} inside the {{streams/test-utils}} artifact, which can generate a {{KeyValueStoreBuilder}}, {{WindowStoreBuilder}}, or {{SessionStoreBuilder}} (for PAPI) as well as a {{StoreSupplier>}} (for DSL) given a store name, and whose {{build}} and {{get}} calls will return a mock (this mock implementation only needs to be the inner-most store, i.e. it can still be wrapped with metered / caching / logging, and it should live inside the {{org.apache.kafka.streams.test.internals}} package so that it is not part of the public APIs). 2) The mock store implementations should expect their {{init}} function to be called with a {{MockProcessorContext}}, which records all records forwarded via this context (e.g. changelogs). So the mock store implementation itself only needs to keep track of its store function calls. 3) The mock {{StoreBuilder}}'s or {{StoreSupplier}}'s {{build}} / {{get}} calls are supposed to be called only once, since they are expected to be used with {{TopologyTestDriver}}, which does not create multiple topologies. Then users can access the mocked store instance via {{TopologyTestDriver#getXXStore}} and use its public APIs to query the store. 4) In addition to allowing users to query the store directly, users may also want to know how many function calls were triggered -- e.g. maybe the current store returns `2` for key `k`, but we also want to make sure it was because `put(k, 1)` and `put(k, 2)` were called. This can be provided by a public API like {{MockStoreFactory#putEntries(storeName)}} that returns the list of entries passed via {{put}}. 5) For Streams' own unit tests, we can then refactor them to use this new mock store factory. For example, we can remove the internal {{org.apache.kafka.streams.state.KeyValueStoreTestDriver}} and use the above to refactor any unit tests related to this class -- one piece of logic that is not yet supported in {{TopologyTestDriver}} / {{MockStoreFactory}} is store restoration, i.e. the Streams library may want to pipe some records into the corresponding changelog before starting the test driver, which will then be used to bootstrap the (possibly mocked) stores. This is not of interest to users, but Streams' own unit testing needs to cover it. EDIT: regarding 5), one thing I realized is that restoration logic should be tested in Streams' own unit testing, but is not really a focus of users' unit testing. For their own testing, if users want to pre-populate some stores, they can do that today by first getting the stores from the {{TopologyTestDriver}}, then using {{put}} calls to insert some data into the stores, and then calling {{driver.pipeInput}} to pipe data into the source topics. So maybe we would not need to replace, for example, {{KeyValueStoreTestDriver}} with {{MockStateFactory}}, but I still think we can get rid of this class with the {{MockProcessorContext}}, which has functionality to keep track of forward calls etc.
[jira] [Commented] (KAFKA-6460) Add mocks for state stores used in Streams unit testing
[ https://issues.apache.org/jira/browse/KAFKA-6460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16767766#comment-16767766 ] Guozhang Wang commented on KAFKA-6460: -- [~mjsax] I'm not sure I can follow here, could you elaborate a bit more? > Add mocks for state stores used in Streams unit testing > --- > > Key: KAFKA-6460 > URL: https://issues.apache.org/jira/browse/KAFKA-6460 > Project: Kafka > Issue Type: Improvement > Components: streams, unit tests >Reporter: Guozhang Wang >Assignee: Yishun Guan >Priority: Major > Labels: newbie++ > > We'd like to use mocks for different types of state stores: kv, window, > session that can be used to record the number of expected put / get calls > used in the DSL operator unit testing. This involves implementing the two > interfaces {{StoreSupplier}} and {{StoreBuilder}} that can return an object > created from, say, EasyMock, and the object can then be set up with the > expected calls. > In addition, we should also add a mock record collector which can be returned > from the mock processor context so that, with a logging-enabled store, users > can also validate if the changes have been forwarded to the changelog as well. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
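The recording idea from the ticket can be sketched in a self-contained way (all names here are hypothetical; a real implementation would sit behind Kafka's {{StoreSupplier}}/{{StoreBuilder}} interfaces): a mock key-value store that tracks every put/get call so a test can assert not only the final state but also how it was reached.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical mock store that records its calls: a test can assert that
// get("k") returns 2 *because* put("k", 1) and put("k", 2) were called.
class RecordingKeyValueStore<K, V> {
    private final Map<K, V> map = new HashMap<>();
    final List<Map.Entry<K, V>> putCalls = new ArrayList<>(); // every put, in order
    int getCalls = 0;                                         // number of gets

    void put(K key, V value) {
        putCalls.add(Map.entry(key, value));
        map.put(key, value);
    }

    V get(K key) {
        getCalls++;
        return map.get(key);
    }
}

public class Main {
    public static void main(String[] args) {
        RecordingKeyValueStore<String, Integer> store = new RecordingKeyValueStore<>();
        store.put("k", 1);
        store.put("k", 2);

        // The store returns 2 for "k"...
        System.out.println(store.get("k"));        // 2
        // ...and the recorded calls show *why* it does.
        System.out.println(store.putCalls.size()); // 2
        System.out.println(store.getCalls);        // 1
    }
}
```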
[jira] [Created] (KAFKA-7931) Java Client: if all ephemeral brokers fail, client can never reconnect to brokers
Brian created KAFKA-7931: Summary: Java Client: if all ephemeral brokers fail, client can never reconnect to brokers Key: KAFKA-7931 URL: https://issues.apache.org/jira/browse/KAFKA-7931 Project: Kafka Issue Type: Bug Components: clients Affects Versions: 2.1.0 Reporter: Brian Steps to reproduce: * Set up a Kafka cluster in GKE, with the bootstrap server address configured to point to a load balancer that exposes all GKE nodes * Run a producer that emits values into a partition with 3 replicas * Kill every broker in the cluster * Wait for the brokers to restart Observed result: The Java client cannot find any of the nodes even though they have all recovered. I see messages like "Connection to node 30 (/10.6.0.101:9092) could not be established. Broker may not be available.". Note, this is *not* a duplicate of https://issues.apache.org/jira/browse/KAFKA-7890. I'm using the client version that contains the fix for https://issues.apache.org/jira/browse/KAFKA-7890. Versions: Kafka: version 2.1.0, using the confluentinc/cp-kafka/5.1.0 docker image Client: trunk from a few days ago (git sha 9f7e6b291309286e3e3c1610e98d978773c9d504), to pull in the fix for KAFKA-7890 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6460) Add mocks for state stores used in Streams unit testing
[ https://issues.apache.org/jira/browse/KAFKA-6460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16767791#comment-16767791 ] Matthias J. Sax commented on KAFKA-6460: Internally, the DSL always uses the class `Stores` to create state stores. Thus, if one writes code like {quote}{{stream.groupByKey().aggregate(...)}} {quote} and wants to test the aggregator by inspecting the store, one wants to inject a mocked store. However, to do this, the code must be rewritten to {quote}{{stream.groupByKey().aggregate(..., Materialized.as(MockStoreFactory.mockedKeyValueStore(...));}}{quote} This implies that production code and test code are not the same. For proper testing, it should be possible to test the original code directly without rewriting it. The current `TopologyTestDriver` also works that way: it takes an unmodified `Topology` that can either be handed to `KafkaStreams` to actually run it, or to the driver to test it. It's not necessary to rewrite the code that assembles the `Topology` in order to test it. For mocked stores, it should work the same way. Does that make sense? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-7794) kafka.tools.GetOffsetShell does not return the offset in some cases
[ https://issues.apache.org/jira/browse/KAFKA-7794?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16767814#comment-16767814 ] huxihx commented on KAFKA-7794: --- There is an inconsistency here: GetOffsetShell claims to seek the largest offset before the given timestamp, but ListOffsetRequest retrieves the smallest offset after the timestamp (see the comments on the method `fetchOffsetByTimestamp` in Log.scala). That's why you'll get all the log start offsets if you specify a very old timestamp, but nothing when a future timestamp is given. A naive solution is to correct the description of `--time` for GetOffsetShell :) > kafka.tools.GetOffsetShell does not return the offset in some cases > --- > > Key: KAFKA-7794 > URL: https://issues.apache.org/jira/browse/KAFKA-7794 > Project: Kafka > Issue Type: Bug > Components: tools >Affects Versions: 0.10.2.0, 0.10.2.1, 0.10.2.2 >Reporter: Daniele Ascione >Assignee: Kartik >Priority: Critical > Labels: Kafka, ShellCommands, kafka-0.10, offset, shell, > shell-script, shellscript, tools, usability > Attachments: image-2019-02-11-20-51-07-805.png, > image-2019-02-11-20-56-13-362.png, image-2019-02-11-20-57-03-579.png, > image-2019-02-12-16-19-25-170.png, image-2019-02-12-16-21-13-126.png, > image-2019-02-12-16-23-38-399.png, image-2019-02-13-11-43-24-128.png, > image-2019-02-13-11-43-28-873.png, image-2019-02-13-11-44-18-736.png, > image-2019-02-13-11-45-21-459.png > > > For some timestamp inputs (different from -1 or -2), GetOffsetShell is > not able to retrieve the offset. 
> For example, if _x_ is the timestamp in that "not working range", and you > execute: > {code:java} > bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list > $KAFKA_ADDRESS --topic $MY_TOPIC --time x > {code} > The output is: > {code:java} > MY_TOPIC:8: > MY_TOPIC:2: > MY_TOPIC:5: > MY_TOPIC:4: > MY_TOPIC:7: > MY_TOPIC:1: > MY_TOPIC:9:{code} > while after the last ":" an integer representing the offset is expected. > > Steps to reproduce it: > # Consume all the messages from the beginning and print the timestamp: > {code:java} > bin/kafka-simple-consumer-shell.sh --no-wait-at-logend --broker-list > $KAFKA_ADDRESS --topic $MY_TOPIC --property print.timestamp=true > > messages{code} > # Sort the messages by timestamp and get some of the oldest messages: > {code:java} > awk -F "CreateTime:" '{ print $2}' messages | sort -n > msg_sorted{code} > # Take (for example) the timestamp of the 10th oldest message, and see if > GetOffsetShell is not able to print the offset: > {code:java} > timestamp="$(sed '10q;d' msg_sorted | cut -f1)" > bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list > $KAFKA_ADDRESS --topic $MY_TOPIC --time $timestamp > # The output should be something like: > # MY_TOPIC:1: > # MY_TOPIC:2: > (repeated for every partition){code} > # Verify that the message with that timestamp is still in Kafka: > {code:java} > bin/kafka-simple-consumer-shell.sh --no-wait-at-logend --broker-list > $KAFKA_ADDRESS --topic $MY_TOPIC --property print.timestamp=true | grep > "CreateTime:$timestamp" {code} > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
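The lookup semantics huxihx describes can be sketched in plain Java (a toy model over a sorted in-memory log, not Kafka's implementation): for a target timestamp, return the earliest offset whose record timestamp is greater than or equal to the target. A very old timestamp then yields the log start offset, while a future timestamp yields nothing, which matches the empty output in the bug report.

```java
import java.util.List;
import java.util.OptionalInt;

// Toy model of the ListOffsets behavior: find the earliest offset whose
// record timestamp is >= the target (not the largest offset *before* the
// target, as GetOffsetShell's help text suggests). Timestamps are illustrative.
public class Main {
    static OptionalInt offsetForTimestamp(List<Long> timestamps, long target) {
        for (int offset = 0; offset < timestamps.size(); offset++) {
            if (timestamps.get(offset) >= target) {
                return OptionalInt.of(offset);
            }
        }
        // Future timestamp: no matching record, hence no offset is printed.
        return OptionalInt.empty();
    }

    public static void main(String[] args) {
        List<Long> log = List.of(100L, 200L, 300L); // record timestamps by offset

        System.out.println(offsetForTimestamp(log, 50L));  // very old -> log start offset 0
        System.out.println(offsetForTimestamp(log, 200L)); // exact match -> offset 1
        System.out.println(offsetForTimestamp(log, 999L)); // future -> empty
    }
}
```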
[jira] [Commented] (KAFKA-7930) StreamsResetter makes "changelog" topic naming assumptions
[ https://issues.apache.org/jira/browse/KAFKA-7930?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16767880#comment-16767880 ] Murad M commented on KAFKA-7930: Historically those topics were auto-created as per the GlobalKTable naming pattern. Now they are being used for different purposes like cached lookups, which are populated from various other places, and they have effectively become "input" topics represented as GlobalKTables. While there are mechanisms to re-populate them, that does not necessarily mean it has to be done every time the application is reset. For an application reset it is enough to shift offsets to earliest, so that the GlobalKTable will materialize again from the existing topic. It is true that, from a replay point of view, we would get different results, but the same is true for any other topic with 'cleanup.policy=compact,delete'. Some use-cases: - static / managed configuration data (routes, flags etc.) - caches pulled from external systems - caches based on pushes from external systems -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (KAFKA-7930) StreamsResetter makes "changelog" topic naming assumptions
[ https://issues.apache.org/jira/browse/KAFKA-7930?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16767880#comment-16767880 ] Murad M edited comment on KAFKA-7930 at 2/14/19 5:14 AM: - Historically those topics were auto-created as per the GlobalKTable naming pattern. Now they are being used for different purposes like cached lookups, which are populated from various other places, and they have effectively become "input" topics represented as GlobalKTables. While there are mechanisms to re-populate them, that does not necessarily mean it has to be done every time the application is reset. For an application reset it is enough to shift offsets to earliest, so that the GlobalKTable will materialize again from the existing topic. It is true that, from a replay point of view, we would get different results, but the same is true for any other topic with 'cleanup.policy=compact,delete'. Some use-cases: - static / managed configuration data (routes, flags etc.) - caches pulled from external systems - caches based on pushes from external systems - another case is a sequence of services, where the first service builds state represented by a GlobalKTable from an event stream, and a number of other applications leverage the built state. So for the first application it is an "internal", deletable topic, while for the other applications it is an "input" topic; but an attempt to reset any of the services using that topic as "input" ends in losing the topic, so all services in the sequence have to be reset. Once we hit that limitation, the first option was to get rid of the historical naming convention, but that is not an option either, as there is no such thing as "renaming" topics. It is possible to "copy" data with a tool like mirror-maker, but that is a whole different story. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (KAFKA-7930) StreamsResetter makes "changelog" topic naming assumptions
[ https://issues.apache.org/jira/browse/KAFKA-7930?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16767880#comment-16767880 ] Murad M edited comment on KAFKA-7930 at 2/14/19 5:14 AM:

Historically, those topics were auto-created following the GlobalKTable naming pattern. They are now used for different purposes, such as cached lookups, and are populated from various other places; they have effectively become "input" topics represented as GlobalKTables. While there are mechanisms to re-populate them, that does not mean it has to be done every time the application is reset: for an application reset it is enough to shift the offsets to earliest, so that the GlobalKTable materializes again from the existing topic. It is true that, from a replay point of view, we would get different results, but the same is true for any other topic with 'cleanup.policy=compact,delete'. Some use cases:
- static / managed configuration data (routes, flags, etc.)
- caches pulled from external systems
- caches built from data pushed by external systems
- a sequence of services, where the first service builds state represented by a GlobalKTable from an event stream, and a number of other applications leverage that built state. For the first application it is an "internal", deletable topic, while for the rest it is an "input" topic; but an attempt to reset any of the services using that topic as "input" ends in losing the topic, so all services in the sequence have to be reset.

Once we hit that limitation, the first option was to get rid of the historical naming convention, but that is not an option either, as there is no such thing as "renaming" a topic. It is possible to "copy" data with a tool like MirrorMaker, but that is a whole different story.

> StreamsResetter makes "changelog" topic naming assumptions
> ----------------------------------------------------------
>
> Key: KAFKA-7930
> URL: https://issues.apache.org/jira/browse/KAFKA-7930
> Project: Kafka
> Issue Type: Improvement
> Components: streams, tools
> Affects Versions: 2.1.0
> Reporter: Murad M
> Priority: Major
> Labels: features, needs-kip, patch-available, usability
>
> StreamsResetter deletes the topics it considers internal. Currently it just checks the naming, as per [code|https://github.com/apache/kafka/blob/1aae604861068bb7337d4972c9dcc0c0a99c374d/core/src/main/scala/kafka/tools/StreamsResetter.java#L660].
> If the assumption is wrong (either topic prefix or suffix), the tool becomes useless if the user is aware of the mismatch, and even dangerous if not. It would probably be better if either:
> * the naming assumption were optional, with internal topics supplied via an argument (--internal-topics)
> * deletion were optional (--no-delete-internal)
> * topics included in the list of --input-topics were ignored
> Faced this when trying to reset applications with GlobalKTable topics named *-changelog. Such topics are sometimes not desirable to delete.

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
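The naming assumption under discussion can be sketched roughly as follows. This is a minimal Python illustration of the prefix/suffix heuristic the linked code applies, not the actual Java/Scala source; the application id and topic names are made up:

```python
def looks_internal(topic: str, application_id: str) -> bool:
    """Approximation of StreamsResetter's naming-based heuristic:
    a topic is treated as internal if it starts with '<application.id>-'
    and ends with a Streams-generated suffix."""
    return topic.startswith(application_id + "-") and (
        topic.endswith("-changelog") or topic.endswith("-repartition")
    )

# A user-created lookup topic that happens to follow the pattern is
# indistinguishable from a Streams-managed changelog:
print(looks_internal("my-app-store-changelog", "my-app"))    # True
print(looks_internal("my-app-lookup-changelog", "my-app"))   # True (user topic, would be deleted)
print(looks_internal("shared-config", "my-app"))             # False
```

This illustrates the complaint: the check cannot distinguish a Streams-managed topic from a user topic whose name merely matches the pattern.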
[jira] [Commented] (KAFKA-7930) StreamsResetter makes "changelog" topic naming assumptions
[ https://issues.apache.org/jira/browse/KAFKA-7930?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16767890#comment-16767890 ] Guozhang Wang commented on KAFKA-7930:

{{GlobalKTable}} should not have changelog topics, since it always uses the defined source topic as its changelog to materialize itself, right?
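The point about materializing from the source topic can be shown with a toy replay. This is plain Python, not Kafka Streams code; the record keys and values are hypothetical:

```python
def materialize(records):
    """Rebuild table state by replaying a (compacted) topic from the
    earliest offset: the last value seen for each key wins, and a None
    value acts as a tombstone that deletes the key."""
    table = {}
    for key, value in records:
        if value is None:
            table.pop(key, None)  # tombstone removes the key
        else:
            table[key] = value
    return table

topic = [("route-1", "A"), ("route-2", "B"), ("route-1", "C"), ("route-2", None)]
print(materialize(topic))  # {'route-1': 'C'}
```

Because the table is fully reconstructible from the source topic, resetting offsets to earliest is enough to rebuild it; no separate changelog topic is needed.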
[jira] [Commented] (KAFKA-7930) StreamsResetter makes "changelog" topic naming assumptions
[ https://issues.apache.org/jira/browse/KAFKA-7930?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16767895#comment-16767895 ] Murad M commented on KAFKA-7930:

Yes, when you create a GlobalKTable it is created from the provided topic name. I don't remember how we ended up with `-changelog`; that dates back to the 0.8.x/0.9.x days. It was probably a KTable's generated topic.
[jira] [Commented] (KAFKA-6460) Add mocks for state stores used in Streams unit testing
[ https://issues.apache.org/jira/browse/KAFKA-6460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16767899#comment-16767899 ] Guozhang Wang commented on KAFKA-6460:

Ah yes, that's a good point. Thinking about this again: how about injecting the mocking logic into {{TopologyTestDriver}}, so that users do not need to make any code changes, at the cost of pushing the code-complexity burden onto Streams (i.e. inside {{TopologyTestDriver}})? More specifically: when {{TopologyTestDriver}} takes the {{InternalTopologyBuilder}} as a parameter, loop over its {{stateFactories}} and {{globalStateBuilders}} maps, and replace each entry (the StateStoreFactory's embedded {{StoreBuilder}}, and the {{StoreBuilder}}, respectively) with a mock store builder chosen by its type, e.g. {{KeyValueStoreBuilder}} -> {{MockKeyValueStoreBuilder}}, etc. That way user code does not need to change at all, for both PAPI and DSL:
1) PAPI users in production / testing code: {{Topology#addStateStore(Stores.keyValueStoreBuilder(...))}}
2) DSL users in production code, without a materialization spec: {{aggregate()}}
3) DSL users in production code, with a materialization spec: {{aggregate(Materialized)}}
Then the only additional API we need is one that lets users check the number of function calls for a given store, e.g. {{TopologyTestDriver#putEntries(storeName)}}. WDYT?

> Add mocks for state stores used in Streams unit testing
> -------------------------------------------------------
>
> Key: KAFKA-6460
> URL: https://issues.apache.org/jira/browse/KAFKA-6460
> Project: Kafka
> Issue Type: Improvement
> Components: streams, unit tests
> Reporter: Guozhang Wang
> Assignee: Yishun Guan
> Priority: Major
> Labels: newbie++
>
> We'd like to use mocks for the different types of state stores (key-value, window, session) that can record the number of expected put / get calls in DSL operator unit testing. This involves implementing the two interfaces {{StoreSupplier}} and {{StoreBuilder}} so that they can return an object created from, say, EasyMock, which can then be set up with the expected calls.
> In addition, we should also add a mock record collector that can be returned from the mock processor context, so that with a logging-enabled store, users can also validate whether the changes have been forwarded to the changelog.
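The kind of call-recording mock being discussed can be sketched as follows. This is a toy Python illustration, not the proposed Java {{MockKeyValueStoreBuilder}}; the class and attribute names are made up:

```python
class MockKeyValueStore:
    """In-memory key-value store that records how many times put() and
    get() were called, so a test can assert on the call counts."""

    def __init__(self):
        self._data = {}
        self.put_calls = 0
        self.get_calls = 0

    def put(self, key, value):
        self.put_calls += 1
        self._data[key] = value

    def get(self, key):
        self.get_calls += 1
        return self._data.get(key)

store = MockKeyValueStore()
store.put("k1", 1)
store.put("k1", 2)
print(store.get("k1"), store.put_calls, store.get_calls)  # 2 2 1
```

In the proposal above, the test driver would swap the real store builder for one producing such a recording store, so neither PAPI nor DSL user code has to change.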
[jira] [Comment Edited] (KAFKA-7930) StreamsResetter makes "changelog" topic naming assumptions
[ https://issues.apache.org/jira/browse/KAFKA-7930?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16767751#comment-16767751 ] Murad M edited comment on KAFKA-7930 at 2/14/19 6:36 AM:

Patch provided: https://github.com/apache/kafka/pull/6267
[jira] [Commented] (KAFKA-7930) StreamsResetter makes "changelog" topic naming assumptions
[ https://issues.apache.org/jira/browse/KAFKA-7930?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16767905#comment-16767905 ] ASF GitHub Bot commented on KAFKA-7930:

muradm commented on pull request #6267: KAFKA-7930: topic is not internal if explicitly listed in args
URL: https://github.com/apache/kafka/pull/6267
Simplest fix: a topic is not internal if it is explicitly listed in --input-topics or --intermediate-topics.

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
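The fix described in the pull request can be sketched like this. This is an illustrative Python approximation of the proposed logic, not the actual patch; topic names and the application id are hypothetical:

```python
def is_internal_topic(topic, application_id, user_topics):
    """A topic explicitly listed via --input-topics or
    --intermediate-topics is never treated as internal, even when its
    name matches the '<application.id>-...-changelog' pattern."""
    if topic in user_topics:
        return False
    return topic.startswith(application_id + "-") and (
        topic.endswith("-changelog") or topic.endswith("-repartition")
    )

user_topics = {"my-app-lookup-changelog"}  # passed via --input-topics
print(is_internal_topic("my-app-lookup-changelog", "my-app", user_topics))  # False
print(is_internal_topic("my-app-agg-changelog", "my-app", user_topics))     # True
```

The explicit listing acts as an override: the tool resets offsets on the listed topic instead of deleting it, while genuinely internal topics are still matched by the naming pattern.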
[jira] [Comment Edited] (KAFKA-7930) StreamsResetter makes "changelog" topic naming assumptions
[ https://issues.apache.org/jira/browse/KAFKA-7930?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16767880#comment-16767880 ] Murad M edited comment on KAFKA-7930 at 2/14/19 6:53 AM:

Historically, those topics were auto-created following the GlobalKTable naming pattern. They are now used for different purposes, such as cached lookups, and are populated from various other places; they have effectively become "input" topics represented as GlobalKTables. While there are mechanisms to re-populate them, that does not mean it has to be done every time the application is reset: for an application reset it is enough to shift the offsets to earliest, so that the GlobalKTable materializes again from the existing topic. It is true that, from a replay point of view, we would get different results, but the same is true for any other topic with 'cleanup.policy=compact,delete'. Some use cases:
- static / managed configuration data (routes, flags, etc.)
- caches pulled from external systems
- caches built from data pushed by external systems
- a sequence of services, where the first service builds state represented by a GlobalKTable from an event stream, and a number of other applications leverage that built state. For the first application it is an "internal", deletable topic, while for the rest it is an "input" topic; but an attempt to reset any of the services using that topic as "input" ends in losing the topic, so all services in the sequence have to be reset. This actually happens when application ids overlap: for example, if one application id is 'session-manager-adapter' and another is 'session-manager', topics belonging to the former also satisfy the prefix criterion of the latter.

Once we hit that limitation, the first option was to get rid of the historical naming convention, but that is not an option either, as there is no such thing as "renaming" a topic. It is possible to "copy" data with a tool like MirrorMaker, but that is a whole different story.
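The application-id overlap described above follows directly from the prefix check. A hypothetical Python rendering of that criterion (not the actual tool code; the ids and topic name are from the comment's example):

```python
def matches_app(topic: str, application_id: str) -> bool:
    # Prefix-based criterion used to decide whether a topic
    # "belongs" to a given Streams application.
    return topic.startswith(application_id + "-")

# A changelog topic of 'session-manager-adapter' also matches the
# prefix of the shorter application id 'session-manager':
topic = "session-manager-adapter-store-changelog"
print(matches_app(topic, "session-manager-adapter"))  # True
print(matches_app(topic, "session-manager"))          # True (overlap)
```

So resetting the 'session-manager' application would also sweep up topics that belong to 'session-manager-adapter', which is the hazard the comment points out.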
[jira] [Commented] (KAFKA-6460) Add mocks for state stores used in Streams unit testing
[ https://issues.apache.org/jira/browse/KAFKA-6460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16767930#comment-16767930 ] Matthias J. Sax commented on KAFKA-6460:

I did not think about how to make it work, but what you suggest SGTM; in fact, `TopologyTestDriver` uses `InternalTopologyBuilder` already. Overall (from experience) this seems to be a tricky ticket, thus I would like to have a PR in parallel to the KIP; otherwise it will be hard to get right. [~shung] How does this sound to you?
[jira] [Commented] (KAFKA-7930) StreamsResetter makes "changelog" topic naming assumptions
[ https://issues.apache.org/jira/browse/KAFKA-7930?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16767936#comment-16767936 ] Matthias J. Sax commented on KAFKA-7930:

{quote}Now they are being used for different purposes like cached lookup, which are populated from various other places. And they effectively became "input" topics represented as GlobalKTable.{quote}
This should not have happened... Note that internal topics should not be read from or written to by any other application. If you want to share data, you should always create your own topics and write to them via `to()` or `through()`.
{quote}For application reset it is enough to shift offsets to earliest,{quote}
For this case, you could use `bin/kafka-consumer-groups.sh` instead of the streams reset tool. Thus, I am wondering whether we should patch the reset tool at all, given that there is an alternative tool you can use.
[jira] [Commented] (KAFKA-7930) StreamsResetter makes "changelog" topic naming assumptions
[ https://issues.apache.org/jira/browse/KAFKA-7930?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16767939#comment-16767939 ] Murad M commented on KAFKA-7930:

{quote}This should not have happened... Note that internal topics should not be read from or written to by any other application. If you want to share data, you should always create your own topics and write to them via `to()` or `through()`.{quote}
That is the point: if there were some internal flag identifying the topic as internal, that would be fine. But right now it treats a topic as internal by pattern, hoping that the topic was generated correctly in advance, and that the user did not come up with an arbitrary topic name that accidentally matches the pattern, or change the intent of a previously auto-generated topic. On the other hand, currently if you specify such a topic in `--input-topics` or `--intermediate-topics`, the tool will first treat it as specified (apply the offset reset to latest/earliest) and then, because it also satisfies `isInternalTopic`, delete it, which is somewhat weird and misleading. Also, deletion is fast and irreversible... It should not be that easy, I think.
{quote}For this case, you could use `bin/kafka-consumer-groups.sh` instead of the streams reset tool. Thus, I am wondering whether we should patch the reset tool at all, given that there is an alternative tool you can use.{quote}
Possible, and it was considered, but:
a) we had to read the code of both tools to make sure they do the same thing, because no documentation states that they are interchangeable;
b) `kafka-consumer-groups.sh` works on a per-topic basis, while `kafka-streams-application-reset.sh` works on multiple topics, which is more convenient;
c) `kafka-streams-application-reset.sh` is the intended tool for this, while `kafka-consumer-groups.sh` seems more "low level" and may work outside the context of Streams.
Somehow, it was cheaper / faster to patch `StreamsResetter` than to use `kafka-consumer-groups.sh`.
[jira] [Comment Edited] (KAFKA-7930) StreamsResetter makes "changelog" topic naming assumptions
[ https://issues.apache.org/jira/browse/KAFKA-7930?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16767939#comment-16767939 ] Murad M edited comment on KAFKA-7930 at 2/14/19 7:49 AM: - {quote} This should not have happened... Note that, all internal topic should not be read/written by any other application. If you want to share data, you should always create your own topics and write to them via `to()` or `through()`. {quote} That is the point, if there would be some internal flag that would identify that this topic is internal, then ok. But now it treats that topic as internal by pattern, hoping that it was generated correctly in advance, and user did not come up with arbitrary topic name that accidentally matches this pattern or changed the intention of previously name auto-generated topic. On the other hand, currently if you specify such topic in `\-\-input-topics` or `--intermediate-topics`, then tool will first treat it as specified (apply the offset reset to latest/earliest) and then because it is also `isInternalTopic` will delete it. Which is somewhat weird. Also, delete is fast and irreversible... Should not be that easy I think.. {quote} For this case, you could use `bin/kafka-consumer-group.sh` instead of stream reset tool. Thus, I am wondering if we should patch the reset tool at all, as there is an alternative tool you can use? {quote} Possible, and was considered, but we had to: a) read the code of both tools to make sure that they are doing same thing, because no documentation states that they are interchangeable. b) `kafka-consumer-group.sh` works on per topic basis, while `kafka-streams-application-reset.sh` works on multiple topics, which is more convenient. c) `kafka-streams-application-reset.sh` is intended tool for this, while `kafka-consumer-group.sh` seems like more "low level" and may work outside of context of streams. 
Somehow, it was cheaper / faster patch `StreamsResetter` than use `kafka-consumer-group.sh`. was (Author: muradm): {quote} This should not have happened... Note that, all internal topic should not be read/written by any other application. If you want to share data, you should always create your own topics and write to them via `to()` or `through()`. {quote} That is the point, if there would be some internal flag that would identify that this topic is internal, then ok. But now it treats that topic as internal by pattern, hoping that it was generated correctly in advance, and user did not come up with arbitrary topic name that accidentally matches this pattern or changed the intention of previously name auto-generated topic. On the other hand, currently if you specify such topic in `--input-topics` or `--intermediate-topics`, then tool will first treat it as specified (apply the offset reset to latest/earliest) and then because it is also `isInternalTopic` will delete it. Which is somewhat weird. Also, delete is fast and irreversible... Should not be that easy I think.. {quote} For this case, you could use `bin/kafka-consumer-group.sh` instead of stream reset tool. Thus, I am wondering if we should patch the reset tool at all, as there is an alternative tool you can use? {quote} Possible, and was considered, but we had to: a) read the code of both tools to make sure that they are doing same thing, because no documentation states that they are interchangeable. b) `kafka-consumer-group.sh` works on per topic basis, while `kafka-streams-application-reset.sh` works on multiple topics, which is more convenient. c) `kafka-streams-application-reset.sh` is intended tool for this, while `kafka-consumer-group.sh` seems like more "low level" and may work outside of context of streams. Somehow, it was cheaper / faster patch `StreamsResetter` than use `kafka-consumer-group.sh`. 
> StreamsResetter makes "changelog" topic naming assumptions > -- > > Key: KAFKA-7930 > URL: https://issues.apache.org/jira/browse/KAFKA-7930 > Project: Kafka > Issue Type: Improvement > Components: streams, tools >Affects Versions: 2.1.0 >Reporter: Murad M >Priority: Major > Labels: features, needs-kip, patch-available, usability > > StreamsResetter deletes the topics considered internal. Currently it just > checks the naming as per > [code|https://github.com/apache/kafka/blob/1aae604861068bb7337d4972c9dcc0c0a99c374d/core/src/main/scala/kafka/tools/StreamsResetter.java#L660]. > If assumption is wrong (either topic prefix or suffix), tool becomes useless > if aware even dangerous if not. Probably better either: > * naming assumption should be optional and supply internal topics with > argument (--internal-topics) > * deletion could be optional (--no-delete-internal) > * ignore topics whic
[jira] [Comment Edited] (KAFKA-7930) StreamsResetter makes "changelog" topic naming assumptions
[ https://issues.apache.org/jira/browse/KAFKA-7930?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16767939#comment-16767939 ] Murad M edited comment on KAFKA-7930 at 2/14/19 7:50 AM: - {quote}This should not have happened... Note that, all internal topic should not be read/written by any other application. If you want to share data, you should always create your own topics and write to them via `to()` or `through()`. {quote} That is the point, if there would be some internal flag that would identify that this topic is internal, then ok. But now it treats that topic as internal by pattern, hoping that it was generated correctly in advance, and user did not come up with arbitrary topic name that accidentally matches this pattern or changed the intention of previously name auto-generated topic. On the other hand, currently if you specify such topic in `--input-topics` or `--intermediate-topics`, then tool will first treat it as specified (apply the offset reset to latest/earliest) and then because it is also `isInternalTopic` will delete it. Which is somewhat weird. Also, delete is fast and irreversible... Should not be that easy I think.. {quote}For this case, you could use `bin/kafka-consumer-group.sh` instead of stream reset tool. Thus, I am wondering if we should patch the reset tool at all, as there is an alternative tool you can use? {quote} Possible, and was considered, but: a) we had to read the code of both tools to make sure that they are doing same thing, because no documentation states that they are interchangeable. b) `kafka-consumer-group.sh` works on per topic basis, while `kafka-streams-application-reset.sh` works on multiple topics, which is more convenient. c) `kafka-streams-application-reset.sh` is intended tool for this, while `kafka-consumer-group.sh` seems like more "low level" and may work outside of context of streams. 
Somehow, it was cheaper / faster patch `StreamsResetter` than use `kafka-consumer-group.sh`. was (Author: muradm): {quote} This should not have happened... Note that, all internal topic should not be read/written by any other application. If you want to share data, you should always create your own topics and write to them via `to()` or `through()`. {quote} That is the point, if there would be some internal flag that would identify that this topic is internal, then ok. But now it treats that topic as internal by pattern, hoping that it was generated correctly in advance, and user did not come up with arbitrary topic name that accidentally matches this pattern or changed the intention of previously name auto-generated topic. On the other hand, currently if you specify such topic in `\-\-input-topics` or `--intermediate-topics`, then tool will first treat it as specified (apply the offset reset to latest/earliest) and then because it is also `isInternalTopic` will delete it. Which is somewhat weird. Also, delete is fast and irreversible... Should not be that easy I think.. {quote} For this case, you could use `bin/kafka-consumer-group.sh` instead of stream reset tool. Thus, I am wondering if we should patch the reset tool at all, as there is an alternative tool you can use? {quote} Possible, and was considered, but we had to: a) read the code of both tools to make sure that they are doing same thing, because no documentation states that they are interchangeable. b) `kafka-consumer-group.sh` works on per topic basis, while `kafka-streams-application-reset.sh` works on multiple topics, which is more convenient. c) `kafka-streams-application-reset.sh` is intended tool for this, while `kafka-consumer-group.sh` seems like more "low level" and may work outside of context of streams. Somehow, it was cheaper / faster patch `StreamsResetter` than use `kafka-consumer-group.sh`. 
> StreamsResetter makes "changelog" topic naming assumptions > -- > > Key: KAFKA-7930 > URL: https://issues.apache.org/jira/browse/KAFKA-7930 > Project: Kafka > Issue Type: Improvement > Components: streams, tools >Affects Versions: 2.1.0 >Reporter: Murad M >Priority: Major > Labels: features, needs-kip, patch-available, usability > > StreamsResetter deletes the topics considered internal. Currently it just > checks the naming as per > [code|https://github.com/apache/kafka/blob/1aae604861068bb7337d4972c9dcc0c0a99c374d/core/src/main/scala/kafka/tools/StreamsResetter.java#L660]. > If assumption is wrong (either topic prefix or suffix), tool becomes useless > if aware even dangerous if not. Probably better either: > * naming assumption should be optional and supply internal topics with > argument (--internal-topics) > * deletion could be optional (--no-delete-internal) > * ignore topics which
[jira] [Comment Edited] (KAFKA-7930) StreamsResetter makes "changelog" topic naming assumptions
[ https://issues.apache.org/jira/browse/KAFKA-7930?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16767939#comment-16767939 ] Murad M edited comment on KAFKA-7930 at 2/14/19 7:51 AM: - {quote}This should not have happened... Note that, all internal topic should not be read/written by any other application. If you want to share data, you should always create your own topics and write to them via `to()` or `through()`. {quote} That is the point, if there would be some internal flag that would identify that this topic is internal, then ok. But now it treats that topic as internal by pattern, hoping that it was generated correctly in advance, and user did not come up with arbitrary topic name that accidentally matches this pattern or changed the intention of previously name auto-generated topic. On the other hand, currently if you specify such topic in `\-\-input-topics` or `--intermediate-topics`, then tool will first treat it as specified (apply the offset reset to latest/earliest) and then because it is also `isInternalTopic` will delete it. Which is somewhat weird and misleading. Also, delete is fast and irreversible... Should not be that easy I think.. {quote}For this case, you could use `bin/kafka-consumer-group.sh` instead of stream reset tool. Thus, I am wondering if we should patch the reset tool at all, as there is an alternative tool you can use? {quote} Possible, and was considered, but: a) we had to read the code of both tools to make sure that they are doing same thing, because no documentation states that they are interchangeable. b) `kafka-consumer-group.sh` works on per topic basis, while `kafka-streams-application-reset.sh` works on multiple topics, which is more convenient. c) `kafka-streams-application-reset.sh` is intended tool for this, while `kafka-consumer-group.sh` seems like more "low level" and may work outside of context of streams. 
Somehow, it was cheaper and faster to patch `StreamsResetter` than to use `kafka-consumer-group.sh`.
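To make the concern concrete, the pattern-based check the comment describes boils down to a prefix/suffix test on the topic name. The sketch below is an approximation of the heuristic in `StreamsResetter.isInternalTopic` (names other than that method are illustrative, not the actual Kafka code); it shows how a user-created topic can accidentally match and therefore be deleted:

```java
// Approximate sketch of the naming heuristic StreamsResetter applies:
// a topic is "internal" if it starts with the application id and ends
// with a known internal suffix such as "-changelog" or "-repartition".
public class InternalTopicHeuristic {
    public static boolean looksInternal(String topic, String applicationId) {
        return topic.startsWith(applicationId + "-")
            && (topic.endsWith("-changelog") || topic.endsWith("-repartition"));
    }

    public static void main(String[] args) {
        String appId = "my-app";
        // A genuine Streams-generated changelog topic matches, as intended:
        System.out.println(
            looksInternal("my-app-KSTREAM-AGGREGATE-STATE-STORE-0000000001-changelog", appId));
        // ...but a user-created topic that merely resembles the pattern
        // matches too, and the reset tool would delete it:
        System.out.println(
            looksInternal("my-app-orders-changelog", appId));
    }
}
```

Both calls print `true`, which is exactly the hazard raised in the thread: the heuristic cannot distinguish a genuine internal topic from a look-alike.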
> StreamsResetter makes "changelog" topic naming assumptions > -- > > Key: KAFKA-7930 > URL: https://issues.apache.org/jira/browse/KAFKA-7930 > Project: Kafka > Issue Type: Improvement > Components: streams, tools >Affects Versions: 2.1.0 >Reporter: Murad M >Priority: Major > Labels: features, needs-kip, patch-available, usability > > StreamsResetter deletes the topics it considers internal. Currently it just > checks the naming, as per > [code|https://github.com/apache/kafka/blob/1aae604861068bb7337d4972c9dcc0c0a99c374d/core/src/main/scala/kafka/tools/StreamsResetter.java#L660]. > If the assumption is wrong (about either the topic prefix or suffix), the tool becomes useless > if the user is aware, and even dangerous if not. It would probably be better if either: > * the naming assumption were optional, with internal topics supplied via an > argument (--internal-topics) > * deletion were optional (--no-delete-internal) > * ignore topics which ar
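The first proposed option can be sketched as follows. This is a hypothetical illustration of the suggested behavior, not the actual patch: when an explicit `--internal-topics` list is supplied, only topics on that list are eligible for deletion, and the naming heuristic is used only as a fallback (the class and method names here are invented for the example):

```java
import java.util.Set;

// Hypothetical sketch of the "--internal-topics" proposal: trust an
// explicit user-supplied list when one is given, and fall back to the
// current pattern-based guess only when it is not.
public class ExplicitInternalTopics {
    public static boolean shouldDelete(String topic, Set<String> explicitInternal, String appId) {
        if (!explicitInternal.isEmpty()) {
            // The user told us exactly which topics are internal.
            return explicitInternal.contains(topic);
        }
        // Fallback: the existing naming heuristic.
        return topic.startsWith(appId + "-")
            && (topic.endsWith("-changelog") || topic.endsWith("-repartition"));
    }

    public static void main(String[] args) {
        Set<String> explicit = Set.of("my-app-store-changelog");
        // Listed topic: deleted.
        System.out.println(shouldDelete("my-app-store-changelog", explicit, "my-app"));
        // Look-alike topic not on the list: spared, unlike today.
        System.out.println(shouldDelete("my-app-orders-changelog", explicit, "my-app"));
    }
}
```

This prints `true` then `false`: with an explicit list, the look-alike user topic from the discussion above would no longer be deleted by pattern matching alone.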