Hi Ted,

I am using 0.11.0.0. I am not using an external state store; I am using the persistent store that ships with Kafka Streams 0.11.0.0. My assumption is that put() simply inserts my object into the state store, and I guess the store is managed internally by RocksDB in Kafka Streams.
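As far as I understand it, a persistent store does more than keep a reference to my object. put() serializes the key and value with the store's serdes before they are written to RocksDB (and, if change logging is enabled, to the changelog topic), and get() deserializes a fresh copy. The toy class below is a hypothetical, stdlib-only model of that behaviour, not the Kafka Streams API. ByteStore and Stats are made-up names, and Stats stands in for my AnalyticeObj. It shows why the updated object has to be put back after mutation, and why every put() pays a serialization cost.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

public class ByteStoreSketch {
    // Hypothetical toy store (not the Kafka Streams API): values are kept as
    // serialized bytes, so put() copies the object's state at that moment.
    static final class ByteStore<V> {
        private final Map<String, byte[]> data = new HashMap<>();
        private final Function<V, byte[]> serializer;
        private final Function<byte[], V> deserializer;

        ByteStore(Function<V, byte[]> serializer, Function<byte[], V> deserializer) {
            this.serializer = serializer;
            this.deserializer = deserializer;
        }

        void put(String key, V value) {
            data.put(key, serializer.apply(value)); // serialization cost on every put
        }

        V get(String key) {
            byte[] bytes = data.get(key);
            return bytes == null ? null : deserializer.apply(bytes); // new object each call
        }
    }

    // Hypothetical mutable value standing in for AnalyticeObj.
    static final class Stats {
        long count;
        Stats(long count) { this.count = count; }
    }

    static ByteStore<Stats> newStore() {
        return new ByteStore<>(
                s -> Long.toString(s.count).getBytes(StandardCharsets.UTF_8),
                b -> new Stats(Long.parseLong(new String(b, StandardCharsets.UTF_8))));
    }

    public static void main(String[] args) {
        ByteStore<Stats> store = newStore();
        store.put("k", new Stats(1));

        Stats s = store.get("k");
        s.count = 5;                              // mutates only the deserialized copy
        System.out.println(store.get("k").count); // prints 1: store still holds old bytes
        store.put("k", s);                        // write the update back
        System.out.println(store.get("k").count); // prints 5
    }
}
```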
I am not sure why put() keeps taking longer over time.

On Sat, Sep 16, 2017 at 7:07 PM, Ted Yu <yuzhih...@gmail.com> wrote:
> Looking at the calculation of totalTimeTakenToStoreRecords, it covers the
> store.put() call.
>
> Can you tell us more about what the put() does?
> Does it involve an external key value store?
>
> Are you using 0.11.0.0?
>
> Thanks
>
> On Sat, Sep 16, 2017 at 6:14 AM, dev loper <spark...@gmail.com> wrote:
>
> > Hi Kafka Streams Users,
> >
> > I am trying to improve the performance of a Kafka Streams persistent
> > state store. In our application we use the Kafka Streams Processor API
> > with a persistent state store. When my application starts up it
> > performs well, but over time the performance deteriorates. I compute
> > certain results in the computeAnalytics method, and this method takes
> > hardly any time. It is called from both process and punctuate, and I
> > store the updated object back into the store. Over time, punctuate
> > takes a huge amount of time to complete, and I can see that most of
> > that time is spent storing and iterating over the records. There are
> > just 2500 records per partition. I am not sure where I am going wrong
> > or how I can improve the performance.
> >
> > Below is one such sample log record.
> >
> > INFO | 07:59:58 | processors.MyProcessor (MyProcessor.java:123) - Time Metrics for punctuate for TimeStamp :: 1505564655878 processed Records :: 2109 totalTimeTakenToProcessRecords :: 2 totalTimeTakenToStoreRecord :: 27605toal time Taken to retrieve Records :: 12787 Total time Taken :: 40394
> >
> > Below is the pseudocode for my processor, which mirrors the code I am
> > using in my application.
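> >
> > import org.apache.kafka.streams.KeyValue;
> > import org.apache.kafka.streams.processor.Processor;
> > import org.apache.kafka.streams.processor.ProcessorContext;
> > import org.apache.kafka.streams.state.KeyValueIterator;
> > import org.apache.kafka.streams.state.KeyValueStore;
> > import org.slf4j.Logger;
> > import org.slf4j.LoggerFactory;
> >
> > // Key, Update and AnalyticeObj are my own application classes.
> > public class MyProcessor implements Processor<Key, Update> {
> >
> >     private static final Logger logger = LoggerFactory.getLogger(MyProcessor.class);
> >
> >     private ProcessorContext context;
> >     private KeyValueStore<Key, AnalyticeObj> store;
> >
> >     @Override
> >     @SuppressWarnings("unchecked")
> >     public void init(ProcessorContext context) {
> >         this.context = context;
> >         // "analytics-store" is a placeholder for the registered store name
> >         this.store = (KeyValueStore<Key, AnalyticeObj>) context.getStateStore("analytics-store");
> >         context.schedule(5000); // punctuate every 5 seconds
> >     }
> >
> >     @Override
> >     public void process(Key objectkey, Update eventupdate) {
> >         long timestamp = context.timestamp();
> >         AnalyticeObj storeobj = store.get(objectkey);
> >
> >         if (storeobj == null) {
> >             storeobj = new AnalyticeObj(objectkey, eventupdate, timestamp);
> >         } else {
> >             storeobj.update(eventupdate, timestamp);
> >         }
> >         storeobj = storeobj.computeAnalytics();
> >
> >         store.put(objectkey, storeobj);
> >         context.commit();
> >     }
> >
> >     // Every 5 seconds
> >     @Override
> >     public void punctuate(long timestamp) {
> >         long startTime = System.currentTimeMillis();
> >         long totalTimeTakenToProcessRecords = 0;
> >         long totalTimeTakenToStoreRecords = 0;
> >         long counter = 0;
> >         // KeyValueIterator holds RocksDB resources and must be closed
> >         try (KeyValueIterator<Key, AnalyticeObj> iter = store.all()) {
> >             while (iter.hasNext()) {
> >                 KeyValue<Key, AnalyticeObj> entry = iter.next();
> >
> >                 if (entry.value.hasExpired(timestamp)) {
> >                     store.delete(entry.key);
> >                 } else {
> >                     long processStartTime = System.currentTimeMillis();
> >                     AnalyticeObj storeobj = entry.value.computeAnalytics(timestamp);
> >                     totalTimeTakenToProcessRecords += System.currentTimeMillis() - processStartTime;
> >
> >                     long storeStartTime = System.currentTimeMillis();
> >                     store.put(entry.key, storeobj);
> >                     totalTimeTakenToStoreRecords += System.currentTimeMillis() - storeStartTime;
> >                 }
> >                 counter++;
> >             }
> >         }
> >         long totalTime = System.currentTimeMillis() - startTime;
> >         // Retrieval time is whatever was not spent processing or storing
> >         long retrieveTime = totalTime - totalTimeTakenToProcessRecords - totalTimeTakenToStoreRecords;
> >         logger.info("Time Metrics for punctuate for TimeStamp :: " + timestamp
> >                 + " processed Records :: " + counter
> >                 + " totalTimeTakenToProcessRecords :: " + totalTimeTakenToProcessRecords
> >                 + " totalTimeTakenToStoreRecord :: " + totalTimeTakenToStoreRecords
> >                 + " total time Taken to retrieve Records :: " + retrieveTime
> >                 + " Total time Taken :: " + totalTime);
> >     }
> >
> >     @Override
> >     public void close() {}
> > }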
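A quick sanity check on the logged numbers: the "retrieve" figure is not measured directly but derived as total minus processing minus store time, so 40394 - 2 - 27605 = 12787 ms, and it also absorbs the hasExpired() checks and the removal of expired entries. Dividing 27605 ms by the 2109 counted records gives roughly 13 ms per record; because counter also includes expired records that were removed rather than re-put, the real average per put() is at least that. The stdlib-only sketch below reproduces that arithmetic and shows a tighter way to time the loop. CloseableIterator and ListBackedIterator are hypothetical stand-ins for KeyValueIterator (which is Closeable and should be closed after use). The loop uses System.nanoTime(), since System.currentTimeMillis() has millisecond or coarser granularity and summing many sub-millisecond intervals with it can be far off.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.function.IntConsumer;

public class PunctuateTimingSketch {
    // Hypothetical stand-in for KeyValueIterator: an Iterator that must be closed.
    interface CloseableIterator<T> extends Iterator<T>, AutoCloseable {
        @Override
        void close(); // no checked exception, so try-with-resources needs no catch
    }

    // Hypothetical list-backed iterator that records whether close() was called.
    static final class ListBackedIterator<T> implements CloseableIterator<T> {
        private final Iterator<T> delegate;
        boolean closed = false;

        ListBackedIterator(List<T> items) { this.delegate = items.iterator(); }

        @Override public boolean hasNext() { return delegate.hasNext(); }
        @Override public T next() { return delegate.next(); }
        @Override public void close() { closed = true; }
    }

    // Same derivation as the logging code: time not attributed to processing or storing.
    static long residualMillis(long total, long process, long store) {
        return total - process - store;
    }

    // Sums the time spent in the per-record "put" step with nanoTime and
    // closes the iterator even if the loop throws.
    static long timedLoopNanos(ListBackedIterator<Integer> iter, IntConsumer put) {
        long storeNanos = 0;
        try (ListBackedIterator<Integer> it = iter) {
            while (it.hasNext()) {
                int value = it.next();
                long start = System.nanoTime();
                put.accept(value);
                storeNanos += System.nanoTime() - start;
            }
        }
        return storeNanos;
    }

    public static void main(String[] args) {
        // Figures taken from the logged punctuate call.
        long records = 2109, process = 2, store = 27605, total = 40394;
        System.out.println("retrieve residual (ms): " + residualMillis(total, process, store));
        System.out.println(String.format(Locale.ROOT, "store time per record (ms): %.2f",
                (double) store / records));

        ListBackedIterator<Integer> iter = new ListBackedIterator<>(Arrays.asList(1, 2, 3));
        long[] sum = new long[1];
        long storeNanos = timedLoopNanos(iter, v -> sum[0] += v);
        System.out.println("closed=" + iter.closed + " sum=" + sum[0] + " storeNanos=" + storeNanos);
    }
}
```

In the real processor the same pattern would wrap store.all() and store.put(); the KeyValueIterator Javadoc asks callers to close the iterator explicitly or through try-with-resources.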