[jira] [Commented] (KAFKA-7994) Improve Stream-Time for rebalances and restarts
[ https://issues.apache.org/jira/browse/KAFKA-7994?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16853118#comment-16853118 ]

Mateusz Owczarek commented on KAFKA-7994:
---

Bruno's example above indicates that this does affect the `suppress` operator's promise of producing only the "final window result". To me, the word "final" suggests that one and only one result will be produced, whereas a single rebalance may cause multiple results for the same window. I only recently started using the `suppress` operator in my application and ran into this behavior, so I created a pull request in my Kafka Streams example repository describing the steps to reproduce multiple `suppress` results: https://github.com/mowczare/kafka-streams-scala/pull/1

> Improve Stream-Time for rebalances and restarts
> ---
>
> Key: KAFKA-7994
> URL: https://issues.apache.org/jira/browse/KAFKA-7994
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Reporter: Matthias J. Sax
> Assignee: Richard Yu
> Priority: Major
> Attachments: possible-patch.diff
>
> We compute a per-partition partition-time as the maximum timestamp over all records processed so far. Furthermore, we use partition-time to compute stream-time for each task as the maximum over all partition-times (for all corresponding task partitions). This stream-time is used to decide whether to process out-of-order records or to drop them if they are late (i.e., timestamp < stream-time - grace-period).
> During rebalances and restarts, stream-time is initialized as UNKNOWN (i.e., -1) for tasks that are newly created (or migrated). In effect, we forget the current stream-time in this case, which may lead to non-deterministic behavior: if we stop processing right before a late record that would be dropped if we continued processing, that record is not dropped after the rebalance/restart. Let's look at an example with a grace period of 5ms for a tumbling window of 5ms, and the following records (timestamps in parentheses):
>
> {code:java}
> r1(0) r2(5) r3(11) r4(2){code}
> In the example, stream-time advances as 0, 5, 11, 11 and thus record `r4` is dropped as late because 2 < 6 = 11 - 5. However, if we stop processing or rebalance after processing `r3` but before processing `r4`, we would reinitialize stream-time as -1, and thus would process `r4` on restart/after rebalance. The problem is that stream-time then advances differently from a global point of view: 0, 5, 11, 2.
> Note that this is a corner case: if we stopped processing one record earlier, i.e., after processing `r2` but before processing `r3`, stream-time would advance correctly from a global point of view.
> A potential fix would be to store the latest observed partition-time in the metadata of committed offsets. This way, on restart/rebalance we can re-initialize time correctly.

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
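The r1-r4 scenario above can be replayed with a small Scala model that needs no dependencies. It is a sketch under stated assumptions, not Kafka's internals:

- It applies the simplified lateness rule quoted in the description: drop a record if timestamp < stream-time - grace.
- It advances stream-time before checking each record.
- It models a restart or rebalance by starting from a different initial stream-time.

`StreamTimeModel`, `Rec`, and `process` are hypothetical names.

```scala
object StreamTimeModel {
  // Hypothetical record: a name and an event timestamp in ms.
  final case class Rec(name: String, ts: Long)

  // Stream-time a newly created or migrated task starts with, per the description.
  val Unknown: Long = -1L

  // Processes records in order, starting from `initial` stream-time.
  // Returns the names of accepted (not dropped) records and the final stream-time.
  def process(records: Seq[Rec], initial: Long, grace: Long): (Seq[String], Long) =
    records.foldLeft((Vector.empty[String], initial)) { case ((accepted, streamTime), r) =>
      val advanced = math.max(streamTime, r.ts) // stream-time never moves backwards
      val late = r.ts < advanced - grace        // simplified rule from the issue text
      (if (late) accepted else accepted :+ r.name, advanced)
    }
}
```

**Uninterrupted run.** `process(records, Unknown, 5)` over r1..r4 accepts r1, r2 and r3 and ends at stream-time 11, so r4 is dropped.

**Restart after r3.** Splitting the run after r3 and restarting from `Unknown` accepts r4. Restarting instead from the stream-time returned by the first half (11) drops r4 again. That value stands in for a partition-time stored with the committed offsets, as the description proposes.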
[jira] [Commented] (KAFKA-5998) /.checkpoint.tmp Not found exception
[ https://issues.apache.org/jira/browse/KAFKA-5998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16842015#comment-16842015 ]

Mateusz Owczarek commented on KAFKA-5998:
---

[~guozhang] Any updates on this? I have the same issue with Kafka 2.1.1.

> /.checkpoint.tmp Not found exception
> ---
>
> Key: KAFKA-5998
> URL: https://issues.apache.org/jira/browse/KAFKA-5998
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 0.11.0.0, 0.11.0.1, 2.1.1
> Reporter: Yogesh BG
> Priority: Critical
> Attachments: 5998.v1.txt, 5998.v2.txt, Topology.txt, exc.txt, props.txt, streams.txt
>
> I have one Kafka broker and one Kafka Streams instance running. I have been running them for two days under a load of around 2500 msgs per second. On the third day I started getting the exception below for some of the partitions: I have 16 partitions, and only 0_0 and 0_1 give this error.
> {{09:43:25.955 [ks_0_inst-StreamThread-6] WARN o.a.k.s.p.i.ProcessorStateManager - Failed to write checkpoint file to /data/kstreams/rtp-kafkastreams/0_1/.checkpoint:
> java.io.FileNotFoundException: /data/kstreams/rtp-kafkastreams/0_1/.checkpoint.tmp (No such file or directory)
>   at java.io.FileOutputStream.open(Native Method) ~[na:1.7.0_111]
>   at java.io.FileOutputStream.<init>(FileOutputStream.java:221) ~[na:1.7.0_111]
>   at java.io.FileOutputStream.<init>(FileOutputStream.java:171) ~[na:1.7.0_111]
>   at org.apache.kafka.streams.state.internals.OffsetCheckpoint.write(OffsetCheckpoint.java:73) ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>   at org.apache.kafka.streams.processor.internals.ProcessorStateManager.checkpoint(ProcessorStateManager.java:324) ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>   at org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:267) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>   at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:201) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>   at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:260) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>   at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:254) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>   at org.apache.kafka.streams.processor.internals.AssignedTasks$1.apply(AssignedTasks.java:322) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>   at org.apache.kafka.streams.processor.internals.AssignedTasks.applyToRunningTasks(AssignedTasks.java:415) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>   at org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:314) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>   at org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:700) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>   at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:683) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>   at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:523) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>   at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:480) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>   at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:457) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> 09:43:25.974 [ks_0_inst-StreamThread-15] WARN o.a.k.s.p.i.ProcessorStateManager - Failed to write checkpoint file to /data/kstreams/rtp-kafkastreams/0_0/.checkpoint:
> java.io.FileNotFoundException: /data/kstreams/rtp-kafkastreams/0_0/.checkpoint.tmp (No such file or directory)
>   at java.io.FileOutputStream.open(Native Method) ~[na:1.7.0_111]
>   at java.io.FileOutputStream.<init>(FileOutputStream.java:221) ~[na:1.7.0_111]
>   at java.io.FileOutputStream.<init>(FileOutputStream.java:171) ~[na:1.7.0_111]
>   at org.apache.kafka.streams.state.internals.OffsetCheckpoint.write(OffsetCheckpoint.java:73) ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>   at org.apache.kafka.streams.processor.internals.ProcessorStateManager.checkpoint(ProcessorStateManager.java:324) ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>   at
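The log shows `OffsetCheckpoint.write` failing to open `.checkpoint.tmp` with "No such file or directory". When opening a file for writing, that message typically means a parent directory of the path does not exist.

The sketch below shows the write-to-temp-then-rename pattern that the `.tmp` file name points to, using only `java.nio.file`. It is an illustration under assumptions, not Kafka's `OffsetCheckpoint`. All of the following are hypothetical:

- the object name `CheckpointSketch`;
- the one-line-per-partition file format;
- the choice to recreate a missing task directory.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{AtomicMoveNotSupportedException, Files, Path, StandardCopyOption}

object CheckpointSketch {
  // Hypothetical format: one "topic partition offset" line per entry, sorted for determinism.
  def render(offsets: Map[(String, Int), Long]): String =
    offsets.toSeq.sortBy(_._1).map { case ((topic, partition), offset) =>
      s"$topic $partition $offset"
    }.mkString("\n")

  // Writes <taskDir>/.checkpoint via <taskDir>/.checkpoint.tmp and returns the final path.
  def write(taskDir: Path, offsets: Map[(String, Int), Long]): Path = {
    // Assumption: recreating a task directory that disappeared is acceptable here.
    Files.createDirectories(taskDir)
    val tmp = taskDir.resolve(".checkpoint.tmp")
    val target = taskDir.resolve(".checkpoint")
    Files.write(tmp, render(offsets).getBytes(StandardCharsets.UTF_8))
    try Files.move(tmp, target, StandardCopyOption.ATOMIC_MOVE)
    catch {
      // Fall back to a plain replace on file systems without atomic rename.
      case _: AtomicMoveNotSupportedException =>
        Files.move(tmp, target, StandardCopyOption.REPLACE_EXISTING)
    }
    target
  }
}
```

Recreating the directory only hides the symptom. Whatever removed the task directory while the task was still committing is still worth tracking down.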
[jira] [Commented] (KAFKA-7882) StateStores are frequently closed during the 'transform' method
[ https://issues.apache.org/jira/browse/KAFKA-7882?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16769611#comment-16769611 ]

Mateusz Owczarek commented on KAFKA-7882:
---

As I understand it (correct me if I'm wrong), a late record is one that arrives between the end of the window and the end of the window's grace period. After that, I reject all late events.

> StateStores are frequently closed during the 'transform' method
> ---
>
> Key: KAFKA-7882
> URL: https://issues.apache.org/jira/browse/KAFKA-7882
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 2.0.0
> Reporter: Mateusz Owczarek
> Priority: Major
>
> Hello, I have a problem with the state store being closed frequently while transforming incoming records. To ensure that only one record per key and window reaches an aggregate, I have created a custom Transformer (I know something similar is going to be introduced with the suppress method on KTable in the future, but my implementation is quite simple and IMO should work correctly) with the following implementation:
> {code:java}
> override def transform(key: Windowed[K], value: V): (Windowed[K], V) = {
>   val partition = context.partition()
>   if (partition != -1) store.put(key.key(), (value, partition), key.window().start())
>   else logger.warn(s"-1 partition")
>   null // Ensuring no 1:1 forwarding; context.forward and commit logic is in the punctuator callback
> }
> {code}
>
> What I get is the following error:
> {code:java}
> Store MyStore is currently closed{code}
> This problem appears only when the number of stream threads (or input topic partitions) is greater than 1, even if I'm just saving to the store with punctuation turned off.
> If punctuation is present, however, I sometimes get -1 as the partition value in the transform method. I'm familiar with the basic docs; however, I haven't found anything that could help me here.
> I build my state store like this:
> {code:java}
> val stateStore = Stores.windowStoreBuilder(
>   Stores.persistentWindowStore(
>     stateStoreName,
>     timeWindows.maintainMs() + timeWindows.sizeMs + TimeUnit.DAYS.toMillis(1),
>     timeWindows.segments,
>     timeWindows.sizeMs,
>     false
>   ),
>   serde[K],
>   serde[(V, Int)]
> )
> {code}
> and include it in the DSL like this:
> {code:java}
> builder.addStateStore(stateStore)
> (...).transform(new MyTransformer(...), "MyStore")
> {code}
> INB4: I don't close any state stores manually, and I gave them as long a retention time as possible for the debugging stage. I tried to hotfix this with a retry in the transform method: the state stores eventually reopen and the `put` method works, but this approach is questionable and I am not sure it actually works.
> Edit:
> Could this be because {code:java}StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG{code} is set to a low value? If I understand correctly, spilling to disk then happens more frequently; could that cause the store to be closed?
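The comment's reading of a "late" record can be made concrete with a small model. It rests on these assumptions:

- A window [start, start + size) counts as closed once stream-time reaches window end + grace.
- Timestamps are non-negative.
- Windows are tumbling and aligned to 0.

`LatenessModel`, `Verdict`, and `classify` are hypothetical names, not Kafka API.

```scala
object LatenessModel {
  sealed trait Verdict
  case object WindowOpen extends Verdict // stream-time has not yet reached the window end
  case object InGrace extends Verdict    // window ended, but its grace period has not
  case object Dropped extends Verdict    // window closed: end + grace <= stream-time

  // Classifies a record for a tumbling window of `size` ms at the given stream-time.
  def classify(recordTs: Long, size: Long, grace: Long, streamTime: Long): Verdict = {
    val windowStart = recordTs - (recordTs % size) // assumes recordTs >= 0
    val windowEnd = windowStart + size
    if (windowEnd + grace <= streamTime) Dropped
    else if (windowEnd <= streamTime) InGrace
    else WindowOpen
  }
}
```

Take a 5 ms window with a 5 ms grace period and a record with timestamp 2. It is `InGrace` at stream-time 7 and `Dropped` at stream-time 10 or later. This matches the comment: updates arriving after the window end but before the grace period ends are still applied, and later ones are rejected.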
[jira] [Commented] (KAFKA-7882) StateStores are frequently closed during the 'transform' method
[ https://issues.apache.org/jira/browse/KAFKA-7882?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16769321#comment-16769321 ]

Mateusz Owczarek commented on KAFKA-7882:
---

I use this approach only after the window aggregation step, so all the input events are windowed. The final output event is produced once the window duration plus the window grace period has elapsed (whether that time has passed is checked repeatedly using wall-clock punctuation). After that, I clear my state store accordingly. This way, information is not kept forever and at most one event per window is emitted. Am I right?
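The approach in this comment can be modeled without Kafka to see where the "at most one event per window" property holds and where it can break. This is a hypothetical sketch:

- `WallClockFinalResults`, `update`, and `punctuate` are invented names.
- Keys and values are simplified to `String` and `Int`.
- A punctuation emits and clears every window whose end plus grace is at or before the wall-clock time it is given.

If an update for an already-flushed window arrives later (for example, because event time lags behind wall-clock time), the entry is recreated and emitted a second time. The `rejectClosed` flag shows one way to prevent that, at the cost of dropping such updates.

```scala
import scala.collection.mutable

// Hypothetical model of a wall-clock-punctuated "final result" transformer.
final class WallClockFinalResults(windowSize: Long, grace: Long, rejectClosed: Boolean) {
  // Latest value per (key, windowStart); stands in for the window store.
  private val pending = mutable.LinkedHashMap.empty[(String, Long), Int]
  // Highest wall-clock time seen by punctuate so far.
  private var lastPunctuation = Long.MinValue

  private def closedAt(windowStart: Long, now: Long): Boolean =
    windowStart + windowSize + grace <= now

  // Stores the latest aggregate; optionally ignores windows that were already flushed.
  def update(key: String, windowStart: Long, value: Int): Unit =
    if (!(rejectClosed && closedAt(windowStart, lastPunctuation)))
      pending.update((key, windowStart), value)

  // Emits and forgets every buffered window that has closed by wall-clock time `now`.
  def punctuate(now: Long): Vector[(String, Long, Int)] = {
    lastPunctuation = math.max(lastPunctuation, now)
    val expired = pending.iterator.collect {
      case ((k, start), v) if closedAt(start, now) => (k, start, v)
    }.toVector
    expired.foreach { case (k, start, _) => pending.remove((k, start)) }
    expired
  }
}
```

Consider a run where one update arrives, a punctuation at time 10 flushes window [0, 5), and a second update for that window arrives afterwards. With `rejectClosed = false` the window is emitted twice. With `rejectClosed = true` it is emitted once, and the delayed update is lost.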
[jira] [Commented] (KAFKA-7882) StateStores are frequently closed during the 'transform' method
[ https://issues.apache.org/jira/browse/KAFKA-7882?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16768597#comment-16768597 ]

Mateusz Owczarek commented on KAFKA-7882:
---

I understand that; I'm familiar with the basic concepts of wall-clock time vs. event time. I assume there may be latencies and that wall-clock punctuation may not produce any events for a window, and I accept this risk. What I was asking is why it is not guaranteed that at most one event per window is published when my suppress transformer uses wall-clock time. This is crucial for me, and the 2.1.0 `suppress` implementation does not provide it, presumably because of the emitEarlyWhenFull approach.
[jira] [Commented] (KAFKA-7882) StateStores are frequently closed during the 'transform' method
[ https://issues.apache.org/jira/browse/KAFKA-7882?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16768064#comment-16768064 ]

Mateusz Owczarek commented on KAFKA-7882:
---

Yes, we can, but there is still one unanswered question (maybe not related to the subject of this issue, but forgive me, I'm very curious):
{quote}_Offhand, if you were previously using wall-clock time, you don't need semantically airtight suppression behavior, so, emitting early when the buffer fills up should be fine._
{quote}
_As far as I understand, emitEarlyWhenFull does not guarantee one event per window, so it's not sufficient for me. Could you elaborate on why a wall-clock time implementation cannot ensure it either?_
[jira] [Commented] (KAFKA-7882) StateStores are frequently closed during the 'transform' method
[ https://issues.apache.org/jira/browse/KAFKA-7882?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16767006#comment-16767006 ]

Mateusz Owczarek commented on KAFKA-7882:
---

[~mjsax] As I reported, I was using Kafka Streams 2.0.0 with the Scala DSL API, where the transform method accepts a Transformer instance (not a TransformerSupplier) as a parameter:
{code}
def transform[K1, V1](transformer: Transformer[K, V, (K1, V1)],
                      stateStoreNames: String*): KStream[K1, V1] = {
  val transformerSupplierJ: TransformerSupplier[K, V, KeyValue[K1, V1]] =
    new TransformerSupplier[K, V, KeyValue[K1, V1]] {
      // Each call returns a new wrapper, but every wrapper delegates to the same `transformer` instance.
      override def get(): Transformer[K, V, KeyValue[K1, V1]] = {
        new Transformer[K, V, KeyValue[K1, V1]] {
          override def transform(key: K, value: V): KeyValue[K1, V1] = {
            transformer.transform(key, value) match {
              case (k1, v1) => KeyValue.pair(k1, v1)
              case _        => null
            }
          }
          override def init(context: ProcessorContext): Unit = transformer.init(context)
          override def close(): Unit = transformer.close()
        }
      }
    }
  inner.transform(transformerSupplierJ, stateStoreNames: _*)
}
{code}
I believe the implementation has changed in 2.1.0 and now actually accepts a TransformerSupplier.
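The wrapper quoted above creates a new outer `Transformer` on every `get()` call, but all of them delegate to the same user-supplied instance. Kafka Streams asks the supplier for one transformer per task.

A plausible reading, which is an assumption and not confirmed in this thread: with more than one task, a later `init` overwrites the shared instance's store handle. When one task's store is closed, another task can then find itself writing to a closed store.

The model below imitates that without Kafka, using hypothetical `StoreModel`, `TransformerModel`, and `SupplierModel` types. The error text only imitates the reported message.

```scala
// Hypothetical stand-in for a per-task state store.
final class StoreModel(val name: String) {
  private var open = true
  def close(): Unit = { open = false }
  def put(key: String): Unit =
    if (!open) throw new IllegalStateException(s"Store $name is currently closed")
}

// Hypothetical stand-in for a stateful transformer that looks up its store in init.
final class TransformerModel {
  private var store: StoreModel = null
  def init(s: StoreModel): Unit = { store = s }
  def transform(key: String): Unit = store.put(key)
}

object SupplierModel {
  // Two tasks, each with its own "MyStore" instance, obtain transformers from `supplier`.
  // Task 2's store is then closed (e.g. the task migrates away), and task 1 keeps processing.
  // Returns the error task 1 sees, if any.
  def runTwoTasks(supplier: () => TransformerModel): Option[String] = {
    val store1 = new StoreModel("MyStore")
    val store2 = new StoreModel("MyStore")
    val forTask1 = supplier()
    forTask1.init(store1)
    val forTask2 = supplier()
    forTask2.init(store2)
    store2.close()
    try { forTask1.transform("k"); None }
    catch { case e: IllegalStateException => Some(e.getMessage) }
  }
}
```

Passing `() => shared` for a single pre-built instance reproduces "Store MyStore is currently closed". Passing `() => new TransformerModel` does not. Whichever DSL version is used, a supplier that builds a fresh transformer on each call avoids this kind of sharing.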
[jira] [Commented] (KAFKA-7882) StateStores are frequently closed during the 'transform' method
[ https://issues.apache.org/jira/browse/KAFKA-7882?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16765967#comment-16765967 ]

Mateusz Owczarek commented on KAFKA-7882:
---

[~vvcephei] This is roughly how I call the suppress method:
{code:java}
.suppress(new FinalResultsSuppressionBuilder(
  stateStoreName,
  new StrictBufferConfigImpl(Long.MaxValue, Long.MaxValue, BufferFullStrategy.SPILL_TO_DISK)
))
{code}
As far as I can see, the static untilWindowCloses method also requires a StrictBufferConfig, and StrictBufferConfigImpl is a public class. Maybe it should not be? I presume I should have used the withMaxBytes or unbounded static methods from the BufferConfig interface. Do I understand correctly that, the way I used the suppress method, a full buffer will eventually throw an unexpected exception and stop my application?
bq. Offhand, if you were previously using wall-clock time, you don't need semantically airtight suppression behavior, so, emitting early when the buffer fills up should be fine.
As far as I understand, emitEarlyWhenFull does not guarantee one event per window, so it's not sufficient for me. Could you elaborate on why a wall-clock time implementation cannot ensure it either?
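The difference between emitting early when the buffer is full and a strict buffer can be illustrated with a simplified model that needs no dependencies. This is not Kafka's `suppress` implementation. The following are hypothetical:

- `BoundedSuppressBuffer` and `FullPolicy`;
- the record-count limit;
- evicting in insertion order;
- closing windows on an externally supplied time.

```scala
import scala.collection.mutable

sealed trait FullPolicy
case object EmitEarly extends FullPolicy // evict and emit the oldest window when full
case object ShutDown extends FullPolicy  // fail instead of emitting a non-final result

// Hypothetical bounded buffer holding the latest value per window start.
final class BoundedSuppressBuffer(maxRecords: Int, closeAfter: Long, policy: FullPolicy) {
  private val pending = mutable.LinkedHashMap.empty[Long, Int] // insertion order = oldest first
  val emitted = mutable.ArrayBuffer.empty[(Long, Int)]

  def update(windowStart: Long, value: Int): Unit = {
    if (!pending.contains(windowStart) && pending.size >= maxRecords) {
      policy match {
        case EmitEarly =>
          val (oldest, v) = pending.head // emitted before its window has closed
          pending.remove(oldest)
          emitted += ((oldest, v))
        case ShutDown =>
          throw new IllegalStateException("suppression buffer is full")
      }
    }
    pending.update(windowStart, value)
  }

  // Emits every window that has closed by time `now` (windowStart + closeAfter <= now).
  def advance(now: Long): Unit = {
    val closed = pending.keys.filter(start => start + closeAfter <= now).toVector
    closed.foreach { start =>
      emitted += ((start, pending(start)))
      pending.remove(start)
    }
  }
}
```

**EmitEarly.** An overflowing buffer emits a window before it closes, so a later update for that window produces a second output.

**ShutDown.** The overflow raises an error instead, which keeps at most one output per window but stops processing.

**Unbounded buffer.** A very large limit (in this model, `maxRecords = Int.MaxValue`) never takes either branch, trading that guarantee for unbounded memory.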
[jira] [Commented] (KAFKA-7882) StateStores are frequently closed during the 'transform' method
[ https://issues.apache.org/jira/browse/KAFKA-7882?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16764999#comment-16764999 ]

Mateusz Owczarek commented on KAFKA-7882:
---

Sorry for bumping this, but any advice on creating a test case for a topology containing the `.suppress` method would be very helpful :)
[jira] [Comment Edited] (KAFKA-7882) StateStores are frequently closed during the 'transform' method
[ https://issues.apache.org/jira/browse/KAFKA-7882?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16761758#comment-16761758 ]

Mateusz Owczarek edited comment on KAFKA-7882 at 2/11/19 2:16 PM:
------------------------------------------------------------------

Also, I noticed that my regression tests, which check whether events are actually produced after the time window has finished, are not passing. They are based on `TopologyTestDriver` with `advanceWallClockTime`, since my previous suppress implementation was based on wall-clock-time punctuation. How can I test my topology now? Basically, how can I test any topology containing the `.suppress` method from Kafka 2.1.0?

was (Author: nijo):
Also, I noticed my regression tests checking if events are actually produced after the time-window has finished are not passing. They are based on the `TopologyTestDriver` with the `advanceWallClockTime` since my previous suppress impl. was based on wall clock time punctuation. How can I test my topology now?
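Why doesn't `advanceWallClockTime` trigger output any more? `suppress(untilWindowCloses(...))` is driven by stream-time, the maximum record timestamp observed so far, not by wall-clock time. The hypothetical Scala model below shows that only a newer record can release a final result. It makes a few simplifications:

- `UntilWindowClosesModel` is an illustrative name.
- A count stands in for the aggregation.
- A window is treated as closed once stream-time reaches window end plus grace period.

In a `TopologyTestDriver` test, the equivalent step would be piping an extra record whose timestamp is at least the window end plus grace.

```scala
import scala.collection.mutable

// Hypothetical model of suppress(untilWindowCloses(...)) over a windowed count.
// Only record timestamps move time forward; there is no wall-clock input at all.
final class UntilWindowClosesModel(windowSizeMs: Long, graceMs: Long) {
  private var streamTime = -1L // max timestamp seen so far (-1 = unknown)
  private val pending = mutable.TreeMap.empty[Long, Long] // windowStart -> count

  private def closesAt(windowStart: Long): Long = windowStart + windowSizeMs + graceMs

  // Processes one record and returns the (windowStart, count) results that became final.
  def process(timestampMs: Long): List[(Long, Long)] = {
    streamTime = math.max(streamTime, timestampMs)
    val windowStart = timestampMs - timestampMs % windowSizeMs
    // A record for a window that is already closed counts as late and is dropped.
    if (closesAt(windowStart) > streamTime)
      pending.update(windowStart, pending.getOrElse(windowStart, 0L) + 1L)
    val closed = pending.filter { case (start, _) => closesAt(start) <= streamTime }.toList
    closed.foreach { case (start, _) => pending.remove(start) }
    closed
  }
}

val model = new UntilWindowClosesModel(windowSizeMs = 10L, graceMs = 0L)
val r1 = model.process(1L)  // window [0, 10) is open: nothing emitted
val r2 = model.process(5L)  // still open: nothing emitted
val r3 = model.process(10L) // stream-time reaches 10: window [0, 10) is final
```

Because nothing in the model reads a clock, no amount of wall-clock advancement changes `r1` or `r2`. Only the third record, with a later timestamp, releases the window's final count.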
[jira] [Commented] (KAFKA-7882) StateStores are frequently closed during the 'transform' method
[ https://issues.apache.org/jira/browse/KAFKA-7882?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16761758#comment-16761758 ]

Mateusz Owczarek commented on KAFKA-7882:
-----------------------------------------

Also, I noticed my regression tests checking if events are actually produced after the time-window has finished are not passing. They are based on the `TopologyTestDriver` with the `advanceWallClockTime` since my previous suppress impl. was based on wall clock time punctuation. How can I test my topology now?
[jira] [Commented] (KAFKA-7882) StateStores are frequently closed during the 'transform' method
[ https://issues.apache.org/jira/browse/KAFKA-7882?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16761706#comment-16761706 ]

Mateusz Owczarek commented on KAFKA-7882:
-----------------------------------------

Hey [~vvcephei], thanks for the answer! I've upgraded Kafka to 2.1.0 and migrated my code to use your suppress method, and it looks like it will fit into my code nicely. However, I'm a bit concerned about the lack of documentation on the `BufferFullStrategy` class. At first glance, it seems I can choose the `SPILL_TO_DISK` option, which (afaik) would use the default RocksDB-based state store implementation, as other stateful operations (aggregates, counts and so on) do. Does it work like this in this case? Should I be worried that my application will shut down if memory fills up?
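Some background for the buffer question. In the public 2.1 API, the suppression buffer is configured through `Suppressed.BufferConfig` (`unbounded()`, `maxRecords(...)`, `maxBytes(...)`), and a bounded buffer is then told either to `emitEarlyWhenFull()` or to `shutDownWhenFull()`. `untilWindowCloses` only accepts the strict variants (unbounded, or shut down when full), since emitting early would break the final-result guarantee.

The Scala model below is a hypothetical illustration of the two full-buffer policies, not Kafka's buffer implementation. `BoundedBuffer`, `EmitEarly` and `ShutDown` are made-up names.

```scala
import scala.util.Try
import scala.collection.mutable

// Hypothetical policies for a full suppression buffer (illustration only).
sealed trait WhenFull
case object EmitEarly extends WhenFull // forward the oldest buffered record now
case object ShutDown extends WhenFull  // fail instead of breaking the guarantee

final class BoundedBuffer[K, V](maxRecords: Int, whenFull: WhenFull) {
  // One slot per key; insertion order tells us which entry is oldest.
  private val entries = mutable.LinkedHashMap.empty[K, V]

  // Buffers a value and returns any records that had to be emitted early.
  def put(key: K, value: V): List[(K, V)] = {
    entries.update(key, value)
    if (entries.size <= maxRecords) Nil
    else whenFull match {
      case EmitEarly =>
        val oldest = entries.head
        entries.remove(oldest._1)
        List(oldest)
      case ShutDown =>
        throw new IllegalStateException(s"suppression buffer exceeded $maxRecords records")
    }
  }
}

val eager = new BoundedBuffer[String, Int](maxRecords = 2, whenFull = EmitEarly)
eager.put("a", 1)
eager.put("b", 2)
val early = eager.put("c", 3) // over capacity: ("a", 1) leaves the buffer early

val strict = new BoundedBuffer[String, Int](maxRecords = 1, whenFull = ShutDown)
strict.put("a", 1)
val failed = Try(strict.put("b", 2)).isFailure // the strict buffer refuses to grow
```

The trade-off in the model mirrors the one in the question. The eager policy keeps running but may emit non-final results. The strict policy preserves the guarantee at the cost of stopping when its limit is hit.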
[jira] [Updated] (KAFKA-7882) StateStores are frequently closed during the 'transform' method
[ https://issues.apache.org/jira/browse/KAFKA-7882?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Mateusz Owczarek updated KAFKA-7882:
------------------------------------
Description:
Hello, I have a problem with the state store being closed frequently while transforming incoming records. To ensure that only one record per key and window reaches an aggregate, I have created a custom Transformer (I know something similar is going to be introduced with the suppress method on KTable in the future, but my implementation is quite simple and imo should work correctly) with the following implementation:
{code:java}
override def transform(key: Windowed[K], value: V): (Windowed[K], V) = {
  val partition = context.partition()
  if (partition != -1) store.put(key.key(), (value, partition), key.window().start())
  else logger.warn(s"-1 partition")
  null // Ensuring no 1:1 forwarding; context.forward and commit logic is in the punctuator callback
}
{code}

What I do get is the following error:
{code:java}
Store MyStore is currently closed{code}
This problem appears only when the number of stream threads (or input topic partitions) is greater than 1, even if I only save to the store and turn off the punctuation.
If punctuation is present, however, I sometimes get -1 as a partition value in the transform method. I'm familiar with the basic docs; however, I haven't found anything that could help me here.
I build my state store like this:
{code:java}
val stateStore = Stores.windowStoreBuilder(
  Stores.persistentWindowStore(
    stateStoreName,
    timeWindows.maintainMs() + timeWindows.sizeMs + TimeUnit.DAYS.toMillis(1),
    timeWindows.segments,
    timeWindows.sizeMs,
    false
  ),
  serde[K],
  serde[(V, Int)]
)
{code}
and include it in the DSL topology like this:
{code:java}
builder.addStateStore(stateStore)
(...).transform(new MyTransformer(...), "MyStore")
{code}
INB4: I don't close any state stores manually, and I gave them a retention time as long as possible for the debugging stage. I tried to hotfix this with a retry in the transform method: the state stores eventually reopen and the `put` call succeeds, but this approach is questionable and I am not sure it actually works.
Edit:
Could this be because {code:java}StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG{code} is set to a low value? If I understand correctly, data is then spilled to disk more frequently; could that cause the store to be closed?
[jira] [Updated] (KAFKA-7882) StateStores are frequently closed during the 'transform' method
[ https://issues.apache.org/jira/browse/KAFKA-7882?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Mateusz Owczarek updated KAFKA-7882:
------------------------------------
Description:
Hello, I have a problem with the state store being closed frequently while transforming incoming records. To ensure that only one record per key and window reaches an aggregate, I have created a custom Transformer (I know something similar is going to be introduced with the suppress method on KTable in the future, but my implementation is quite simple and imo should work correctly) with the following implementation:
{code:java}
override def transform(key: Windowed[K], value: V): (Windowed[K], V) = {
  val partition = context.partition()
  if (partition != -1) store.put(key.key(), (value, partition), key.window().start())
  else logger.warn(s"-1 partition")
  null // Ensuring no 1:1 forwarding; context.forward and commit logic is in the punctuator callback
}
{code}

What I do get is the following error:
{code:java}
Store MyStore is currently closed{code}
This problem appears only when the number of stream threads (or input topic partitions) is greater than 1, even if I only save to the store and turn off the punctuation.
If punctuation is present, however, I sometimes get -1 as a partition value in the transform method. I'm familiar with the basic docs; however, I haven't found anything that could help me here.
I build my state store like this:
{code:java}
val stateStore = Stores.windowStoreBuilder(
  Stores.persistentWindowStore(
    stateStoreName,
    timeWindows.maintainMs() + timeWindows.sizeMs + TimeUnit.DAYS.toMillis(1),
    timeWindows.segments,
    timeWindows.sizeMs,
    false
  ),
  serde[K],
  serde[(V, Int)]
)
{code}
and include it in the DSL topology like this:
{code:java}
builder.addStateStore(stateStore)
(...).transform(new MyTransformer(...), "MyStore")
{code}
INB4: I don't close any state stores manually, and I gave them a retention time as long as possible for the debugging stage. I tried to hotfix this with a retry in the transform method: the state stores eventually reopen and the `put` call succeeds, but this approach is questionable and I am not sure it actually works.
[jira] [Updated] (KAFKA-7882) StateStores are frequently closed during the 'transform' method
[ https://issues.apache.org/jira/browse/KAFKA-7882?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Mateusz Owczarek updated KAFKA-7882:
------------------------------------
Description:
Hello, I have a problem with the state store being closed frequently while transforming incoming records. To ensure that only one record per key and window reaches an aggregate, I have created a custom Transformer (I know something similar is going to be introduced with the suppress method on KTable in the future, but my implementation is quite simple and imo should work correctly) with the following implementation:
{code:java}
override def transform(key: Windowed[K], value: V): (Windowed[K], V) = {
  val partition = context.partition()
  if (partition != -1) store.put(key.key(), (value, partition), key.window().start())
  else logger.warn(s"-1 partition")
  null // Ensuring no 1:1 forwarding; context.forward and commit logic is in the punctuator callback
}
{code}

What I do get is the following error:
{code:java}
Store MyStore is currently closed{code}
This problem appears only when the number of stream threads (or input topic partitions) is greater than 1, even if I only save to the store and turn off the punctuation.
If punctuation is present, however, I sometimes get -1 as a partition value in the transform method. I'm familiar with the basic docs; however, I haven't found anything that could help me here.
I build my state store like this:
{code:java}
Stores.windowStoreBuilder(
  Stores.persistentWindowStore(
    stateStoreName,
    timeWindows.maintainMs() + timeWindows.sizeMs + TimeUnit.DAYS.toMillis(1),
    timeWindows.segments,
    timeWindows.sizeMs,
    false
  ),
  serde[K],
  serde[(V, Int)]
)
{code}
and include it in the DSL topology like this:
{code:java}
builder.addStateStore()
(...).transform(new MyTransformer(...), "MyStore")
{code}
INB4: I don't close any state stores manually, and I gave them a retention time as long as possible for the debugging stage. I tried to hotfix this with a retry in the transform method: the state stores eventually reopen and the `put` call succeeds, but this approach is questionable and I am not sure it actually works.
[jira] [Created] (KAFKA-7882) StateStores are frequently closed during the 'transform' method
Mateusz Owczarek created KAFKA-7882:
---------------------------------------

Summary: StateStores are frequently closed during the 'transform' method
Key: KAFKA-7882
URL: https://issues.apache.org/jira/browse/KAFKA-7882
Project: Kafka
Issue Type: Bug
Components: streams
Affects Versions: 2.0.0
Reporter: Mateusz Owczarek

Hello, I have a problem with the state store being closed frequently while transforming incoming records. To ensure that only one record per key and window reaches an aggregate, I have created a custom Transformer (I know something similar is going to be introduced with the suppress method on KTable in the future, but my implementation is quite simple and imo should work correctly) with the following implementation:
```
override def transform(key: Windowed[K], value: V): (Windowed[K], V) = {
  {
    val partition = context.partition()
    if (partition != -1) store.put(key.key(), (value, partition), key.window().start())
    else logger.warn(s"-1 partition")
  }
  null // Ensuring no 1:1 forwarding; context.forward and commit logic is in the punctuator callback
}
```
What I do get is the following error:
```
Store MyStore is currently closed
```
This problem appears only when the number of stream threads (or input topic partitions) is greater than 1, even if I only save to the store and turn off the punctuation. If punctuation is present, however, I sometimes get -1 as a partition value in the transform method. I'm familiar with the basic docs; however, I haven't found anything that could help me here.
INB4: I don't close any state stores manually, and I gave them a retention time as long as possible for the debugging stage. I tried to hotfix this with a retry in the transform method: the state stores eventually reopen and the `put` call succeeds, but this approach is questionable and I am not sure it actually works.
[jira] [Updated] (KAFKA-7882) StateStores are frequently closed during the 'transform' method
[ https://issues.apache.org/jira/browse/KAFKA-7882?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Mateusz Owczarek updated KAFKA-7882:
------------------------------------
Description:
Hello, I have a problem with the state store being closed frequently while transforming incoming records. To ensure that only one record per key and window reaches an aggregate, I have created a custom Transformer (I know something similar is going to be introduced with the suppress method on KTable in the future, but my implementation is quite simple and imo should work correctly) with the following implementation:
{code:java}
override def transform(key: Windowed[K], value: V): (Windowed[K], V) = {
  val partition = context.partition()
  if (partition != -1) store.put(key.key(), (value, partition), key.window().start())
  else logger.warn(s"-1 partition")
  null // Ensuring no 1:1 forwarding; context.forward and commit logic is in the punctuator callback
}
{code}
What I do get is the following error:
{code:java}
Store MyStore is currently closed{code}
This problem appears only when the number of stream threads (or input topic partitions) is greater than 1, even if I only save to the store and turn off the punctuation. If punctuation is present, however, I sometimes get -1 as a partition value in the transform method. I'm familiar with the basic docs; however, I haven't found anything that could help me here.
INB4: I don't close any state stores manually, and I gave them a retention time as long as possible for the debugging stage. I tried to hotfix this with a retry in the transform method: the state stores eventually reopen and the `put` call succeeds, but this approach is questionable and I am not sure it actually works.
[jira] [Commented] (KAFKA-7753) ValueTransformerWithKey should not require producing one value every message
[ https://issues.apache.org/jira/browse/KAFKA-7753?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16725955#comment-16725955 ]

Mateusz Owczarek commented on KAFKA-7753:
-----------------------------------------

I see, thanks for the quick answer, closing the issue ;)

> ValueTransformerWithKey should not require producing one value every message
> ---
>
> Key: KAFKA-7753
> URL: https://issues.apache.org/jira/browse/KAFKA-7753
> Project: Kafka
> Issue Type: Wish
> Components: streams
> Reporter: Mateusz Owczarek
> Priority: Minor
>
> Hi, speaking about:
> [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformerWithKey.java]
> I have a fairly simple case: I want to implement a Transformer which I will
> later use in my DSL-defined topology. These are my requirements:
> - no repartition topic should be created, since I do not change the keys
> - I can't forward a message for every message transformed. I have an internal
> state store which I use to ensure that at most 1 message per window is sent.
> The basic Transformer lets me avoid sending messages down the stream (by
> returning null), but it forces repartitioning, which I think is unnecessary in
> my case. WDYT?
[jira] [Resolved] (KAFKA-7753) ValueTransformerWithKey should not require producing one value every message
[ https://issues.apache.org/jira/browse/KAFKA-7753?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mateusz Owczarek resolved KAFKA-7753. - Resolution: Feedback Received
[jira] [Commented] (KAFKA-7753) ValueTransformerWithKey should not require producing one value every message
[ https://issues.apache.org/jira/browse/KAFKA-7753?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16725902#comment-16725902 ] Mateusz Owczarek commented on KAFKA-7753: - Actually it would be fine, but because I misread the ValueTransformerWithKey docs, it does not work for me. Apparently I cannot forward any messages while punctuating in this type of transformer. If I don't want to change the keys but do want to forward records from the punctuator, and not only while transforming them, I need to use Transformer and risk repartitioning. Why is that?
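A Kafka-free sketch of the pattern this comment is after, under stated assumptions: `transform` only buffers the latest value per key and tumbling window, and a separate `punctuate` method, standing in for a Punctuator registered via `ProcessorContext#schedule`, is the only place results are emitted. `WindowKey`, `PunctuatedBuffer`, the window arithmetic and the late-record rule are illustrative choices, not the Kafka Streams API.

```scala
import scala.collection.mutable

// Hypothetical key for one tumbling window of one record key.
final case class WindowKey(key: String, windowStart: Long)

// Kafka-free model: transform() only buffers, and punctuate() (standing in for
// a Punctuator callback) is the only place where results are forwarded.
final class PunctuatedBuffer(windowSizeMs: Long, gracePeriodMs: Long) {
  // Latest value per (key, window); each update overwrites the previous one.
  private val buffer = mutable.LinkedHashMap.empty[WindowKey, Long]
  // Largest timestamp seen so far, a simplified stand-in for stream-time.
  private var streamTime: Long = -1L

  // A window is closed once stream-time reaches its end plus the grace period.
  private def isClosed(windowStart: Long): Boolean =
    windowStart + windowSizeMs + gracePeriodMs <= streamTime

  // Called once per record (non-negative timestamps assumed); never forwards.
  def transform(key: String, timestamp: Long, value: Long): Unit = {
    streamTime = math.max(streamTime, timestamp)
    val windowStart = timestamp - (timestamp % windowSizeMs)
    // Records for already-closed windows are dropped, so a window that was
    // forwarded once is never reopened while streamTime is kept.
    if (!isClosed(windowStart)) buffer.update(WindowKey(key, windowStart), value)
  }

  // Called periodically; forwards one value per closed window and forgets it.
  def punctuate(): Seq[(WindowKey, Long)] = {
    val closed = buffer.toSeq.filter { case (wk, _) => isClosed(wk.windowStart) }
    closed.foreach { case (wk, _) => buffer.remove(wk) }
    closed
  }
}
```

Since `streamTime` lives only in memory in this model, a restart would reset it to -1, and a record for an already-emitted window could then be buffered and emitted a second time.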
[jira] [Created] (KAFKA-7753) ValueTransformerWithKey should not require producing one value every message
Mateusz Owczarek created KAFKA-7753: --- Summary: ValueTransformerWithKey should not require producing one value every message Key: KAFKA-7753 URL: https://issues.apache.org/jira/browse/KAFKA-7753 Project: Kafka Issue Type: Wish Components: streams Reporter: Mateusz Owczarek Hi, speaking about: [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformerWithKey.java] I have quite a simple case: I want to implement a Transformer which I will later use in my DSL-API-defined topology. These are my requirements: - no repartition topic should be created, since I do not change the keys - I can't forward a message for every message transformed. I have my own internal state store which I'm using to ensure that at most 1 message per window is sent. The basic Transformer gives me the possibility to not send messages down the stream (null internal handler), but it forces repartitioning, which I think is unnecessary in my case. WDYT?
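The "at most 1 message per window" requirement can be expressed as an emit-once guard. The sketch below is a hypothetical, in-memory stand-in for the internal state store mentioned in the description: `EmitOnceGuard`, `tryEmit` and the retention-based pruning are illustrative assumptions, and a real implementation would keep the set in a persistent store so that it survives restarts and rebalances.

```scala
import scala.collection.mutable

// Hypothetical emit-once guard for "at most 1 message per window".
// In Kafka Streams the set would have to live in a state store to survive
// restarts and rebalances; here it is an in-memory set for illustration only.
final class EmitOnceGuard(retentionMs: Long) {
  // (record key, window start) pairs that have already been forwarded.
  private val emitted = mutable.Set.empty[(String, Long)]
  // Largest window start seen so far; used to expire old entries.
  private var maxWindowStart: Long = Long.MinValue

  // Returns true only for the first call per (key, windowStart) that is still
  // within retention; the caller forwards the message only when it gets true.
  def tryEmit(key: String, windowStart: Long): Boolean = {
    maxWindowStart = math.max(maxWindowStart, windowStart)
    val cutoff = maxWindowStart - retentionMs
    // Forget entries older than the retention period to bound memory use.
    emitted --= emitted.filter { case (_, start) => start < cutoff }
    if (windowStart < cutoff) false // too old to track: refuse rather than risk a duplicate
    else emitted.add((key, windowStart)) // add returns false if already present
  }
}
```

Refusing windows older than the retention period trades completeness for a bounded set: a very late update is dropped instead of possibly being forwarded twice.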
[jira] [Commented] (KAFKA-7669) Stream topology definition is not robust to the ordering changes
[ https://issues.apache.org/jira/browse/KAFKA-7669?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16696653#comment-16696653 ] Mateusz Owczarek commented on KAFKA-7669: - Thank you for the answer. I figured out that the topology definition order needs to be consistent across all of the application instances and have already fixed my code. I was just pointing out that, from my point of view, this order consistency should be clearly stated in the docs. I had not heard about the possibility of explicitly naming internal Kafka Streams topics in 2.1. Thanks for pointing it out! That solves many of my problems! :) > Stream topology definition is not robust to the ordering changes > > > Key: KAFKA-7669 > URL: https://issues.apache.org/jira/browse/KAFKA-7669 > Project: Kafka > Issue Type: Wish > Components: streams >Affects Versions: 2.0.0 >Reporter: Mateusz Owczarek >Priority: Major > > It seems that if the user does not guarantee the order of the stream topology > definition, they may end up with multiple stream branches sharing the same > internal changelog (and repartition, if created) topic. > Let's assume: > {code:java} > val initialStream = new StreamsBuilder().stream(sth) > val someStrings = (1 to 10).map(_.toString) > val notGuaranteedOrderOfStreams: Map[String, KStream[...]] = > someStrings.map(s => s -> initialStream.filter(...)).toMap{code} > When the user now defines common aggregation logic for each stream in > notGuaranteedOrderOfStreams and runs multiple instances of the application, > the same KSTREAM-AGGREGATE-STATE-STORE topic name can be assigned to different streams on different instances, so one topic will > contain results of different streams from the notGuaranteedOrderOfStreams map. > All of this happens without a single warning that the topology (or just the order of > the topology definition) differs between instances of the Kafka Streams > application. 
> Also, I am concerned that the ids in "KSTREAM-AGGREGATE-STATE-STORE-id-changelog" > match up so well across the different application instances (and different > topologies).
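One way to keep the definition order consistent across instances, sketched without Kafka: define branches in a canonical (sorted) key order, and derive store names from the branch key rather than from generated indices. In the DSL, an explicit store name can be passed with `Materialized.as(storeName)`, and the store's changelog topic is named after it. `DeterministicTopology` and the `aggregate-<key>-store` naming scheme are hypothetical.

```scala
// Hypothetical helpers: define one branch per key in a canonical (sorted)
// order, and derive store names from the branch key instead of the position
// at which the operator happened to be added.
object DeterministicTopology {
  // The same input keys give the same sequence, whatever order they arrive in.
  def definitionOrder(keys: Iterable[String]): Seq[String] = keys.toSeq.sorted

  // Order-independent store name; the naming scheme is an illustrative assumption.
  def storeNameFor(branchKey: String): String = s"aggregate-$branchKey-store"
}
```

Either measure alone removes the dependency on iteration order; explicit names are the more robust of the two, because they also survive later edits that insert new operators earlier in the topology.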
[jira] [Updated] (KAFKA-7669) Stream topology definition is not robust to the ordering changes
[ https://issues.apache.org/jira/browse/KAFKA-7669?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mateusz Owczarek updated KAFKA-7669: Issue Type: Wish (was: Bug)
[jira] [Updated] (KAFKA-7669) Stream topology definition is not robust to the ordering changes
[ https://issues.apache.org/jira/browse/KAFKA-7669?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mateusz Owczarek updated KAFKA-7669: Summary: Stream topology definition is not robust to the ordering changes (was: Stream topology definition is not prune to the ordering changes)
[jira] [Created] (KAFKA-7669) Stream topology definition is not prune to the ordering changes
Mateusz Owczarek created KAFKA-7669: --- Summary: Stream topology definition is not prune to the ordering changes Key: KAFKA-7669 URL: https://issues.apache.org/jira/browse/KAFKA-7669 Project: Kafka Issue Type: Bug Components: streams Affects Versions: 2.0.0 Reporter: Mateusz Owczarek It seems that if the user does not guarantee the order of the stream topology definition, they may end up with multiple stream branches sharing the same internal changelog (and repartition, if created) topic. Let's assume: {code:java} val initialStream = new StreamsBuilder().stream(sth) val someStrings = (1 to 10).map(_.toString) val notGuaranteedOrderOfStreams: Map[String, KStream[...]] = someStrings.map(s => s -> initialStream.filter(...)).toMap{code} When the user now defines common aggregation logic for each stream in notGuaranteedOrderOfStreams and runs multiple instances of the application, the same KSTREAM-AGGREGATE-STATE-STORE topic name can be assigned to different streams on different instances, so one topic will contain results of different streams from the notGuaranteedOrderOfStreams map. All of this happens without a single warning that the topology (or just the order of the topology definition) differs between instances of the Kafka Streams application. Also, I am concerned that the ids in "KSTREAM-AGGREGATE-STATE-STORE-id-changelog" match up so well across the different application instances (and different topologies).
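A simplified model of why definition order matters, assuming (as generated names like `KSTREAM-AGGREGATE-STATE-STORE-id` suggest) that each new operator takes the next value of a per-builder counter, zero-padded to 10 digits. `NameGenerator` and `Naming.assignStoreNames` are hypothetical and count only aggregations; in Kafka Streams other nodes also consume indices, so the real numbers differ.

```scala
// Hypothetical, simplified model of index-based name generation: each new
// operator takes the next value of a per-builder counter, so a generated name
// records *when* an operator was added, not *which* branch it belongs to.
final class NameGenerator {
  private var index = 0
  def next(prefix: String): String = {
    val name = f"$prefix%s$index%010d" // zero-padded to 10 digits
    index += 1
    name
  }
}

object Naming {
  // Adds one aggregation per branch in the given order and records the store
  // name each branch receives. Only aggregations draw from the counter here.
  def assignStoreNames(definitionOrder: Seq[String]): Map[String, String] = {
    val gen = new NameGenerator
    definitionOrder.map(b => b -> gen.next("KSTREAM-AGGREGATE-STATE-STORE-")).toMap
  }
}
```

Two instances that add the same branches in different orders, for example `Seq("1", "2", "3")` versus `Seq("3", "1", "2")`, both produce a store suffixed `0000000000`, but attach it to branch "1" on one instance and to branch "3" on the other, and nothing in the model reports the mismatch.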