[jira] [Commented] (KAFKA-7994) Improve Stream-Time for rebalances and restarts

2019-05-31 Thread Mateusz Owczarek (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7994?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16853118#comment-16853118
 ] 

Mateusz Owczarek commented on KAFKA-7994:
-

Bruno's example above indicates that this does affect the `suppress` operator's 
promise of producing only the 'final window result'. To me, the word 'final' 
suggests that one and only one result is produced per window, whereas a single 
rebalance may cause multiple results for the same window.

I only recently started using the `suppress` operator in my application and ran 
into this behavior, so I have created a pull request in my Kafka Streams example 
repository describing the steps to reproduce multiple `suppress` results:
https://github.com/mowczare/kafka-streams-scala/pull/1

> Improve Stream-Time for rebalances and restarts
> ---
>
> Key: KAFKA-7994
> URL: https://issues.apache.org/jira/browse/KAFKA-7994
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Richard Yu
>Priority: Major
> Attachments: possible-patch.diff
>
>
> We compute a per-partition partition-time as the maximum timestamp over all 
> records processed so far. Furthermore, we use partition-time to compute 
> stream-time for each task as the maximum over all partition-times (for all 
> corresponding task partitions). This stream-time is used to decide whether to 
> process out-of-order records or to drop them if they are late (i.e., 
> timestamp < stream-time - grace-period).
> During rebalances and restarts, stream-time is initialized as UNKNOWN (i.e., 
> -1) for tasks that are newly created (or migrated). In effect, we forget the 
> current stream-time in this case, which may lead to non-deterministic behavior: 
> if we stop processing right before a late record, that record would be dropped 
> if we continued processing, but it is not dropped after a rebalance/restart. 
> Let's look at an example with a grace period of 5ms for a tumbling window of 
> 5ms, and the following records (timestamps in parentheses):
>  
> {code:java}
> r1(0) r2(5) r3(11) r4(2){code}
> In the example, stream-time advances as 0, 5, 11, 11, and thus record `r4` is 
> dropped as late because 2 < 6 = 11 - 5. However, if we stop processing or 
> rebalance after processing `r3` but before processing `r4`, we would 
> reinitialize stream-time as -1, and thus would process `r4` on restart/after 
> rebalance. The problem is that stream-time then advances differently from a 
> global point of view: 0, 5, 11, 2.
> Note that this is a corner case, because if we stopped processing one record 
> earlier, i.e., after processing `r2` but before processing `r3`, stream-time 
> would advance correctly from a global point of view.
> A potential fix would be to store the latest observed partition-time in the 
> metadata of committed offsets. This way, on restart/rebalance we can 
> re-initialize time correctly.
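The r1..r4 scenario can be replayed with a toy, single-partition model in plain Java (not Kafka Streams code). It applies the simplified rule from the description, dropping a record when timestamp < stream-time - grace-period. The class name, and modelling the committed partition-time as a plain `long` handed to the constructor, are hypothetical choices for illustration only.

```java
// Toy single-partition model of stream-time (not Kafka Streams code).
public class StreamTimeRestartDemo {
    static final long GRACE_MS = 5L;  // grace period from the example
    static final long UNKNOWN = -1L;  // stream-time of a newly created or migrated task

    private long streamTime;

    StreamTimeRestartDemo(long initialStreamTime) {
        this.streamTime = initialStreamTime;
    }

    /** Processes one record and returns false if it is dropped as late. */
    boolean process(long timestamp) {
        streamTime = Math.max(streamTime, timestamp); // stream-time never decreases
        return timestamp >= streamTime - GRACE_MS;    // late: timestamp < stream-time - grace
    }

    long streamTime() {
        return streamTime;
    }

    public static void main(String[] args) {
        long[] records = {0, 5, 11, 2}; // r1(0) r2(5) r3(11) r4(2)

        // 1) Uninterrupted run: r4 is dropped because 2 < 11 - 5.
        StreamTimeRestartDemo uninterrupted = new StreamTimeRestartDemo(UNKNOWN);
        for (long ts : records) {
            System.out.println("ts=" + ts + " processed=" + uninterrupted.process(ts));
        }

        // 2) Restart after r3: stream-time is reset to UNKNOWN, so r4 is processed.
        StreamTimeRestartDemo afterReset = new StreamTimeRestartDemo(UNKNOWN);
        System.out.println("after reset: r4 processed=" + afterReset.process(2));

        // 3) Proposed fix: re-initialize from the partition-time committed with the
        //    offsets (hypothetically passed in as a plain long), so r4 is dropped again.
        long committedPartitionTime = 11L;
        StreamTimeRestartDemo restored = new StreamTimeRestartDemo(committedPartitionTime);
        System.out.println("after restore: r4 processed=" + restored.process(2));
    }
}
```

Running it reports r4 as dropped in the uninterrupted run, processed after the reset, and dropped again once the committed partition-time is restored.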



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-5998) /.checkpoint.tmp Not found exception

2019-05-17 Thread Mateusz Owczarek (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-5998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16842015#comment-16842015
 ] 

Mateusz Owczarek commented on KAFKA-5998:
-

[~guozhang] Any updates on this? I have the same issue with Kafka 2.1.1.
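For context, the failing call in the quoted log is the write of `.checkpoint.tmp`, which is afterwards renamed to `.checkpoint`. The sketch below models that write-then-rename pattern with `java.nio.file`; the helper name and the simplified one-entry-per-line file format are hypothetical and not Kafka's actual `OffsetCheckpoint` layout. It shows one situation that produces this kind of error, namely the task directory no longer existing when the checkpoint is written; whether that is the cause in this report is not established here. Note that NIO reports the missing directory as `NoSuchFileException`, whereas the `FileOutputStream` in the log throws `FileNotFoundException`.

```java
import java.io.BufferedWriter;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class CheckpointWriteDemo {
    /**
     * Hypothetical helper: writes "<partition> <offset>" lines to taskDir/.checkpoint.tmp,
     * then atomically renames the tmp file to taskDir/.checkpoint.
     */
    static void writeCheckpoint(Path taskDir, Map<String, Long> offsets) throws IOException {
        Path tmp = taskDir.resolve(".checkpoint.tmp");
        Path target = taskDir.resolve(".checkpoint");
        // Fails with NoSuchFileException if taskDir itself has been deleted.
        try (BufferedWriter writer = Files.newBufferedWriter(tmp, StandardCharsets.UTF_8)) {
            for (Map.Entry<String, Long> entry : offsets.entrySet()) {
                writer.write(entry.getKey() + " " + entry.getValue());
                writer.newLine();
            }
        }
        // Atomic rename: readers never observe a half-written checkpoint file.
        Files.move(tmp, target, StandardCopyOption.ATOMIC_MOVE);
    }

    public static void main(String[] args) throws IOException {
        Path stateDir = Files.createTempDirectory("kstreams-demo");
        Path taskDir = Files.createDirectory(stateDir.resolve("0_1"));
        Map<String, Long> offsets = new TreeMap<>();
        offsets.put("input-topic-1", 42L); // hypothetical partition and offset

        writeCheckpoint(taskDir, offsets);
        List<String> lines = Files.readAllLines(taskDir.resolve(".checkpoint"));
        System.out.println(lines); // [input-topic-1 42]

        // Simulate the task directory disappearing before the next commit.
        Files.delete(taskDir.resolve(".checkpoint"));
        Files.delete(taskDir);
        try {
            writeCheckpoint(taskDir, offsets);
        } catch (NoSuchFileException e) {
            System.out.println("checkpoint write failed: " + e.getFile());
        }
    }
}
```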

> /.checkpoint.tmp Not found exception
> 
>
> Key: KAFKA-5998
> URL: https://issues.apache.org/jira/browse/KAFKA-5998
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.0, 0.11.0.1, 2.1.1
>Reporter: Yogesh BG
>Priority: Critical
> Attachments: 5998.v1.txt, 5998.v2.txt, Topology.txt, exc.txt, 
> props.txt, streams.txt
>
>
> I have one Kafka broker and one Kafka Streams instance running. It has been 
> running for two days under a load of around 2500 msgs per second. On the third 
> day I am getting the exception below for some of the partitions: I have 16 
> partitions, and only 0_0 and 0_1 give this error:
> {{09:43:25.955 [ks_0_inst-StreamThread-6] WARN  
> o.a.k.s.p.i.ProcessorStateManager - Failed to write checkpoint file to 
> /data/kstreams/rtp-kafkastreams/0_1/.checkpoint:
> java.io.FileNotFoundException: 
> /data/kstreams/rtp-kafkastreams/0_1/.checkpoint.tmp (No such file or 
> directory)
> at java.io.FileOutputStream.open(Native Method) ~[na:1.7.0_111]
> at java.io.FileOutputStream.<init>(FileOutputStream.java:221) 
> ~[na:1.7.0_111]
> at java.io.FileOutputStream.<init>(FileOutputStream.java:171) 
> ~[na:1.7.0_111]
> at 
> org.apache.kafka.streams.state.internals.OffsetCheckpoint.write(OffsetCheckpoint.java:73)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.checkpoint(ProcessorStateManager.java:324)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:267)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:201)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:260)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:254)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.AssignedTasks$1.apply(AssignedTasks.java:322)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.AssignedTasks.applyToRunningTasks(AssignedTasks.java:415)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:314)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:700)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:683)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:523)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:480)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:457)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> 09:43:25.974 [ks_0_inst-StreamThread-15] WARN  
> o.a.k.s.p.i.ProcessorStateManager - Failed to write checkpoint file to 
> /data/kstreams/rtp-kafkastreams/0_0/.checkpoint:
> java.io.FileNotFoundException: 
> /data/kstreams/rtp-kafkastreams/0_0/.checkpoint.tmp (No such file or 
> directory)
> at java.io.FileOutputStream.open(Native Method) ~[na:1.7.0_111]
> at java.io.FileOutputStream.<init>(FileOutputStream.java:221) 
> ~[na:1.7.0_111]
> at java.io.FileOutputStream.<init>(FileOutputStream.java:171) 
> ~[na:1.7.0_111]
> at 
> org.apache.kafka.streams.state.internals.OffsetCheckpoint.write(OffsetCheckpoint.java:73)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.checkpoint(ProcessorStateManager.java:324)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> 

[jira] [Commented] (KAFKA-7882) StateStores are frequently closed during the 'transform' method

2019-02-15 Thread Mateusz Owczarek (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7882?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16769611#comment-16769611
 ] 

Mateusz Owczarek commented on KAFKA-7882:
-

As I understand it (correct me if I'm wrong), a late record is one that falls 
between the window end time and the end of the window's grace period. After 
that, I reject all late events.
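A minimal sketch of the interpretation above, measured in stream-time (event time) rather than wall-clock time. The enum and method names are hypothetical, and the boundary used here (a record is still accepted while stream-time < window end + grace) is an assumption about how windowed aggregations behave, not a quote of Kafka's implementation.

```java
public class LatenessDemo {
    enum Outcome { IN_WINDOW, LATE_WITHIN_GRACE, DROPPED }

    /**
     * Classifies a record for the window [windowStart, windowStart + sizeMs)
     * by the task's stream-time at the moment the record is processed.
     */
    static Outcome classify(long windowStart, long sizeMs, long graceMs, long streamTime) {
        long windowEnd = windowStart + sizeMs; // exclusive
        if (streamTime < windowEnd) {
            return Outcome.IN_WINDOW;          // the window has not ended yet
        }
        if (streamTime < windowEnd + graceMs) {
            return Outcome.LATE_WITHIN_GRACE;  // late, but the window still accepts updates
        }
        return Outcome.DROPPED;                // window end + grace <= stream-time
    }

    public static void main(String[] args) {
        // Window [0, 5) with a 5 ms grace period.
        for (long streamTime : new long[] {3, 7, 10, 11}) {
            System.out.println("stream-time " + streamTime + " -> " + classify(0, 5, 5, streamTime));
        }
    }
}
```

For the window [0, 5) with 5 ms grace, stream-time 3 gives IN_WINDOW, 7 gives LATE_WITHIN_GRACE, and 10 or later gives DROPPED.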

> StateStores are frequently closed during the 'transform' method
> ---
>
> Key: KAFKA-7882
> URL: https://issues.apache.org/jira/browse/KAFKA-7882
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.0.0
>Reporter: Mateusz Owczarek
>Priority: Major
>
> Hello, I have a problem with the state store being closed frequently while 
> transforming incoming records. To ensure that only one record per key and 
> window comes out of an aggregation, I have created a custom Transformer (I know 
> something similar is going to be introduced with the suppress method on KTable 
> in the future, but my implementation is quite simple and, in my opinion, should 
> work correctly) with the following implementation:
> {code:java}
> override def transform(key: Windowed[K], value: V): (Windowed[K], V) = {
>   val partition = context.partition()
>   if (partition != -1) store.put(key.key(), (value, partition), key.window().start())
>   else logger.warn(s"-1 partition")
>   // Ensuring no 1:1 forwarding; context.forward and commit logic is in the punctuator callback
>   null
> }
> {code}
>  
> What I get is the following error:
> {code:java}
> Store MyStore is currently closed{code}
> This problem appears only when the number of stream threads (or input 
> topic partitions) is greater than 1, even if I'm only saving to the store and 
> have punctuation turned off.
> If punctuation is enabled, however, I sometimes get -1 as the partition value 
> in the transform method. I'm familiar with the basic docs; however, I haven't 
> found anything that could help me here.
> I build my state store like this:
> {code:java}
> val stateStore = Stores.windowStoreBuilder(
>   Stores.persistentWindowStore(
>     stateStoreName,
>     timeWindows.maintainMs() + timeWindows.sizeMs + TimeUnit.DAYS.toMillis(1),
>     timeWindows.segments,
>     timeWindows.sizeMs,
>     false
>   ),
>   serde[K],
>   serde[(V, Int)]
> )
> {code}
> and include it in the DSL like this:
> {code:java}
> builder.addStateStore(stateStore)
> (...).transform(new MyTransformer(...), "MyStore")
> {code}
> INB4: I don't close any state stores manually, and I gave them as long a 
> retention time as possible for the debugging stage. I tried to hotfix this with 
> a retry in the transform method; the state stores eventually reopen and the 
> `put` method works, but this approach is questionable and I am not sure 
> it actually works.
> Edit:
> Could this be because 
> {code:java}StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG{code} is set to a low 
> value? If I understand correctly, spilling to disk then happens more 
> frequently; could that cause the store to be closed?





[jira] [Commented] (KAFKA-7882) StateStores are frequently closed during the 'transform' method

2019-02-15 Thread Mateusz Owczarek (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7882?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16769321#comment-16769321
 ] 

Mateusz Owczarek commented on KAFKA-7882:
-

I am using this approach only after the window aggregation step, so all input 
events are already windowed. The final output event is produced after window 
duration + window grace period (whether that time has expired is checked 
repeatedly using wall-clock punctuation). After that, I clear my state store 
accordingly. This way, information is not kept forever and at most 1 event per 
window is emitted. Am I right?
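To make the described approach concrete, here is a plain-Java model of it with no Kafka classes; `EmitOnceBuffer`, `put` and `punctuate` are hypothetical stand-ins for the transformer's store write and its punctuator. It keeps only the latest aggregate per key and window, and forwards and deletes an entry once the supplied clock value passes window end + grace.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class EmitOnceBuffer {
    /** Latest aggregate of one (key, window) plus the end of that window. */
    private static final class Pending {
        final long windowEnd;
        final long aggregate;

        Pending(long windowEnd, long aggregate) {
            this.windowEnd = windowEnd;
            this.aggregate = aggregate;
        }
    }

    private final long sizeMs;
    private final long graceMs;
    // "key@windowStart" -> latest aggregate; TreeMap only makes the output order deterministic
    private final Map<String, Pending> latest = new TreeMap<>();

    EmitOnceBuffer(long sizeMs, long graceMs) {
        this.sizeMs = sizeMs;
        this.graceMs = graceMs;
    }

    /** Stand-in for transform(): overwrite the stored aggregate, forward nothing. */
    void put(String key, long windowStart, long aggregate) {
        latest.put(key + "@" + windowStart, new Pending(windowStart + sizeMs, aggregate));
    }

    /** Stand-in for the punctuator: returns what would be forwarded at time `now`. */
    List<String> punctuate(long now) {
        List<String> forwarded = new ArrayList<>();
        Iterator<Map.Entry<String, Pending>> it = latest.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Pending> entry = it.next();
            if (now >= entry.getValue().windowEnd + graceMs) {
                forwarded.add(entry.getKey() + "=" + entry.getValue().aggregate);
                it.remove(); // clear after forwarding so state does not grow forever
            }
        }
        return forwarded;
    }

    public static void main(String[] args) {
        EmitOnceBuffer buffer = new EmitOnceBuffer(5, 5); // 5 ms windows, 5 ms grace
        buffer.put("a", 0, 1);
        buffer.put("a", 0, 3); // a newer aggregate replaces the older one
        buffer.put("b", 5, 7);
        System.out.println(buffer.punctuate(9));  // []
        System.out.println(buffer.punctuate(10)); // [a@0=3]
        System.out.println(buffer.punctuate(15)); // [b@5=7]
    }
}
```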



[jira] [Commented] (KAFKA-7882) StateStores are frequently closed during the 'transform' method

2019-02-14 Thread Mateusz Owczarek (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7882?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16768597#comment-16768597
 ] 

Mateusz Owczarek commented on KAFKA-7882:
-

I understand that; I'm familiar with the basic concepts of wall-clock vs. event 
time. I presume there may be latencies and that wall-clock punctuation may not 
produce any events for a window. I accept this risk. What I was asking is why 
it is not guaranteed that at most one event per window is published when using 
wall-clock time in my suppress transformer.

This is crucial for me, and the 2.1.0 suppress implementation presumably does 
not provide it with the emitEarlyWhenFull approach.
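One way to see why wall-clock punctuation alone cannot guarantee a single result: whether a record is accepted or dropped depends on stream-time, but when a result is emitted depends on wall-clock time, and the two can diverge (for example, when a record is delayed upstream). The toy model below is plain Java with hypothetical names and numbers; it uses one window [0, 5) with a 5 ms grace period, and the running sum stands in for the upstream windowed aggregation.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Toy model: one key, one tumbling window [0, 5) with a 5 ms grace period.
 * Acceptance of a record depends on stream-time; emission depends on wall-clock punctuation.
 */
public class WallClockDuplicateDemo {
    static final long WINDOW_END = 5L;
    static final long GRACE_MS = 5L;

    private long streamTime = -1L;
    private long sum = 0L;        // stands in for the upstream windowed aggregation
    private Long pending = null;  // latest aggregate not yet emitted, if any
    final List<Long> emitted = new ArrayList<>();

    /** A record for the window is processed; it is accepted unless stream-time closed the window. */
    void onRecord(long timestamp, long value) {
        streamTime = Math.max(streamTime, timestamp);
        if (streamTime < WINDOW_END + GRACE_MS) {
            sum += value;
            pending = sum;
        }
    }

    /** Wall-clock punctuation: emit and clear once wall-clock time passes window end + grace. */
    void onWallClockPunctuation(long wallClockMs) {
        if (pending != null && wallClockMs >= WINDOW_END + GRACE_MS) {
            emitted.add(pending);
            pending = null;
        }
    }

    public static void main(String[] args) {
        WallClockDuplicateDemo demo = new WallClockDuplicateDemo();
        demo.onRecord(1, 10);             // on-time record
        demo.onWallClockPunctuation(10);  // wall clock says the window is over: emits 10
        demo.onRecord(4, 5);              // delayed record; stream-time is only 4, so it is accepted
        demo.onWallClockPunctuation(11);  // emits 15, a second result for the same window
        System.out.println(demo.emitted); // [10, 15]
    }
}
```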



[jira] [Commented] (KAFKA-7882) StateStores are frequently closed during the 'transform' method

2019-02-14 Thread Mateusz Owczarek (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7882?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16768064#comment-16768064
 ] 

Mateusz Owczarek commented on KAFKA-7882:
-

Yes, we can, but there is still one unanswered question (maybe not related to 
the subject of the issue, but forgive me, I'm very curious):
{quote}_Offhand, if you were previously using wall-clock time, you don't need 
semantically airtight suppression behavior, so, emitting early when the buffer 
fills up should be fine._
{quote}
_As far as I understand, emitEarlyWhenFull does not guarantee one event per 
window, so it's not sufficient for me. Could you elaborate on why no wall-clock 
time implementation ensures it either?_



[jira] [Commented] (KAFKA-7882) StateStores are frequently closed during the 'transform' method

2019-02-13 Thread Mateusz Owczarek (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7882?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16767006#comment-16767006
 ] 

Mateusz Owczarek commented on KAFKA-7882:
-

[~mjsax]
As I reported, I was using Kafka Streams 2.0.0 with the Scala DSL API, where the 
transform method accepts a Transformer instance (not a TransformerSupplier) as a 
parameter:
{code}
  def transform[K1, V1](transformer: Transformer[K, V, (K1, V1)],
                        stateStoreNames: String*): KStream[K1, V1] = {
    val transformerSupplierJ: TransformerSupplier[K, V, KeyValue[K1, V1]] =
      new TransformerSupplier[K, V, KeyValue[K1, V1]] {
        override def get(): Transformer[K, V, KeyValue[K1, V1]] = {
          new Transformer[K, V, KeyValue[K1, V1]] {
            override def transform(key: K, value: V): KeyValue[K1, V1] = {
              transformer.transform(key, value) match {
                case (k1, v1) => KeyValue.pair(k1, v1)
                case _ => null
              }
            }

            override def init(context: ProcessorContext): Unit = transformer.init(context)

            override def close(): Unit = transformer.close()
          }
        }
      }
    inner.transform(transformerSupplierJ, stateStoreNames: _*)
  }
{code}
I believe the implementation has changed in 2.1.0 and now does accept a 
TransformerSupplier.
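Note what the wrapper above does: `get()` builds a new outer Transformer each time, but every one of them delegates `init`, `transform` and `close` to the same `transformer` object, so all tasks share one instance and the most recent `init(context)` wins. The plain-Java model below (no Kafka classes; `Store`, `MyTransformer` and the task ids are hypothetical) shows how that could surface as a "currently closed" error only when there is more than one task, which would be consistent with the report. A supplier that constructs a fresh transformer on every `get()` avoids it.

```java
import java.util.function.Supplier;

/** Plain-Java model (no Kafka classes) of shared vs. per-task transformer instances. */
public class SupplierSharingDemo {
    /** Stand-in for a task-local state store. */
    static final class Store {
        final String taskId;
        boolean open = true;

        Store(String taskId) {
            this.taskId = taskId;
        }
    }

    /** Stand-in for a transformer that keeps the store handed to it in init(). */
    static final class MyTransformer {
        private Store store;

        void init(Store store) {
            this.store = store;
        }

        String transform(String value) {
            if (!store.open) {
                throw new IllegalStateException("Store of task " + store.taskId + " is currently closed");
            }
            return store.taskId + ":" + value;
        }
    }

    /** Two tasks obtain transformers from the supplier; task 0_1 is then closed. */
    static String processOnTask00AfterTask01Closes(Supplier<MyTransformer> supplier) {
        Store store00 = new Store("0_0");
        Store store01 = new Store("0_1");
        MyTransformer forTask00 = supplier.get();
        forTask00.init(store00);
        MyTransformer forTask01 = supplier.get();
        forTask01.init(store01); // with a shared instance this also replaces task 0_0's store
        store01.open = false;    // e.g. task 0_1 is migrated away during a rebalance
        return forTask00.transform("x");
    }

    public static void main(String[] args) {
        // A fresh instance per get(): task 0_0 keeps using its own store.
        System.out.println(processOnTask00AfterTask01Closes(MyTransformer::new)); // 0_0:x

        // Every get() hands out the same instance, like a wrapper delegating to one object.
        MyTransformer shared = new MyTransformer();
        try {
            processOnTask00AfterTask01Closes(() -> shared);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage()); // Store of task 0_1 is currently closed
        }
    }
}
```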



[jira] [Commented] (KAFKA-7882) StateStores are frequently closed during the 'transform' method

2019-02-12 Thread Mateusz Owczarek (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7882?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16765967#comment-16765967
 ] 

Mateusz Owczarek commented on KAFKA-7882:
-

[~vvcephei]

This is roughly how I call the suppress method:

{code:java}
  .suppress(new FinalResultsSuppressionBuilder(
stateStoreName,
new StrictBufferConfigImpl(Long.MaxValue, Long.MaxValue, 
BufferFullStrategy.SPILL_TO_DISK)
  ))
{code}

As far as I can see, the static untilWindowCloses method also requires a 
StrictBufferConfig, and StrictBufferConfigImpl is a public class. Maybe it 
should not be? I presume I should have used the withMaxBytes or unbounded static 
methods from the BufferConfig interface. Do I understand correctly that, the way 
I used the suppress method, the buffer will eventually fill up and throw an 
unexpected exception that stops my application?


bq. Offhand, if you were previously using wall-clock time, you don't need 
semantically airtight suppression behavior, so, emitting early when the buffer 
fills up should be fine.

As far as I understand, emitEarlyWhenFull does not guarantee one event per 
window, so it's not sufficient for me. Could you elaborate on why no wall-clock 
time implementation ensures it either?
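A toy model of the two full-buffer behaviours discussed here (plain Java, not Kafka's buffer: it counts records instead of bytes, ignores changelogging, and the `WhenFull` names are hypothetical). With an emit-early strategy, a full buffer forwards its oldest entry before that window closes, so the same window can be emitted more than once; with a shut-down strategy, the operator fails instead. For reference, a never-early configuration in 2.1 can be written as `Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded())`, at the cost of letting the buffer grow without a configured limit.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy bounded suppression buffer (not Kafka's implementation). */
public class BoundedSuppressBuffer {
    enum WhenFull { EMIT_EARLY, SHUT_DOWN } // hypothetical names for the two behaviours

    private final int maxRecords;
    private final WhenFull whenFull;
    private final LinkedHashMap<String, Long> buffer = new LinkedHashMap<>(); // oldest first
    final List<String> forwarded = new ArrayList<>();

    BoundedSuppressBuffer(int maxRecords, WhenFull whenFull) {
        this.maxRecords = maxRecords;
        this.whenFull = whenFull;
    }

    /** Buffers the latest aggregate of a window, evicting or failing when over capacity. */
    void put(String windowId, long aggregate) {
        buffer.put(windowId, aggregate);
        if (buffer.size() > maxRecords) {
            if (whenFull == WhenFull.SHUT_DOWN) {
                throw new IllegalStateException("suppression buffer is full");
            }
            // EMIT_EARLY: forward the oldest entry now, before its window has closed.
            Iterator<Map.Entry<String, Long>> oldest = buffer.entrySet().iterator();
            Map.Entry<String, Long> entry = oldest.next();
            forwarded.add(entry.getKey() + "=" + entry.getValue());
            oldest.remove();
        }
    }

    /** The window closed: forward its final value if it is still buffered. */
    void windowClosed(String windowId) {
        Long value = buffer.remove(windowId);
        if (value != null) {
            forwarded.add(windowId + "=" + value);
        }
    }

    public static void main(String[] args) {
        BoundedSuppressBuffer early = new BoundedSuppressBuffer(1, WhenFull.EMIT_EARLY);
        early.put("a@0", 1);
        early.put("b@0", 2); // over capacity: a@0=1 is forwarded early
        early.put("a@0", 3); // over capacity again: b@0=2 is forwarded early
        early.windowClosed("a@0");
        System.out.println(early.forwarded); // [a@0=1, b@0=2, a@0=3]: window a@0 emitted twice

        BoundedSuppressBuffer strict = new BoundedSuppressBuffer(1, WhenFull.SHUT_DOWN);
        strict.put("a@0", 1);
        try {
            strict.put("b@0", 2);
        } catch (IllegalStateException e) {
            System.out.println("shut down: " + e.getMessage());
        }
    }
}
```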



[jira] [Commented] (KAFKA-7882) StateStores are frequently closed during the 'transform' method

2019-02-11 Thread Mateusz Owczarek (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7882?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16764999#comment-16764999
 ] 

Mateusz Owczarek commented on KAFKA-7882:
-

Sorry for bumping this, but any advice on creating a test case for a topology 
containing the `.suppress` method would be very helpful :)

> StateStores are frequently closed during the 'transform' method
> ---
>
> Key: KAFKA-7882
> URL: https://issues.apache.org/jira/browse/KAFKA-7882
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.0.0
>Reporter: Mateusz Owczarek
>Priority: Major
>
> Hello, I have a problem with the state store being closed frequently while 
> transforming upcoming records. To ensure only one record of the same key and 
> the window comes to an aggregate I have created a custom Transformer (I know 
> something similar is going to be introduced with suppress method on KTable in 
> the future, but my implementation is quite simple and imo should work 
> correctly) with the following implementation:
> {code:java}
> override def transform(key: Windowed[K], value: V): (Windowed[K], V) = {
> val partition = context.partition() 
> if (partition != -1) store.put(key.key(), (value, partition), 
> key.window().start()) 
> else logger.warn(s"-1 partition")
> null //Ensuring no 1:1 forwarding, context.forward and commit logic is in the 
> punctuator callback
> }
> {code}
>  
> What I do get is the following error:
> {code:java}
> Store MyStore is currently closed{code}
> This problem appears only when the number of streaming threads (or input 
> topic partitions) is greater than 1 even if I'm just saving to the store and 
> turn off the punctuation.
> If punctuation is present, however, I sometimes get -1 as a partition value 
> in the transform method. I'm familiar with the basic docs, however, I haven't 
> found anything that could help me here.
> I build my state store like this:
> {code:java}
> val stateStore = Stores.windowStoreBuilder(
>   Stores.persistentWindowStore(
>     stateStoreName,
>     timeWindows.maintainMs() + timeWindows.sizeMs + TimeUnit.DAYS.toMillis(1),
>     timeWindows.segments,
>     timeWindows.sizeMs,
>     false
>   ),
>   serde[K],
>   serde[(V, Int)]
> )
> {code}
> and include it in the DSL like this:
> {code:java}
> builder.addStateStore(stateStore)
> (...).transform(new MyTransformer(...), "MyStore")
> {code}
> INB4: I don't close any state stores manually, and I gave them as long a 
> retention time as possible for the debugging stage. I tried to hotfix this 
> with a retry in the transform method: the state stores eventually reopen and 
> the `put` call then succeeds, but this approach is questionable and I'm not 
> sure it really works.
> Edit:
> Could this be because 
> {code:java}StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG{code} is set to a 
> low value? If I understand correctly, spilling to disk then happens more 
> frequently; could that cause the store to be closed?
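The de-duplication pattern described in the issue (overwrite the latest value per key and window in `transform`, forward nothing, and let a punctuator emit finished windows) can be sketched without Kafka. This is a hypothetical, in-memory model, not the reporter's actual code: `LatestPerWindow`, `put` and `punctuate` are illustrative names, and it assumes the punctuator forwards each window once wall-clock time has passed the window's end. The window store, changelogging and the partition bookkeeping are left out.

```scala
import scala.collection.mutable

// Hypothetical, Kafka-free model of the issue's pattern: transform() only keeps
// the latest value per (key, window start) and forwards nothing; a wall-clock
// punctuator later forwards each window once it has ended.
final class LatestPerWindow[K, V](windowSizeMs: Long) {
  private val latest = mutable.LinkedHashMap.empty[(K, Long), V]

  /** Mirrors transform(): overwrite the previous value and forward nothing. */
  def put(key: K, windowStartMs: Long, value: V): Unit =
    latest.update((key, windowStartMs), value)

  /** Mirrors the punctuator: forward and forget every window that has ended. */
  def punctuate(wallClockMs: Long): List[(K, Long, V)] = {
    val ended = latest.keysIterator
      .filter { case (_, start) => start + windowSizeMs <= wallClockMs }
      .toList
    ended.map { case k @ (key, start) => (key, start, latest.remove(k).get) }
  }
}
```

Repeated `put` calls for the same key and window keep only the last value, which is what gives the one-result-per-window behaviour; each window is forwarded at most once because it is removed when emitted.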



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-7882) StateStores are frequently closed during the 'transform' method

2019-02-11 Thread Mateusz Owczarek (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7882?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16761758#comment-16761758
 ] 

Mateusz Owczarek edited comment on KAFKA-7882 at 2/11/19 2:16 PM:
--

Also, I noticed that my regression tests, which check whether events are 
actually produced after the time window has finished, are not passing. They 
are based on `TopologyTestDriver` with `advanceWallClockTime`, since my 
previous suppress implementation was based on wall-clock-time punctuation. How 
can I test my topology now? More generally, how can I test any topology 
containing the `.suppress` method from Kafka 2.1.0?
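The reason `advanceWallClockTime` no longer helps can be illustrated with a small model. As far as we understand the 2.1 `suppress(Suppressed.untilWindowCloses(...))` operator, a buffered window is emitted only once stream time (the largest record timestamp observed so far) reaches the window end plus the grace period, so only a new, later-timestamped record can flush it. The sketch below is a dependency-free model under that assumption, not Kafka's implementation; `SuppressModel`, `offer`, `advanceWallClock` and the tumbling-window arithmetic are hypothetical.

```scala
import scala.collection.mutable

// Hypothetical, dependency-free model of suppress-until-window-closes.
// Emission is driven only by stream time (the max record timestamp seen so far).
final case class Emitted(key: String, windowStart: Long, count: Long)

final class SuppressModel(windowSizeMs: Long, graceMs: Long) {
  private var streamTime: Long = -1L
  // Buffered count per (key, window start) for tumbling windows.
  private val buffer = mutable.LinkedHashMap.empty[(String, Long), Long]

  /** Process one record and return every window that closed because of it. */
  def offer(key: String, timestampMs: Long): List[Emitted] = {
    streamTime = math.max(streamTime, timestampMs)
    val windowStart = timestampMs - (timestampMs % windowSizeMs)
    val windowEnd = windowStart + windowSizeMs
    // A record whose window has already closed is treated as late and dropped.
    if (streamTime < windowEnd + graceMs) {
      val k = (key, windowStart)
      buffer.update(k, buffer.getOrElse(k, 0L) + 1L)
    }
    flushClosedWindows()
  }

  /** Wall-clock time never advances stream time, so nothing is emitted. */
  def advanceWallClock(ms: Long): List[Emitted] = Nil

  private def flushClosedWindows(): List[Emitted] = {
    val closed = buffer.keysIterator
      .filter { case (_, start) => streamTime >= start + windowSizeMs + graceMs }
      .toList
    closed.map { k => Emitted(k._1, k._2, buffer.remove(k).get) }
  }
}
```

In a `TopologyTestDriver` test, the corresponding approach would be to pipe one extra record with a timestamp at or beyond the window end plus grace after the records under test, instead of advancing the wall clock.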


was (Author: nijo):
Also, I noticed that my regression tests, which check whether events are 
actually produced after the time window has finished, are not passing. They 
are based on `TopologyTestDriver` with `advanceWallClockTime`, since my 
previous suppress implementation was based on wall-clock-time punctuation. How 
can I test my topology now?



[jira] [Commented] (KAFKA-7882) StateStores are frequently closed during the 'transform' method

2019-02-06 Thread Mateusz Owczarek (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7882?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16761758#comment-16761758
 ] 

Mateusz Owczarek commented on KAFKA-7882:
-

Also, I noticed that my regression tests, which check whether events are 
actually produced after the time window has finished, are not passing. They 
are based on `TopologyTestDriver` with `advanceWallClockTime`, since my 
previous suppress implementation was based on wall-clock-time punctuation. How 
can I test my topology now?



[jira] [Commented] (KAFKA-7882) StateStores are frequently closed during the 'transform' method

2019-02-06 Thread Mateusz Owczarek (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7882?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16761706#comment-16761706
 ] 

Mateusz Owczarek commented on KAFKA-7882:
-

Hey [~vvcephei], thanks for the answer! I've upgraded Kafka to 2.1.0 and 
migrated my code to use your suppress method, and it looks like it will fit my 
code nicely. However, I'm a bit concerned about the lack of documentation for 
the `BufferFullStrategy` class. At first glance, it seems I can choose the 
`SPILL_TO_DISK` option, which (afaik) would use the default RocksDB-based state 
store implementation, as other stateful operations (aggregates, counts and so 
on) do. Does it work like this in this case? Should I be worried that my 
application will shut down if its memory fills up?
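To make the question concrete, the sketch below models a bounded suppression buffer with two possible reactions when it is full: emit the oldest entry early (so downstream may see a non-final result) or stop with an error. It loosely mirrors the `emitEarlyWhenFull()` and `shutDownWhenFull()` options of `Suppressed.BufferConfig`, but it is a hypothetical, in-memory model and not Kafka's buffer; `BoundedSuppressBuffer`, `WhenFull`, `EmitEarly`, `ShutDown` and `BufferFullException` are illustrative names, and spilling to disk is not modelled.

```scala
import scala.collection.mutable

// Hypothetical model of a bounded suppression buffer and its "full" policies.
sealed trait WhenFull
case object EmitEarly extends WhenFull
case object ShutDown extends WhenFull

final class BufferFullException(msg: String) extends RuntimeException(msg)

final class BoundedSuppressBuffer(maxRecords: Int, whenFull: WhenFull) {
  // Insertion-ordered, so the head is the oldest buffered key.
  private val buffer = mutable.LinkedHashMap.empty[String, Long]

  /** Buffers an update; returns any records that had to be emitted early. */
  def put(key: String, value: Long): List[(String, Long)] = {
    buffer.update(key, value) // updating an existing key does not grow the buffer
    if (buffer.size <= maxRecords) Nil
    else whenFull match {
      case EmitEarly =>
        // Evict the oldest entry downstream: an intermediate, not a final, result.
        val (k, v) = buffer.head
        buffer.remove(k)
        List((k, v))
      case ShutDown =>
        throw new BufferFullException(s"suppression buffer exceeded $maxRecords records")
    }
  }

  def size: Int = buffer.size
}
```

With the shut-down policy the bound acts as a hard limit instead of silently weakening the one-result-per-window guarantee; which trade-off is acceptable depends on the application.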



[jira] [Updated] (KAFKA-7882) StateStores are frequently closed during the 'transform' method

2019-01-30 Thread Mateusz Owczarek (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7882?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mateusz Owczarek updated KAFKA-7882:

Description: 
Hello, I have a problem with the state store being closed frequently while 
transforming incoming records. To ensure that only one record per key and 
window is emitted from an aggregation, I have created a custom Transformer (I 
know something similar is going to be introduced with the suppress method on 
KTable in the future, but my implementation is quite simple and imo should 
work correctly) with the following implementation:
{code:java}
override def transform(key: Windowed[K], value: V): (Windowed[K], V) = {
  val partition = context.partition()
  if (partition != -1)
    store.put(key.key(), (value, partition), key.window().start())
  else
    logger.warn(s"-1 partition")

  // Ensuring no 1:1 forwarding: context.forward and the commit logic
  // are in the punctuator callback
  null
}
{code}
 

What I do get is the following error:
{code:java}
Store MyStore is currently closed{code}
This problem appears only when the number of stream threads (or input topic 
partitions) is greater than 1, even if I'm only writing to the store and 
punctuation is turned off.

If punctuation is enabled, however, I sometimes get -1 as the partition value 
in the transform method. I'm familiar with the basic docs, but I haven't found 
anything that could help me here.

I build my state store like this:
{code:java}
val stateStore = Stores.windowStoreBuilder(
  Stores.persistentWindowStore(
    stateStoreName,
    timeWindows.maintainMs() + timeWindows.sizeMs + TimeUnit.DAYS.toMillis(1),
    timeWindows.segments,
    timeWindows.sizeMs,
    false
  ),
  serde[K],
  serde[(V, Int)]
)
{code}
and include it in the DSL like this:
{code:java}
builder.addStateStore(stateStore)
(...).transform(new MyTransformer(...), "MyStore")
{code}
INB4: I don't close any state stores manually, and I gave them as long a 
retention time as possible for the debugging stage. I tried to hotfix this with 
a retry in the transform method: the state stores eventually reopen and the 
`put` call then succeeds, but this approach is questionable and I'm not sure it 
really works.

Edit:
Could this be because 
{code:java}StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG{code} is set to a low 
value? If I understand correctly, spilling to disk then happens more 
frequently; could that cause the store to be closed?

  was:
Hello, I have a problem with the state store being closed frequently while 
transforming incoming records. To ensure that only one record per key and 
window is emitted from an aggregation, I have created a custom Transformer (I 
know something similar is going to be introduced with the suppress method on 
KTable in the future, but my implementation is quite simple and imo should 
work correctly) with the following implementation:
{code:java}
override def transform(key: Windowed[K], value: V): (Windowed[K], V) = {
  val partition = context.partition()
  if (partition != -1)
    store.put(key.key(), (value, partition), key.window().start())
  else
    logger.warn(s"-1 partition")

  // Ensuring no 1:1 forwarding: context.forward and the commit logic
  // are in the punctuator callback
  null
}
{code}
 

What I do get is the following error:
{code:java}
Store MyStore is currently closed{code}
This problem appears only when the number of stream threads (or input topic 
partitions) is greater than 1, even if I'm only writing to the store and 
punctuation is turned off.

If punctuation is enabled, however, I sometimes get -1 as the partition value 
in the transform method. I'm familiar with the basic docs, but I haven't found 
anything that could help me here.

I build my state store like this:
{code:java}
val stateStore = Stores.windowStoreBuilder(
  Stores.persistentWindowStore(
    stateStoreName,
    timeWindows.maintainMs() + timeWindows.sizeMs + TimeUnit.DAYS.toMillis(1),
    timeWindows.segments,
    timeWindows.sizeMs,
    false
  ),
  serde[K],
  serde[(V, Int)]
)
{code}
and include it in the DSL like this:
{code:java}
builder.addStateStore(stateStore)
(...).transform(new MyTransformer(...), "MyStore")
{code}
INB4: I don't close any state stores manually, and I gave them as long a 
retention time as possible for the debugging stage. I tried to hotfix this with 
a retry in the transform method: the state stores eventually reopen and the 
`put` call then succeeds, but this approach is questionable and I'm not sure it 
really works.



[jira] [Updated] (KAFKA-7882) StateStores are frequently closed during the 'transform' method

2019-01-29 Thread Mateusz Owczarek (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7882?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mateusz Owczarek updated KAFKA-7882:

Description: 
Hello, I have a problem with the state store being closed frequently while 
transforming incoming records. To ensure that only one record per key and 
window is emitted from an aggregation, I have created a custom Transformer (I 
know something similar is going to be introduced with the suppress method on 
KTable in the future, but my implementation is quite simple and imo should 
work correctly) with the following implementation:
{code:java}
override def transform(key: Windowed[K], value: V): (Windowed[K], V) = {
  val partition = context.partition()
  if (partition != -1)
    store.put(key.key(), (value, partition), key.window().start())
  else
    logger.warn(s"-1 partition")

  // Ensuring no 1:1 forwarding: context.forward and the commit logic
  // are in the punctuator callback
  null
}
{code}
 

What I do get is the following error:
{code:java}
Store MyStore is currently closed{code}
This problem appears only when the number of stream threads (or input topic 
partitions) is greater than 1, even if I'm only writing to the store and 
punctuation is turned off.

If punctuation is enabled, however, I sometimes get -1 as the partition value 
in the transform method. I'm familiar with the basic docs, but I haven't found 
anything that could help me here.

I build my state store like this:
{code:java}
val stateStore = Stores.windowStoreBuilder(
  Stores.persistentWindowStore(
    stateStoreName,
    timeWindows.maintainMs() + timeWindows.sizeMs + TimeUnit.DAYS.toMillis(1),
    timeWindows.segments,
    timeWindows.sizeMs,
    false
  ),
  serde[K],
  serde[(V, Int)]
)
{code}
and include it in the DSL like this:
{code:java}
builder.addStateStore(stateStore)
(...).transform(new MyTransformer(...), "MyStore")
{code}
INB4: I don't close any state stores manually, and I gave them as long a 
retention time as possible for the debugging stage. I tried to hotfix this with 
a retry in the transform method: the state stores eventually reopen and the 
`put` call then succeeds, but this approach is questionable and I'm not sure it 
really works.

  was:
Hello, I have a problem with the state store being closed frequently while 
transforming incoming records. To ensure that only one record per key and 
window is emitted from an aggregation, I have created a custom Transformer (I 
know something similar is going to be introduced with the suppress method on 
KTable in the future, but my implementation is quite simple and imo should 
work correctly) with the following implementation:
{code:java}
override def transform(key: Windowed[K], value: V): (Windowed[K], V) = {
  val partition = context.partition()
  if (partition != -1)
    store.put(key.key(), (value, partition), key.window().start())
  else
    logger.warn(s"-1 partition")

  // Ensuring no 1:1 forwarding: context.forward and the commit logic
  // are in the punctuator callback
  null
}
{code}
 

What I do get is the following error:
{code:java}
Store MyStore is currently closed{code}
This problem appears only when the number of stream threads (or input topic 
partitions) is greater than 1, even if I'm only writing to the store and 
punctuation is turned off.

If punctuation is enabled, however, I sometimes get -1 as the partition value 
in the transform method. I'm familiar with the basic docs, but I haven't found 
anything that could help me here.

I build my state store like this:
{code:java}
Stores.windowStoreBuilder(
  Stores.persistentWindowStore(
    stateStoreName,
    timeWindows.maintainMs() + timeWindows.sizeMs + TimeUnit.DAYS.toMillis(1),
    timeWindows.segments,
    timeWindows.sizeMs,
    false
  ),
  serde[K],
  serde[(V, Int)]
)
{code}
and include it in the DSL like this:
{code:java}
builder.addStateStore()
(...).transform(new MyTransformer(...), "MyStore")
{code}
INB4: I don't close any state stores manually, and I gave them as long a 
retention time as possible for the debugging stage. I tried to hotfix this with 
a retry in the transform method: the state stores eventually reopen and the 
`put` call then succeeds, but this approach is questionable and I'm not sure it 
really works.



[jira] [Updated] (KAFKA-7882) StateStores are frequently closed during the 'transform' method

2019-01-29 Thread Mateusz Owczarek (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7882?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mateusz Owczarek updated KAFKA-7882:

Description: 
Hello, I have a problem with the state store being closed frequently while 
transforming incoming records. To ensure that only one record per key and 
window is emitted from an aggregation, I have created a custom Transformer (I 
know something similar is going to be introduced with the suppress method on 
KTable in the future, but my implementation is quite simple and imo should 
work correctly) with the following implementation:
{code:java}
override def transform(key: Windowed[K], value: V): (Windowed[K], V) = {
  val partition = context.partition()
  if (partition != -1)
    store.put(key.key(), (value, partition), key.window().start())
  else
    logger.warn(s"-1 partition")

  // Ensuring no 1:1 forwarding: context.forward and the commit logic
  // are in the punctuator callback
  null
}
{code}
 

What I do get is the following error:
{code:java}
Store MyStore is currently closed{code}
This problem appears only when the number of stream threads (or input topic 
partitions) is greater than 1, even if I'm only writing to the store and 
punctuation is turned off.

If punctuation is enabled, however, I sometimes get -1 as the partition value 
in the transform method. I'm familiar with the basic docs, but I haven't found 
anything that could help me here.

I build my state store like this:
{code:java}
Stores.windowStoreBuilder(
  Stores.persistentWindowStore(
    stateStoreName,
    timeWindows.maintainMs() + timeWindows.sizeMs + TimeUnit.DAYS.toMillis(1),
    timeWindows.segments,
    timeWindows.sizeMs,
    false
  ),
  serde[K],
  serde[(V, Int)]
)
{code}
and include it in the DSL like this:
{code:java}
builder.addStateStore()
(...).transform(new MyTransformer(...), "MyStore")
{code}
INB4: I don't close any state stores manually, and I gave them as long a 
retention time as possible for the debugging stage. I tried to hotfix this with 
a retry in the transform method: the state stores eventually reopen and the 
`put` call then succeeds, but this approach is questionable and I'm not sure it 
really works.

  was:
Hello, I have a problem with the state store being closed frequently while 
transforming incoming records. To ensure that only one record per key and 
window is emitted from an aggregation, I have created a custom Transformer (I 
know something similar is going to be introduced with the suppress method on 
KTable in the future, but my implementation is quite simple and imo should 
work correctly) with the following implementation:
{code:java}
override def transform(key: Windowed[K], value: V): (Windowed[K], V) = {
  val partition = context.partition()
  if (partition != -1)
    store.put(key.key(), (value, partition), key.window().start())
  else
    logger.warn(s"-1 partition")

  // Ensuring no 1:1 forwarding: context.forward and the commit logic
  // are in the punctuator callback
  null
}
{code}
 

What I do get is the following error:
{code:java}
Store MyStore is currently closed{code}
This problem appears only when the number of stream threads (or input topic 
partitions) is greater than 1, even if I'm only writing to the store and 
punctuation is turned off.

If punctuation is enabled, however, I sometimes get -1 as the partition value 
in the transform method. I'm familiar with the basic docs, but I haven't found 
anything that could help me here.

INB4: I don't close any state stores manually, and I gave them as long a 
retention time as possible for the debugging stage. I tried to hotfix this with 
a retry in the transform method: the state stores eventually reopen and the 
`put` call then succeeds, but this approach is questionable and I'm not sure it 
really works.



[jira] [Created] (KAFKA-7882) StateStores are frequently closed during the 'transform' method

2019-01-29 Thread Mateusz Owczarek (JIRA)
Mateusz Owczarek created KAFKA-7882:
---

 Summary: StateStores are frequently closed during the 'transform' 
method
 Key: KAFKA-7882
 URL: https://issues.apache.org/jira/browse/KAFKA-7882
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 2.0.0
Reporter: Mateusz Owczarek


Hello, I have a problem with the state store being closed frequently while 
transforming incoming records. To ensure that only one record per key and 
window is emitted from an aggregation, I have created a custom Transformer (I 
know something similar is going to be introduced with the suppress method on 
KTable in the future, but my implementation is quite simple and imo should 
work correctly) with the following implementation:

```
override def transform(key: Windowed[K], value: V): (Windowed[K], V) = {
  {
    val partition = context.partition()
    if (partition != -1)
      store.put(key.key(), (value, partition), key.window().start())
    else
      logger.warn(s"-1 partition")
  }

  // Ensuring no 1:1 forwarding: context.forward and the commit logic
  // are in the punctuator callback
  null
}
```

What I do get is the following error:
```
Store MyStore is currently closed
```

This problem appears only when the number of stream threads (or input topic 
partitions) is greater than 1, even if I'm only writing to the store and 
punctuation is turned off.

If punctuation is enabled, however, I sometimes get -1 as the partition value 
in the transform method. I'm familiar with the basic docs, but I haven't found 
anything that could help me here.

INB4: I don't close any state stores manually, and I gave them as long a 
retention time as possible for the debugging stage. I tried to hotfix this with 
a retry in the transform method: the state stores eventually reopen and the 
`put` call then succeeds, but this approach is questionable and I'm not sure it 
really works.





[jira] [Updated] (KAFKA-7882) StateStores are frequently closed during the 'transform' method

2019-01-29 Thread Mateusz Owczarek (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7882?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mateusz Owczarek updated KAFKA-7882:

Description: 
Hello, I have a problem with the state store being closed frequently while 
transforming incoming records. To ensure that only one record per key and 
window is emitted from an aggregation, I have created a custom Transformer (I 
know something similar is going to be introduced with the suppress method on 
KTable in the future, but my implementation is quite simple and imo should 
work correctly) with the following implementation:
{code:java}
override def transform(key: Windowed[K], value: V): (Windowed[K], V) = {
  val partition = context.partition()
  if (partition != -1)
    store.put(key.key(), (value, partition), key.window().start())
  else
    logger.warn(s"-1 partition")

  // Ensuring no 1:1 forwarding: context.forward and the commit logic
  // are in the punctuator callback
  null
}
{code}
 

What I do get is the following error:
{code:java}
Store MyStore is currently closed{code}
This problem appears only when the number of stream threads (or input topic 
partitions) is greater than 1, even if I'm only writing to the store and 
punctuation is turned off.

If punctuation is enabled, however, I sometimes get -1 as the partition value 
in the transform method. I'm familiar with the basic docs, but I haven't found 
anything that could help me here.

INB4: I don't close any state stores manually, and I gave them as long a 
retention time as possible for the debugging stage. I tried to hotfix this with 
a retry in the transform method: the state stores eventually reopen and the 
`put` call then succeeds, but this approach is questionable and I'm not sure it 
really works.

  was:
Hello, I have a problem with the state store being closed frequently while 
transforming incoming records. To ensure that only one record per key and 
window is emitted from an aggregation, I have created a custom Transformer (I 
know something similar is going to be introduced with the suppress method on 
KTable in the future, but my implementation is quite simple and imo should 
work correctly) with the following implementation:

```
override def transform(key: Windowed[K], value: V): (Windowed[K], V) = {
  {
    val partition = context.partition()
    if (partition != -1)
      store.put(key.key(), (value, partition), key.window().start())
    else
      logger.warn(s"-1 partition")
  }

  // Ensuring no 1:1 forwarding: context.forward and the commit logic
  // are in the punctuator callback
  null
}
```

What I do get is the following error:
```
Store MyStore is currently closed
```

This problem appears only when the number of stream threads (or input topic 
partitions) is greater than 1, even if I'm only writing to the store and 
punctuation is turned off.

If punctuation is enabled, however, I sometimes get -1 as the partition value 
in the transform method. I'm familiar with the basic docs, but I haven't found 
anything that could help me here.

INB4: I don't close any state stores manually, and I gave them as long a 
retention time as possible for the debugging stage. I tried to hotfix this with 
a retry in the transform method: the state stores eventually reopen and the 
`put` call then succeeds, but this approach is questionable and I'm not sure it 
really works.


> StateStores are frequently closed during the 'transform' method
> ---
>
> Key: KAFKA-7882
> URL: https://issues.apache.org/jira/browse/KAFKA-7882
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.0.0
>Reporter: Mateusz Owczarek
>Priority: Major
>
> Hello, I have a problem with the state store being closed frequently while 
> transforming incoming records. To ensure that only one record per key and 
> window reaches the aggregate, I have created a custom Transformer (I know 
> something similar is going to be introduced with the suppress method on KTable 
> in the future, but my implementation is quite simple and imo should work 
> correctly) with the following implementation:
> {code:java}
> override def transform(key: Windowed[K], value: V): (Windowed[K], V) = {
>   val partition = context.partition()
>   if (partition != -1) store.put(key.key(), (value, partition), key.window().start())
>   else logger.warn("-1 partition")
>   // Ensuring no 1:1 forwarding; context.forward and commit logic is in the
>   // punctuator callback
>   null
> }
> {code}
>  
> What I get is the following error:
> {code:java}
> Store MyStore is currently closed{code}
> This problem appears only when the number of stream threads (or input 
> topic partitions) is greater than 1, even if I only write to the store and 
> turn off the punctuation.
> If punctuation is present, however, I sometimes get -1 as the partition value 
> in the transform method. I'm 

[jira] [Commented] (KAFKA-7753) ValueTransformerWithKey should not require producing one value every message

2018-12-20 Thread Mateusz Owczarek (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7753?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16725955#comment-16725955
 ] 

Mateusz Owczarek commented on KAFKA-7753:
-

I see, thanks for the quick answer, closing the issue ;)

> ValueTransformerWithKey should not require producing one value every message
> 
>
> Key: KAFKA-7753
> URL: https://issues.apache.org/jira/browse/KAFKA-7753
> Project: Kafka
>  Issue Type: Wish
>  Components: streams
>Reporter: Mateusz Owczarek
>Priority: Minor
>
> Hi, speaking about: 
> [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformerWithKey.java]
> I have quite a simple case: I want to implement a Transformer which I will 
> use later in my DSL-API-defined topology. These are my requirements:
> - no repartition topic should be created, since I do not change the keys
> - I can't forward a message for every message transformed. I have an internal 
> state store which I use to ensure that at most one message per window is sent.
> The basic Transformer lets me not send messages down the stream 
> (by returning null), but it forces repartitioning, which I think is 
> unnecessary in my case. WDYT?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-7753) ValueTransformerWithKey should not require producing one value every message

2018-12-20 Thread Mateusz Owczarek (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7753?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mateusz Owczarek resolved KAFKA-7753.
-
Resolution: Feedback Received

> ValueTransformerWithKey should not require producing one value every message
> 
>
> Key: KAFKA-7753
> URL: https://issues.apache.org/jira/browse/KAFKA-7753
> Project: Kafka
>  Issue Type: Wish
>  Components: streams
>Reporter: Mateusz Owczarek
>Priority: Minor
>





[jira] [Commented] (KAFKA-7753) ValueTransformerWithKey should not require producing one value every message

2018-12-20 Thread Mateusz Owczarek (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7753?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16725902#comment-16725902
 ] 

Mateusz Owczarek commented on KAFKA-7753:
-

Actually it would be fine, but because I misread the ValueTransformerWithKey 
docs, it does not work for me. Apparently I cannot forward any messages while 
punctuating in this type of transformer. If I don't change the keys but want to 
forward records periodically (not only while transforming them), I need to use 
Transformer and risk repartitioning. Why is that?

> ValueTransformerWithKey should not require producing one value every message
> 
>
> Key: KAFKA-7753
> URL: https://issues.apache.org/jira/browse/KAFKA-7753
> Project: Kafka
>  Issue Type: Wish
>  Components: streams
>Reporter: Mateusz Owczarek
>Priority: Minor
>





[jira] [Created] (KAFKA-7753) ValueTransformerWithKey should not require producing one value every message

2018-12-18 Thread Mateusz Owczarek (JIRA)
Mateusz Owczarek created KAFKA-7753:
---

 Summary: ValueTransformerWithKey should not require producing one 
value every message
 Key: KAFKA-7753
 URL: https://issues.apache.org/jira/browse/KAFKA-7753
 Project: Kafka
  Issue Type: Wish
  Components: streams
Reporter: Mateusz Owczarek


Hi, speaking about: 
[https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformerWithKey.java]
I have quite a simple case: I want to implement a Transformer which I will use 
later in my DSL-API-defined topology. These are my requirements:
- no repartition topic should be created, since I do not change the keys
- I can't forward a message for every message transformed. I have an internal 
state store which I use to ensure that at most one message per window is sent.

The basic Transformer lets me not send messages down the stream (by returning 
null), but it forces repartitioning, which I think is unnecessary in my case. 
WDYT?
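A DSL cannot see whether user code inside a transformer changed the key. So a key-changing operator has to mark the stream conservatively, and a later grouping then adds a repartition step. As far as I know, Kafka Streams behaves similarly: after `transform()`, a later `groupByKey()` or join adds a repartition topic, whereas `transformValues()` does not. `transform()` alone creates no topic.

The model below is hypothetical. `ModelStream` and its methods are illustrative names, not the Kafka implementation.

```scala
// Hypothetical model, not the Kafka Streams implementation: a key-changing
// operator sets a flag, a value-only operator keeps it, and grouping inserts a
// repartition step only when the flag is set.
final case class ModelStream(steps: Vector[String], keyMayHaveChanged: Boolean) {
  // Value-only: the key is untouched, so the flag is carried over unchanged.
  def transformValues(name: String): ModelStream = copy(steps = steps :+ name)

  // May change the key: the DSL cannot inspect user code, so it assumes it did.
  def transform(name: String): ModelStream =
    ModelStream(steps :+ name, keyMayHaveChanged = true)

  def groupByKeyAndAggregate: Vector[String] =
    if (keyMayHaveChanged) steps :+ "REPARTITION" :+ "AGGREGATE"
    else steps :+ "AGGREGATE"
}

val source = ModelStream(Vector("SOURCE"), keyMayHaveChanged = false)
val viaTransformValues = source.transformValues("dedup").groupByKeyAndAggregate
val viaTransform = source.transform("dedup").groupByKeyAndAggregate
```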





[jira] [Commented] (KAFKA-7669) Stream topology definition is not robust to the ordering changes

2018-11-23 Thread Mateusz Owczarek (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7669?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16696653#comment-16696653
 ] 

Mateusz Owczarek commented on KAFKA-7669:
-

Thank you for the answer. I figured out that the topology definition order 
needs to be consistent across all of the application instances and have already 
fixed my code accordingly. I was just pointing out that, from my point of view, 
this order consistency should be clearly stated in the docs.

I had not heard about the possibility of explicitly naming internal Kafka 
Streams topics in 2.1. Thanks for pointing it out! That solves many of my 
problems! :)
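The two fixes mentioned here can be sketched as follows. The first is to define branches in a fixed order, such as sorted keys, rather than in whatever order a `Map` iterates. The second is to derive names from the branch itself. For state stores, the latter is what passing a name through `Materialized.as(...)` gives you.

This is a hypothetical model, not the Kafka Streams API. `counterNames` and `explicitNames` are illustrative helpers.

```scala
// Hypothetical sketch, not the Kafka Streams API.
// Counter-based: the name records the position in definition order.
def counterNames(order: Seq[String]): Map[String, String] =
  order.zipWithIndex.map { case (branch, i) => branch -> s"AGGREGATE-STORE-$i" }.toMap

// Explicit: the name is derived from the branch key, so order does not matter.
def explicitNames(order: Seq[String]): Map[String, String] =
  order.map(branch => branch -> s"aggregate-store-$branch").toMap

// Two instances that happen to define the same branches in different orders.
val orderOnInstanceA = Seq("2", "1", "3")
val orderOnInstanceB = Seq("3", "2", "1")

val counterAgrees = counterNames(orderOnInstanceA) == counterNames(orderOnInstanceB)
val sortedAgrees = counterNames(orderOnInstanceA.sorted) == counterNames(orderOnInstanceB.sorted)
val explicitAgrees = explicitNames(orderOnInstanceA) == explicitNames(orderOnInstanceB)
```

Sorting fixes the order only as long as every instance defines the same set of branches. Explicit names also survive adding or removing a branch.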

> Stream topology definition is not robust to the ordering changes
> 
>
> Key: KAFKA-7669
> URL: https://issues.apache.org/jira/browse/KAFKA-7669
> Project: Kafka
>  Issue Type: Wish
>  Components: streams
>Affects Versions: 2.0.0
>Reporter: Mateusz Owczarek
>Priority: Major
>
> It seems that if the user does not guarantee the order of the stream topology 
> definition, he may end up with multiple stream branches sharing the same 
> internal changelog (and repartition, if created) topic. 
> Let's assume:
> {code:java}
> val initialStream = new StreamsBuilder().stream(sth);
> val someStrings = (1 to 10).map(_.toString)
> val notGuaranteedOrderOfStreams: Map[String, KStream[...]] =
>   someStrings.map(s => s -> initialStream.filter(...)).toMap
> {code}
> When the user now defines common aggregation logic for 
> notGuaranteedOrderOfStreams and runs multiple instances of the application, 
> the KSTREAM-AGGREGATE-STATE-STORE topic names will not be unique and will 
> contain results of different streams from the notGuaranteedOrderOfStreams map.
> All of this happens without a single warning that the topology (or just the 
> order of the topology definition) differs between instances of the Kafka 
> Streams application.
> Also, I am concerned that the ids in "KSTREAM-AGGREGATE-STATE-STORE-id-changelog" 
> line up so neatly across different application instances (and different 
> topologies).
>  





[jira] [Updated] (KAFKA-7669) Stream topology definition is not robust to the ordering changes

2018-11-22 Thread Mateusz Owczarek (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7669?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mateusz Owczarek updated KAFKA-7669:

Issue Type: Wish  (was: Bug)

> Stream topology definition is not robust to the ordering changes
> 
>
> Key: KAFKA-7669
> URL: https://issues.apache.org/jira/browse/KAFKA-7669
> Project: Kafka
>  Issue Type: Wish
>  Components: streams
>Affects Versions: 2.0.0
>Reporter: Mateusz Owczarek
>Priority: Major
>





[jira] [Updated] (KAFKA-7669) Stream topology definition is not robust to the ordering changes

2018-11-22 Thread Mateusz Owczarek (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7669?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mateusz Owczarek updated KAFKA-7669:

Summary: Stream topology definition is not robust to the ordering changes  
(was: Stream topology definition is not prune to the ordering changes)

> Stream topology definition is not robust to the ordering changes
> 
>
> Key: KAFKA-7669
> URL: https://issues.apache.org/jira/browse/KAFKA-7669
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.0.0
>Reporter: Mateusz Owczarek
>Priority: Major
>





[jira] [Created] (KAFKA-7669) Stream topology definition is not prune to the ordering changes

2018-11-22 Thread Mateusz Owczarek (JIRA)
Mateusz Owczarek created KAFKA-7669:
---

 Summary: Stream topology definition is not prune to the ordering 
changes
 Key: KAFKA-7669
 URL: https://issues.apache.org/jira/browse/KAFKA-7669
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 2.0.0
Reporter: Mateusz Owczarek


It seems that if the user does not guarantee the order of the stream topology 
definition, he may end up with multiple stream branches sharing the same 
internal changelog (and repartition, if created) topic. 

Let's assume:
{code:java}
val initialStream = new StreamsBuilder().stream(sth);
val someStrings = (1 to 10).map(_.toString)
val notGuaranteedOrderOfStreams: Map[String, KStream[...]] =
  someStrings.map(s => s -> initialStream.filter(...)).toMap
{code}

When the user now defines common aggregation logic for 
notGuaranteedOrderOfStreams and runs multiple instances of the application, the 
KSTREAM-AGGREGATE-STATE-STORE topic names will not be unique and will contain 
results of different streams from the notGuaranteedOrderOfStreams map.

All of this happens without a single warning that the topology (or just the 
order of the topology definition) differs between instances of the Kafka 
Streams application.

Also, I am concerned that the ids in "KSTREAM-AGGREGATE-STATE-STORE-id-changelog" 
line up so neatly across different application instances (and different 
topologies).
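The following simplified model shows why the names collide. Kafka Streams generates internal names from a running counter, zero-padded to ten digits as in `KSTREAM-AGGREGATE-STATE-STORE-0000000003`. A generated name therefore identifies a position in definition order, not the branch it belongs to.

`assignStoreNames` is a hypothetical helper that gives each branch exactly one index. The real builder also spends indices on every other operator, such as the filters.

```scala
// Hypothetical model of counter-based naming (one index per branch, unlike the
// real builder): the n-th defined aggregation gets the n-th name, whatever
// branch it belongs to.
def assignStoreNames(branchOrder: Seq[String], firstIndex: Int = 1): Map[String, String] =
  branchOrder.zipWithIndex.map { case (branch, i) =>
    branch -> f"KSTREAM-AGGREGATE-STATE-STORE-${firstIndex + i}%010d"
  }.toMap

// Two application instances that define the same branches in different orders.
val instanceA = assignStoreNames(Seq("1", "2", "3"))
val instanceB = assignStoreNames(Seq("3", "1", "2"))

// The same store (and therefore changelog) name ends up attached to branch "1"
// on instance A and to branch "3" on instance B, with no error raised.
```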

 


