[ 
https://issues.apache.org/jira/browse/KAFKA-7882?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mateusz Owczarek updated KAFKA-7882:
------------------------------------
    Description: 
Hello, I have a problem with a state store being closed frequently while 
transforming incoming records. To ensure that only one record per key and 
window reaches an aggregate, I have created a custom Transformer (I know 
something similar is going to be introduced with the suppress method on KTable 
in the future, but my implementation is quite simple and, in my opinion, should 
work correctly) with the following implementation:
{code:java}
override def transform(key: Windowed[K], value: V): (Windowed[K], V) = {
  val partition = context.partition()
  // Store the value and its partition under the key, at the window's start timestamp.
  if (partition != -1) store.put(key.key(), (value, partition), key.window().start())
  else logger.warn("-1 partition")

  // Return null to prevent 1:1 forwarding; context.forward and the commit logic
  // are in the punctuator callback.
  null
}
{code}
 

What I get is the following error:
{code:java}
Store MyStore is currently closed{code}
This problem appears only when the number of stream threads (or input topic 
partitions) is greater than 1, even if I am only writing to the store and 
punctuation is turned off.

If punctuation is enabled, however, I sometimes get -1 as the partition value 
in the transform method. I am familiar with the basic docs, but I haven't found 
anything that could help me here.

INB4: I don't close any state stores manually, and I gave them the longest 
possible retention time for the debugging stage. I tried to hotfix this with a 
retry in the transform method: the state stores eventually reopen and the `put` 
call succeeds, but this approach is questionable and I am not sure it actually 
works.
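
A note on one possible cause, stated as an assumption and not confirmed for this issue: Kafka Streams expects a TransformerSupplier to return a new Transformer from every get() call, because each stream task initializes its own instance with its own store. If a single instance ends up shared between tasks, the last init overwrites the store reference, and closing that task's store (for example during a rebalance) would leave the other tasks writing to a closed store. The toy model below illustrates only that mechanism; ToyStore, ToyTransformer, and SharedInstanceDemo are hypothetical stand-ins and not Kafka Streams classes.

```scala
import scala.collection.mutable
import scala.util.Try

// Hypothetical stand-in for a per-task state store (not a Kafka Streams type).
final class ToyStore(val name: String) {
  private var open = true
  private val data = mutable.Map.empty[String, String]

  def close(): Unit = { open = false }

  def put(key: String, value: String): Unit = {
    if (!open) throw new IllegalStateException(s"Store $name is currently closed")
    data.update(key, value)
  }
}

// Hypothetical stand-in for a Transformer that keeps the store it was initialized with.
final class ToyTransformer {
  private var store: ToyStore = _

  def init(taskStore: ToyStore): Unit = { store = taskStore }

  def transform(key: String, value: String): Unit = store.put(key, value)
}

object SharedInstanceDemo {
  // Simulates two tasks. Task 1 shuts down before task 0 processes a record.
  // Returns Left(error message) if task 0's write fails, Right(()) otherwise.
  def run(supplier: () => ToyTransformer): Either[String, Unit] = {
    val store0 = new ToyStore("MyStore") // store owned by task 0
    val store1 = new ToyStore("MyStore") // store owned by task 1

    val t0 = supplier()
    t0.init(store0)
    val t1 = supplier()
    t1.init(store1) // with a shared instance, this overwrites task 0's store reference

    store1.close() // task 1 is closed, e.g. during a rebalance

    Try(t0.transform("key", "value")).toEither.left.map(_.getMessage)
  }

  def main(args: Array[String]): Unit = {
    val shared = new ToyTransformer
    println(run(() => shared))             // same instance handed to both tasks
    println(run(() => new ToyTransformer)) // fresh instance per task
  }
}
```

In this model the shared instance fails with the same message as above, while the fresh-instance supplier succeeds. If that is what is happening here, constructing the Transformer inside the supplier, instead of passing in a prebuilt instance, should avoid the sharing.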


> StateStores are frequently closed during the 'transform' method
> ---------------------------------------------------------------
>
>                 Key: KAFKA-7882
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7882
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.0.0
>            Reporter: Mateusz Owczarek
>            Priority: Major
>
> Hello, I have a problem with a state store being closed frequently while 
> transforming incoming records. To ensure that only one record per key and 
> window reaches an aggregate, I have created a custom Transformer (I know 
> something similar is going to be introduced with the suppress method on KTable 
> in the future, but my implementation is quite simple and, in my opinion, 
> should work correctly) with the following implementation:
> {code:java}
> override def transform(key: Windowed[K], value: V): (Windowed[K], V) = {
>   val partition = context.partition()
>   // Store the value and its partition under the key, at the window's start timestamp.
>   if (partition != -1) store.put(key.key(), (value, partition), key.window().start())
>   else logger.warn("-1 partition")
>   // Return null to prevent 1:1 forwarding; context.forward and the commit logic
>   // are in the punctuator callback.
>   null
> }
> {code}
>  
> What I get is the following error:
> {code:java}
> Store MyStore is currently closed{code}
> This problem appears only when the number of stream threads (or input topic 
> partitions) is greater than 1, even if I am only writing to the store and 
> punctuation is turned off.
> If punctuation is enabled, however, I sometimes get -1 as the partition value 
> in the transform method. I am familiar with the basic docs, but I haven't 
> found anything that could help me here.
> INB4: I don't close any state stores manually, and I gave them the longest 
> possible retention time for the debugging stage. I tried to hotfix this with a 
> retry in the transform method: the state stores eventually reopen and the 
> `put` call succeeds, but this approach is questionable and I am not sure it 
> actually works.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
