Hi Ted,

I am using  0.11.0.0 . I am not using external state store.  I am using the
persistent store that comes with kafka stream 0.11.0.0.
My assumption is put just inserts my object into the state store. I guess
the state store is manged by rocksdb internally by kafkastreams.

I am not sure why the put is taking time over  period of time.


On Sat, Sep 16, 2017 at 7:07 PM, Ted Yu <yuzhih...@gmail.com> wrote:

> Looking at the calculation of totalTimeTakenToStoreRecords, it covers
> store.put()
> call.
>
> Can you tell us more about what the put() does ?
> Does it involve external key value store ?
>
> Are you using 0.11.0.0 ?
>
> Thanks
>
> On Sat, Sep 16, 2017 at 6:14 AM, dev loper <spark...@gmail.com> wrote:
>
> > Hi Kafka Streams Users,
> >
> > I am trying to improve the performance of Kafka Streams State Store
> > Persistent Store. In our application we are using Kafka Streams Processor
> > API  and using Persistent State Store.. My application when starts up it
> > performing well but over a period of time the performance deteriorated. I
> > am computing certain results in computeAnalytics method and this method
> is
> > not taking time at all. This method is being called within both process
> and
> > punctuate and I am storing the updated object back to store. Over the
> > period of time its taking huge time for completing the punctuate process
> > and I could see majority of the time is spent in storing the records and
> > Iterating the records. The record size is just 2500 per partition. I am
> not
> > where I am going wrong and how can I improve the performance.
> >
> > Below is one such sample log record.
> >
> > INFO  | 07:59:58 | processors.MyProcessor (MyProcessor.java:123) - Time
> > Metrics for punctuate  for TimeStamp :: 1505564655878 processed Records
> ::
> > 2109 totalTimeTakenToProcessRecords :: 2 totalTimeTakenToStoreRecord ::
> > 27605toal time Taken to retrieve Records :: 12787 Total time Taken ::
> 40394
> >
> > Below I have given my pseudo code for my processor which exactly
> resembles
> > the code which I am using in my application.
> >
> > MyProcessor(){
> >
> >   process(Key objectkey, Update eventupdate){
> >    long timestamp=context.timestamp();
> >    AnalyticeObj storeobj=store.get(objectkey);
> >
> >    if( storeobj ===null)
> >          {
> >           storeobj=new  AnalyticeObj(objectkey,eventupdate,timestamp)
> >          }
> >          else
> >         {
> >            storeobj.update(eventupdate,timestamp)
> >         }
> >      storeobj=storeobj.computeAnalytics();
> >
> >    store.put(objectkey,storeobj);
> >   context.commit();
> > }
> > // Every 5 seconds
> > punctuate(long timestamp)
> > {
> >  long startTime = System.currentTimeMillis();
> > long totalTimeTakenToProcessRecords=0;
> > long totalTimeTakenToStoreRecords=0;
> > long counter=0;
> > KeyValueIterator iter=this.visitStore.all();
> > while (iter.hasNext()) {
> > KeyValue<Key, AnalyticeObj> entry = iter.next();
> >
> >     if(AnalyticeObj.hasExpired(timestamp)
> >          store.remove(entry.key)
> >       else
> >       {
> >         long processStartTime=System.currentTimeMillis();
> >          AnalyticeObj storeobj= entry.value.computeAnalytics(timestamp);
> >
> > totalTimeTakenToProcessRecords=totalTimeTakenToProcessRecords
> > +(System.currentTimeMillis()-processStartTime);
> >
> >          long storeStartTime=System.currentTimeMillis();
> >           store.put(entry.key,storeobj);
> >
> > totalTimeTakenToStoreRecords=totalTimeTakenToStoreRecords+(
> > System.currentTimeMillis()-storeStartTime);
> >        }
> >    counter++;
> > }
> >      logger.info(" Time Metrics for punctuate  "
> >                     " for TimeStamp :: " + "" + timestamp + " processed
> > Records :: "
> >                     + counter +" totalTimeTakenToProcessRecords ::
> > "+totalTimeTakenToProcessRecords +" totalTimeTakenToStoreRecord ::
> > "+totalTimeTakenToStoreRecords
> >                     +"toal time Taken to retrieve Records :: "+
> > (System.currentTimeMillis() -
> > (startTime+totalTimeTakenToProcessRecords+totalTimeTakenToStoreRecords)
> )+"
> > Total time Taken :: " + (System.currentTimeMillis() - startTime));
> > }
> > }
> >
>

Reply via email to