Hi Tony,

The issue is that the GlobalStore doesn't use the Processor when restoring
the state. It just reads the raw records from the underlying topic. You
could work around this by doing the processing and writing to another
topic. Then use the other topic as the source for your global-store.
It is probably worth raising a JIRA for this, too.
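Something like this might work, sticking with the same TopologyBuilder-style API
you are already using (a rough, untested sketch: the topic name
"dictionary-words-processed", the DictionaryWordsForwardingProcessor and
PassThroughProcessor suppliers are made-up names for illustration, and
globalStore is the StateStoreSupplier from your snippet below):

import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.kstream.KStreamBuilder

val builder = KStreamBuilder().apply {
    // 1. Do the transformation up front: read the raw topic, apply the same
    //    logic as DictionaryWordsProcessor but forward the result downstream
    //    (context().forward(...)) instead of writing it to the store, and sink
    //    it to a derived, compacted topic.
    addSource("dictionary-words-raw-source", Serdes.String().deserializer(),
            Serdes.String().deserializer(), Config.DICTIONARY_WORDS_TOPIC)
    addProcessor("dictionary-words-transformer",
            DictionaryWordsForwardingProcessor.Supplier, "dictionary-words-raw-source")
    addSink("dictionary-words-sink", "dictionary-words-processed",
            Serdes.String().serializer(), Serdes.String().serializer(),
            "dictionary-words-transformer")

    // 2. Feed the global store from the already-processed topic. The state
    //    update processor now only does store.put(key, value), so a restore
    //    that copies the raw records from this topic yields the same content.
    addGlobalStore(globalStore, "dictionary-words-processed-source",
            Serdes.String().deserializer(), Serdes.String().deserializer(),
            "dictionary-words-processed", "dictionary-words-store-updater",
            PassThroughProcessor.Supplier)
}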

Thanks,
Damian

On Wed, 18 Oct 2017 at 17:01 Tony John <tonyjohnant...@gmail.com> wrote:

> Hello All,
>
> I have been trying to create an application on top of Kafka Streams. I am a
> newbie to Kafka & Kafka Streams, so please excuse me if my understanding is
> wrong.
>
> I got the application running fine on a single EC2 instance in AWS. Now I am
> looking at scaling and ran into some issues. The application has a global
> state store and a couple of other local ones backed by RocksDB. It uses the
> Processor API, and the stream is built using the TopologyBuilder. The global
> state store is fed by a topic which sends key-value pairs (both are protobuf
> objects) and is connected to a processor which transforms the value by
> applying some logic and finally stores the key and the modified value in the
> store. Similarly, the local stores are connected via processors which are fed
> by different topics. Now the issue is that when I launch a new instance of
> the app, task re-allocation and state restoration happen, and the stores get
> replicated onto the new instance. But the global store which is replicated
> onto the new instance contains some other data (I guess that's the raw data)
> as opposed to the processed data.
>
> *Application Topology*
>
> *Global Store*
>
> Source Topic (Partition Count = 1, Replication Factor = 2, Compacted = false)
> -> GlobalStoreProcessor (Persistent, Caching enabled, Logging disabled)
> -> Global Store
>
> *Local Store*
>
> Source Topic (Partition Count = 16, Replication Factor = 2, Compacted = true)
> -> LocalStoreProcessor (Persistent, Caching enabled, Logging enabled)
> -> Local state stores on different partitions
>
> *Sample Code (Written in Kotlin)*
>
> val streams: KafkaStreams
>
> init {
>     val builder = KStreamBuilder().apply {
>
>         val globalStore = Stores.create(Config.DICTIONARY)
>                 .withKeys(Serdes.String())
>                 .withValues(Serdes.String())
>                 .persistent()
>                 .enableCaching()
>                 .disableLogging()
>                 .build() as StateStoreSupplier<KeyValueStore<*, *>>
>
>         addGlobalStore(globalStore, "dictionary-words-source",
>                 Serdes.String().deserializer(), Serdes.String().deserializer(),
>                 Config.DICTIONARY_WORDS_TOPIC, "dictionary-words-processor",
>                 DictionaryWordsProcessor.Supplier)
>
>         addSource("query-source", Serdes.String().deserializer(),
>                 Serdes.String().deserializer(), Config.QUERIES_TOPIC)
>         addProcessor("query-processor", QueryProcessor.Supplier, "query-source")
>     }
>
>     val config = StreamsConfig(mapOf(
>             StreamsConfig.APPLICATION_ID_CONFIG to Config.APPLICATION_ID,
>             StreamsConfig.BOOTSTRAP_SERVERS_CONFIG to Config.KAFKA_SERVERS,
>             StreamsConfig.STATE_DIR_CONFIG to Config.STATE_STORE_DIR))
>
>     streams = KafkaStreams(builder, config)
>
>     Runtime.getRuntime().addShutdownHook(Thread {
>         println("Shutting down Kafka Streams...")
>         streams.close()
>         println("Shut down successfully")
>     })
> }
>
> fun run() {
>     Utils.createTopic(Config.DICTIONARY_WORDS_TOPIC, 1,
>             Config.REPLICATION_FACTOR, true)
>     Utils.createTopic(Config.QUERIES_TOPIC, Config.PARTITION_COUNT,
>             Config.REPLICATION_FACTOR, false)
>     streams.start()
> }
>
>
> *Environment Details:* 1 ZooKeeper, 2 Brokers, and 1 or 2 application
> instances.
>
>
> So I just wanted to know how state store restoration works while scaling up
> and down. How does Streams manage to restore the data? I was expecting that
> when the new instance gets launched, the data would flow through the same
> processor so that it gets modified using the same logic that was applied when
> it was stored on instance 1. Could you please help me understand this a
> little better? Please let me know if there is any way to get the restoration
> process to route the data via the same processor.
>
>
> Thanks,
> Tony
>
