[ 
https://issues.apache.org/jira/browse/KAFKA-8143?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Stephenson updated KAFKA-8143:
-----------------------------------
    Description: 
I've created a small example application with a trivial `Processor` that 
takes messages and stores the length of the String value rather than the value 
itself.

That is, the following setup:
{code:java}
Topic[String, String]
Processor[String, String]
KeyValueStore[String, Long] // Note the Store persists Long values
{code}
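The processor logic itself is trivial. A dependency-free sketch of it (using a plain HashMap in place of the real KeyValueStore; the class and method names here are illustrative, not taken from the example repository) looks roughly like:

```java
import java.util.HashMap;
import java.util.Map;

// Simplified stand-in for the Processor[String, String] described above:
// it receives String values and stores their *length* as a Long.
public class LengthProcessorSketch {
    // A plain Map standing in for the KeyValueStore[String, Long].
    private final Map<String, Long> store = new HashMap<>();

    // Mirrors Processor.process(key, value): persist the value's length,
    // not the value itself.
    public void process(String key, String value) {
        store.put(key, (long) value.length());
    }

    public Long get(String key) {
        return store.get(key);
    }

    public static void main(String[] args) {
        LengthProcessorSketch p = new LengthProcessorSketch();
        p.process("1", "hello");
        p.process("2", "abc");
        System.out.println(p.get("1")); // 5
        System.out.println(p.get("2")); // 3
    }
}
```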
 

The example application also has a Thread which periodically displays all 
values in the KeyValueStore.

While the application is running, I can publish values to the topic with:
{code:java}
root@kafka:/opt/kafka# bin/kafka-console-producer.sh --property 
"parse.key=true" --property "key.separator=:" --broker-list localhost:9092 
--topic test.topic
>1:hello
>2:abc{code}
And the background Thread will report the values persisted to the key value 
store.

If the application is restarted, reading from the KeyValueStore fails.  It 
attempts to recover the state from the persistent RocksDB store, which fails 
with:
{code:java}
org.apache.kafka.common.errors.SerializationException: Size of data received by 
LongDeserializer is not 8{code}
(Note there is no stack trace because SerializationException suppresses it.)

Debugging suggests that the original String data from the topic is being 
restored directly into the store, rather than the Long values written by the 
processor.
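That would explain the exception: LongDeserializer requires exactly 8 bytes, while the raw topic value is a UTF-8 String of arbitrary length. A stdlib-only sketch of the size mismatch (deserializeLong is a hypothetical helper mirroring the deserializer's length check, not Kafka code):

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class SizeMismatchSketch {
    // Hypothetical helper mirroring LongDeserializer's 8-byte requirement.
    static long deserializeLong(byte[] data) {
        if (data.length != 8) {
            throw new IllegalArgumentException(
                "Size of data received by LongDeserializer is not 8");
        }
        return ByteBuffer.wrap(data).getLong();
    }

    public static void main(String[] args) {
        // What the processor wrote: a Long (5) serialized as 8 bytes.
        byte[] longBytes = ByteBuffer.allocate(8).putLong(5L).array();
        System.out.println(deserializeLong(longBytes)); // 5

        // What restore apparently hands back: the raw topic value "hello",
        // which is 5 bytes and cannot be read back as a Long.
        byte[] stringBytes = "hello".getBytes(StandardCharsets.UTF_8);
        try {
            deserializeLong(stringBytes);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```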

I've created a minimal example to show the issue at 
[https://github.com/lukestephenson/kafka-streams-example]


> Kafka-Streams GlobalStore cannot be read after application restart
> ------------------------------------------------------------------
>
>                 Key: KAFKA-8143
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8143
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.1.1
>            Reporter: Luke Stephenson
>            Priority: Major
>



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
