Re: Kafka Streams : Problem with Global State Restoration

2017-10-20 Thread Tony John
Hi Damian,

Thanks a lot for the response. I just saw your reply when I visited the
mailing-list archive. Unfortunately I haven't received it in my inbox, and I
didn't even see the update in the archive when I checked earlier today.
Anyway, thanks again for the response. I will raise a JIRA as you suggested,
and I hope this isn't the case with local state stores.

Thanks,
Tony

On Wed, Oct 18, 2017 at 9:21 PM, Tony John  wrote:

> Hello All,
>
> I have been trying to create an application on top of Kafka Streams. I am a
> newbie to Kafka & Kafka Streams, so please excuse me if my understanding is
> wrong.
>
> I got the application running fine on a single EC2 instance in AWS. Now I am
> looking at scaling and ran into some issues. The application has a global
> state store and a couple of other local ones backed by RocksDB. It uses the
> Processor API, and the stream is built using the TopologyBuilder. The global
> state store is fed by a topic which sends key-value pairs (both are protobuf
> objects) and is connected to a processor which transforms the value by
> applying some logic and finally stores the key and the modified data in the
> store. Similarly, the local stores are connected via processors which are
> fed by different topics. Now the issue is that when I launch a new instance
> of the app, task re-allocation and state restoration happen, and the stores
> get replicated on to the new instance. But the global store which is
> replicated on to the new instance has some other data (I guess that's the
> raw data) as opposed to the processed data.
>
> *Application Topology*
>
> *Global Store*
>
> Source Topic (Partition Count = 1, Replication Factor = 2, Compacted =
> false) -> GlobalStoreProcessor (Persistent, Caching enabled, logging
> disabled) -> Global Store
>
> *Local Store*
>
> Source Topic (Partition Count = 16, Replication Factor = 2, Compacted =
> true) -> LocalStoreProcessor (Persistent, Caching enabled, Logging enabled)
> -> Local state stores on different partitions
>
> *Sample Code (Written in Kotlin)*
>
> val streams: KafkaStreams
> init {
> val builder = KStreamBuilder().apply {
>
> val globalStore = Stores.create(Config.DICTIONARY)
> .withKeys(Serdes.String())
> .withValues(Serdes.String())
> .persistent()
> .enableCaching()
> .disableLogging()
> .build() as StateStoreSupplier<KeyValueStore<String, String>>
>
> addGlobalStore(globalStore, "dictionary-words-source", 
> Serdes.String().deserializer(), Serdes.String().deserializer(),
> Config.DICTIONARY_WORDS_TOPIC, "dictionary-words-processor", 
> DictionaryWordsProcessor.Supplier)
>
>
> addSource("query-source", Serdes.String().deserializer(), 
> Serdes.String().deserializer(), Config.QUERIES_TOPIC)
> addProcessor("query-processor", QueryProcessor.Supplier, 
> "query-source")
>
> }
>
> val config = StreamsConfig(mapOf(StreamsConfig.APPLICATION_ID_CONFIG to 
> Config.APPLICATION_ID,
> StreamsConfig.BOOTSTRAP_SERVERS_CONFIG to Config.KAFKA_SERVERS,
> StreamsConfig.STATE_DIR_CONFIG to Config.STATE_STORE_DIR
> ))
> streams = KafkaStreams(builder, config)
>
> Runtime.getRuntime().addShutdownHook(Thread {
> println("Shutting down Kafka Streams...")
> streams.close()
> println("Shut down successfully")
> })
> }
>
> fun run() {
> Utils.createTopic(Config.DICTIONARY_WORDS_TOPIC, 1, 
> Config.REPLICATION_FACTOR, true)
> Utils.createTopic(Config.QUERIES_TOPIC, Config.PARTITION_COUNT, 
> Config.REPLICATION_FACTOR, false)
> streams.start()
> }
>
>
> *Environment Details:* 1 ZooKeeper, 2 Brokers, and 1 or 2 application
> instances.
>
>
> So I just wanted to understand the process of state store restoration while
> scaling up and down. How does Streams manage to restore the data? I was
> expecting that when the new instance gets launched, the data flows through
> the same processor, so that it gets modified using the same logic that was
> applied when it was stored on instance 1. Could you please help me
> understand this a little better? Please let me know if there is any way to
> get the restoration process to route the data via the same processor.
>
>
> Thanks,
> Tony
>


Re: Kafka Streams : Problem with Global State Restoration

2017-10-18 Thread Damian Guy
Hi Tony,

The issue is that the GlobalStore doesn't use the Processor when restoring
the state; it just reads the raw records from the underlying topic. You could
work around this by doing the processing and writing the results to another
topic, and then using that topic as the source for your global store.
It is probably worth raising a JIRA for this, too.
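
Something along these lines should work. It is only a rough sketch: the
"dictionary-words-processed" topic name and the transform() function are
placeholders for your own naming and for the logic currently inside
DictionaryWordsProcessor, and Config / DictionaryWordsProcessor refer to the
classes from your code below.

import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.kstream.KStreamBuilder
import org.apache.kafka.streams.processor.StateStoreSupplier
import org.apache.kafka.streams.state.KeyValueStore
import org.apache.kafka.streams.state.Stores

// Placeholder for the logic that currently lives inside DictionaryWordsProcessor.
fun transform(rawValue: String): String = rawValue

val builder = KStreamBuilder().apply {
    // 1. Regular sub-topology: read the raw dictionary topic, apply the
    //    processing, and write the results to a separate topic.
    stream(Serdes.String(), Serdes.String(), Config.DICTIONARY_WORDS_TOPIC)
        .mapValues { raw -> transform(raw) }
        .to(Serdes.String(), Serdes.String(), "dictionary-words-processed")

    // 2. The global store now sources the processed topic, so restoration
    //    (which bypasses the processor) still loads processed data.
    val globalStore = Stores.create(Config.DICTIONARY)
        .withKeys(Serdes.String())
        .withValues(Serdes.String())
        .persistent()
        .disableLogging()
        .build() as StateStoreSupplier<KeyValueStore<String, String>>

    addGlobalStore(globalStore, "dictionary-words-source",
        Serdes.String().deserializer(), Serdes.String().deserializer(),
        "dictionary-words-processed", "dictionary-words-processor",
        // With the processing moved upstream, this processor only needs to
        // put the key and value into the store as-is.
        DictionaryWordsProcessor.Supplier)
}

It would also make sense to have the processed topic compacted, since the
global store is rebuilt from it on every restore.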

Thanks,
Damian

On Wed, 18 Oct 2017 at 17:01 Tony John  wrote:

> Hello All,
>
> I have been trying to create an application on top of Kafka Streams. I am a
> newbie to Kafka & Kafka Streams, so please excuse me if my understanding is
> wrong.
>
> I got the application running fine on a single EC2 instance in AWS. Now I am
> looking at scaling and ran into some issues. The application has a global
> state store and a couple of other local ones backed by RocksDB. It uses the
> Processor API, and the stream is built using the TopologyBuilder. The global
> state store is fed by a topic which sends key-value pairs (both are protobuf
> objects) and is connected to a processor which transforms the value by
> applying some logic and finally stores the key and the modified data in the
> store. Similarly, the local stores are connected via processors which are
> fed by different topics. Now the issue is that when I launch a new instance
> of the app, task re-allocation and state restoration happen, and the stores
> get replicated on to the new instance. But the global store which is
> replicated on to the new instance has some other data (I guess that's the
> raw data) as opposed to the processed data.
>
> *Application Topology*
>
> *Global Store*
>
> Source Topic (Partition Count = 1, Replication Factor = 2, Compacted =
> false) -> GlobalStoreProcessor (Persistent, Caching enabled, logging
> disabled) -> Global Store
>
> *Local Store*
>
> Source Topic (Partition Count = 16, Replication Factor = 2, Compacted =
> true) -> LocalStoreProcessor (Persistent, Caching enabled, Logging enabled)
> -> Local state stores on different partitions
>
> *Sample Code (Written in Kotlin)*
>
> val streams: KafkaStreams
> init {
> val builder = KStreamBuilder().apply {
>
> val globalStore = Stores.create(Config.DICTIONARY)
> .withKeys(Serdes.String())
> .withValues(Serdes.String())
> .persistent()
> .enableCaching()
> .disableLogging()
> .build() as StateStoreSupplier<KeyValueStore<String, String>>
>
> addGlobalStore(globalStore, "dictionary-words-source",
> Serdes.String().deserializer(), Serdes.String().deserializer(),
> Config.DICTIONARY_WORDS_TOPIC,
> "dictionary-words-processor", DictionaryWordsProcessor.Supplier)
>
>
> addSource("query-source", Serdes.String().deserializer(),
> Serdes.String().deserializer(), Config.QUERIES_TOPIC)
> addProcessor("query-processor", QueryProcessor.Supplier,
> "query-source")
>
> }
>
> val config =
> StreamsConfig(mapOf(StreamsConfig.APPLICATION_ID_CONFIG to
> Config.APPLICATION_ID,
> StreamsConfig.BOOTSTRAP_SERVERS_CONFIG to Config.KAFKA_SERVERS,
> StreamsConfig.STATE_DIR_CONFIG to Config.STATE_STORE_DIR
> ))
> streams = KafkaStreams(builder, config)
>
> Runtime.getRuntime().addShutdownHook(Thread {
> println("Shutting down Kafka Streams...")
> streams.close()
> println("Shut down successfully")
> })
> }
>
> fun run() {
> Utils.createTopic(Config.DICTIONARY_WORDS_TOPIC, 1,
> Config.REPLICATION_FACTOR, true)
> Utils.createTopic(Config.QUERIES_TOPIC, Config.PARTITION_COUNT,
> Config.REPLICATION_FACTOR, false)
> streams.start()
> }
>
>
> *Environment Details:* 1 ZooKeeper, 2 Brokers, and 1 or 2 application
> instances.
>
>
> So I just wanted to understand the process of state store restoration while
> scaling up and down. How does Streams manage to restore the data? I was
> expecting that when the new instance gets launched, the data flows through
> the same processor, so that it gets modified using the same logic that was
> applied when it was stored on instance 1. Could you please help me
> understand this a little better? Please let me know if there is any way to
> get the restoration process to route the data via the same processor.
>
>
> Thanks,
> Tony
>


Kafka Streams : Problem with Global State Restoration

2017-10-18 Thread Tony John
Hello All,

I have been trying to create an application on top of Kafka Streams. I am a
newbie to Kafka & Kafka Streams, so please excuse me if my understanding is
wrong.

I got the application running fine on a single EC2 instance in AWS. Now I am
looking at scaling and ran into some issues. The application has a global
state store and a couple of other local ones backed by RocksDB. It uses the
Processor API, and the stream is built using the TopologyBuilder. The global
state store is fed by a topic which sends key-value pairs (both are protobuf
objects) and is connected to a processor which transforms the value by
applying some logic and finally stores the key and the modified data in the
store. Similarly, the local stores are connected via processors which are fed
by different topics. Now the issue is that when I launch a new instance of
the app, task re-allocation and state restoration happen, and the stores get
replicated on to the new instance. But the global store which is replicated
on to the new instance has some other data (I guess that's the raw data) as
opposed to the processed data.

*Application Topology*

*Global Store*

Source Topic (Partition Count = 1, Replication Factor = 2, Compacted =
false) -> GlobalStoreProcessor (Persistent, Caching enabled, logging
disabled) -> Global Store

*Local Store*

Source Topic (Partition Count = 16, Replication Factor = 2, Compacted =
true) -> LocalStoreProcessor (Persistent, Caching enabled, Logging enabled)
-> Local state stores on different partitions

*Sample Code (Written in Kotlin)*

val streams: KafkaStreams

init {
    val builder = KStreamBuilder().apply {

        val globalStore = Stores.create(Config.DICTIONARY)
            .withKeys(Serdes.String())
            .withValues(Serdes.String())
            .persistent()
            .enableCaching()
            .disableLogging()
            .build() as StateStoreSupplier<KeyValueStore<String, String>>

        addGlobalStore(globalStore, "dictionary-words-source",
            Serdes.String().deserializer(), Serdes.String().deserializer(),
            Config.DICTIONARY_WORDS_TOPIC, "dictionary-words-processor",
            DictionaryWordsProcessor.Supplier)

        addSource("query-source", Serdes.String().deserializer(),
            Serdes.String().deserializer(), Config.QUERIES_TOPIC)
        addProcessor("query-processor", QueryProcessor.Supplier, "query-source")
    }

    val config = StreamsConfig(mapOf(
        StreamsConfig.APPLICATION_ID_CONFIG to Config.APPLICATION_ID,
        StreamsConfig.BOOTSTRAP_SERVERS_CONFIG to Config.KAFKA_SERVERS,
        StreamsConfig.STATE_DIR_CONFIG to Config.STATE_STORE_DIR
    ))
    streams = KafkaStreams(builder, config)

    Runtime.getRuntime().addShutdownHook(Thread {
        println("Shutting down Kafka Streams...")
        streams.close()
        println("Shut down successfully")
    })
}

fun run() {
    Utils.createTopic(Config.DICTIONARY_WORDS_TOPIC, 1,
        Config.REPLICATION_FACTOR, true)
    Utils.createTopic(Config.QUERIES_TOPIC, Config.PARTITION_COUNT,
        Config.REPLICATION_FACTOR, false)
    streams.start()
}


*Environment Details:* 1 ZooKeeper, 2 Brokers, and 1 or 2 application
instances.


So I just wanted to understand the process of state store restoration while
scaling up and down. How does Streams manage to restore the data? I was
expecting that when the new instance gets launched, the data flows through
the same processor, so that it gets modified using the same logic that was
applied when it was stored on instance 1. Could you please help me understand
this a little better? Please let me know if there is any way to get the
restoration process to route the data via the same processor.


Thanks,
Tony