Hi,

It's hard to say exactly without a little more information.

On a side note, I don't see where you are closing the KeyValueIterator in
the code above. Not closing a KeyValueIterator on a Persistent State Store
can cause a resource leak over time, so I'd add `iter.close()` right before
your `logger.info` call. It might be worth re-running your timing test at
that point.
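
Or, equivalently, you can wrap the iteration in a try-with-resources block,
since KeyValueIterator implements Closeable. A minimal sketch, assuming the
same `store` field and types as in your pseudo code below:

    try (KeyValueIterator<Key, AnalyticeObj> iter = store.all()) {
        while (iter.hasNext()) {
            KeyValue<Key, AnalyticeObj> entry = iter.next();
            // ... existing per-record punctuate logic ...
        }
    }   // iter.close() is called automatically here, even if the loop throws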

Thanks,
Bill

On Sat, Sep 16, 2017 at 9:14 AM, dev loper <spark...@gmail.com> wrote:

> Hi Kafka Streams Users,
>
> I am trying to improve the performance of the Kafka Streams Persistent
> State Store. In our application we are using the Kafka Streams Processor
> API with a Persistent State Store. When the application starts up it
> performs well, but over time the performance deteriorates. I compute
> certain results in a computeAnalytics method, and that method takes almost
> no time at all. It is called from both process and punctuate, and I store
> the updated object back into the store. Over time, punctuate takes a huge
> amount of time to complete, and I can see that the majority of that time
> is spent storing and iterating over the records. The record count is just
> 2500 per partition. I am not sure where I am going wrong or how I can
> improve the performance.
>
> Below is one such sample log record.
>
> INFO  | 07:59:58 | processors.MyProcessor (MyProcessor.java:123) - Time
> Metrics for punctuate for TimeStamp :: 1505564655878 processed Records ::
> 2109 totalTimeTakenToProcessRecords :: 2 totalTimeTakenToStoreRecord ::
> 27605 total time Taken to retrieve Records :: 12787 Total time Taken :: 40394
>
> Below is the pseudo code for my processor, which closely resembles the
> code I am using in my application.
>
> MyProcessor() {
>
>   process(Key objectkey, Update eventupdate) {
>     long timestamp = context.timestamp();
>     AnalyticeObj storeobj = store.get(objectkey);
>
>     if (storeobj == null) {
>       storeobj = new AnalyticeObj(objectkey, eventupdate, timestamp);
>     } else {
>       storeobj.update(eventupdate, timestamp);
>     }
>     storeobj = storeobj.computeAnalytics();
>
>     store.put(objectkey, storeobj);
>     context.commit();
>   }
>
>   // Every 5 seconds
>   punctuate(long timestamp) {
>     long startTime = System.currentTimeMillis();
>     long totalTimeTakenToProcessRecords = 0;
>     long totalTimeTakenToStoreRecords = 0;
>     long counter = 0;
>     KeyValueIterator<Key, AnalyticeObj> iter = store.all();
>     while (iter.hasNext()) {
>       KeyValue<Key, AnalyticeObj> entry = iter.next();
>
>       if (entry.value.hasExpired(timestamp)) {
>         store.delete(entry.key);
>       } else {
>         long processStartTime = System.currentTimeMillis();
>         AnalyticeObj storeobj = entry.value.computeAnalytics(timestamp);
>         totalTimeTakenToProcessRecords += System.currentTimeMillis() - processStartTime;
>
>         long storeStartTime = System.currentTimeMillis();
>         store.put(entry.key, storeobj);
>         totalTimeTakenToStoreRecords += System.currentTimeMillis() - storeStartTime;
>       }
>       counter++;
>     }
>     logger.info(" Time Metrics for punctuate for TimeStamp :: " + timestamp
>         + " processed Records :: " + counter
>         + " totalTimeTakenToProcessRecords :: " + totalTimeTakenToProcessRecords
>         + " totalTimeTakenToStoreRecord :: " + totalTimeTakenToStoreRecords
>         + " total time Taken to retrieve Records :: "
>         + (System.currentTimeMillis() - (startTime + totalTimeTakenToProcessRecords + totalTimeTakenToStoreRecords))
>         + " Total time Taken :: " + (System.currentTimeMillis() - startTime));
>   }
> }
>
