Have you checked the EBS burst balance of the disks your Streams application is running on?
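For context on why burst balance matters here: gp2 EBS volumes earn I/O credits at a baseline rate (roughly 3 IOPS per provisioned GB under AWS's published gp2 model) and spend them when driven harder; once the credit bucket (about 5.4 million credits) empties, throughput drops to the baseline. That matches the "runs fine for a while, then degrades" pattern described below. Here is a deliberately simplified toy model of that bucket — the class name and the clamping are my own illustration, and the real model also has a 100-IOPS baseline floor and a 3000-IOPS burst ceiling that are omitted here:

```java
// Toy model of the gp2 EBS burst-credit bucket. Assumed figures: credits
// accrue at 3 IOPS per provisioned GB and are spent at the actual IOPS
// rate; the bucket is capped at ~5.4 million credits. Illustration only.
public class BurstBalanceModel {
    static final double MAX_CREDITS = 5_400_000.0;

    // Remaining credits after running at `actualIops` for `seconds` on a
    // volume of `sizeGb`, starting from a full bucket.
    public static double remainingCredits(int sizeGb, double actualIops, long seconds) {
        double baselineIops = 3.0 * sizeGb;     // accrual rate for the volume
        double net = baselineIops - actualIops; // per-second change in credits
        return Math.max(0.0, Math.min(MAX_CREDITS, MAX_CREDITS + net * seconds));
    }

    public static void main(String[] args) {
        // A 100 GB gp2 volume (baseline 300 IOPS) driven at a sustained
        // 1000 IOPS drains its bucket in about 5.4M / 700 ~= 7700 s (~2 h).
        System.out.println(remainingCredits(100, 1000.0, 3600));  // still has credits
        System.out.println(remainingCredits(100, 1000.0, 10000)); // exhausted
    }
}
```

The CloudWatch `BurstBalance` metric for the volume shows this directly, so checking it is the quickest way to confirm or rule this out.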
On 21 September 2017 at 04:28, dev loper <spark...@gmail.com> wrote:

> Hi Bill,
>
> I will repeat my tests with RocksDB enabled and will revert to you with
> the details. It might take 1-2 days to get back to you since I am
> traveling, but I will try my level best to get it done tonight.
>
> On Mon, Sep 18, 2017 at 5:30 PM, Bill Bejeck <b...@confluent.io> wrote:
>
> > I'm following up from your other thread as well here. Thanks for the
> > info above; that is helpful.
> >
> > I think the AWS instance type might be a factor here, but let's do
> > some more homework first.
> >
> > As a next step, we could enable logging for RocksDB so we can observe
> > its performance.
> >
> > Here is some sample code that enables logging at the INFO level and
> > prints statistics (using RocksDB's internal stats) every 15 minutes.
> >
> > Would you mind reverting your Streams application to use a persistent
> > store again? Then let it run until you observe the behavior you
> > described before and, if you don't mind, share the logs with me so we
> > can look them over. Thanks!
> > import org.apache.kafka.streams.state.RocksDBConfigSetter;
> > import org.rocksdb.InfoLogLevel;
> > import org.rocksdb.Options;
> >
> > import java.util.Map;
> >
> > public class RocksDbLogsConfig implements RocksDBConfigSetter {
> >
> >     @Override
> >     public void setConfig(String storeName, Options options,
> >                           Map<String, Object> configs) {
> >         options.setInfoLogLevel(InfoLogLevel.INFO_LEVEL);
> >         options.createStatistics();
> >         options.setStatsDumpPeriodSec(900);
> >         options.setDbLogDir("UPDATE WITH PATH WHERE YOU WANT LOG FILES");
> >     }
> > }
> >
> > To use the RocksDbLogsConfig class, you'll need to update your Streams
> > configs like so:
> >
> > props.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG,
> >           RocksDbLogsConfig.class);
> >
> > Thanks,
> > Bill
> >
> > On Sat, Sep 16, 2017 at 11:22 PM, dev loper <spark...@gmail.com> wrote:
> >
> > > Hi Bill,
> > >
> > > Thank you for pointing that out, but in the actual code I am calling
> > > iter.close() in the finally block if the iterator is not null. I
> > > don't see any issues when running with light traffic; as soon as I
> > > switch to production traffic I start seeing these issues.
> > >
> > > Below I have provided additional details about our current
> > > application. If you are looking for specific logs or details, please
> > > let me know and I will get them captured.
> > >
> > > In the production environment I am receiving 10,000 messages per
> > > second. There are 36 partitions for the topic, and there are around
> > > 2,500 unique entities per partition for which I have to maintain
> > > state.
> > >
> > > Below I have listed the hardware configuration and number of
> > > instances we are using for this solution. Please let me know if the
> > > hardware is the limiting factor here. We didn't go for a higher
> > > configuration since the load average on these instances was quite
> > > low and I could hardly see any CPU spikes.
> > > Kafka Broker Machine Details: 2 broker instances with the below
> > > configuration (current CPU usage 2%-8%):
> > >
> > >     Instance Type: AWS T2 Large
> > >     Machine Configuration: 2 vCPUs, 8 GB RAM, Storage: EBS
> > >
> > > Kafka Streams Instances: 3 Kafka Streams application instances
> > > (current CPU usage 8%-24%):
> > >
> > >     Instance Type: AWS M4 Large
> > >     Machine Configuration: 2 vCPUs, 8 GB RAM, Storage: EBS
> > >     (dedicated EBS bandwidth 450 Mbps)
> > >
> > > On Sat, Sep 16, 2017 at 10:05 PM, Bill Bejeck <b...@confluent.io>
> > > wrote:
> > >
> > > > Hi,
> > > >
> > > > It's hard to say exactly without a little more information.
> > > >
> > > > On a side note, I don't see where you are closing the
> > > > KeyValueIterator in the code above. Not closing a KeyValueIterator
> > > > on a persistent state store can cause a resource leak over time,
> > > > so I'd add `iter.close()` right before your `logger.info` call. It
> > > > might be worth retrying at that point.
> > > >
> > > > Thanks,
> > > > Bill
> > > >
> > > > On Sat, Sep 16, 2017 at 9:14 AM, dev loper <spark...@gmail.com>
> > > > wrote:
> > > >
> > > > > Hi Kafka Streams Users,
> > > > >
> > > > > I am trying to improve the performance of a Kafka Streams
> > > > > persistent state store. In our application we are using the
> > > > > Kafka Streams Processor API with a persistent state store. When
> > > > > my application starts up it performs well, but over a period of
> > > > > time the performance deteriorates. I am computing certain
> > > > > results in a computeAnalytics method, and this method itself
> > > > > takes hardly any time. It is called from both process and
> > > > > punctuate, and I store the updated object back into the store.
> > > > > Over time, punctuate takes a huge amount of time to complete,
> > > > > and I can see that the majority of the time is spent storing the
> > > > > records and iterating over them. The record count is just 2,500
> > > > > per partition. I am not sure where I am going wrong or how I can
> > > > > improve the performance.
> > > > >
> > > > > Below is one such sample log record:
> > > > >
> > > > > INFO | 07:59:58 | processors.MyProcessor (MyProcessor.java:123) -
> > > > > Time Metrics for punctuate for TimeStamp :: 1505564655878
> > > > > processed Records :: 2109 totalTimeTakenToProcessRecords :: 2
> > > > > totalTimeTakenToStoreRecord :: 27605 total time Taken to
> > > > > retrieve Records :: 12787 Total time Taken :: 40394
> > > > >
> > > > > Below is pseudo code for my processor, which closely resembles
> > > > > the code I am using in my application:
> > > > >
> > > > > MyProcessor() {
> > > > >
> > > > >     process(Key objectkey, Update eventupdate) {
> > > > >         long timestamp = context.timestamp();
> > > > >         AnalyticeObj storeobj = store.get(objectkey);
> > > > >
> > > > >         if (storeobj == null) {
> > > > >             storeobj = new AnalyticeObj(objectkey, eventupdate, timestamp);
> > > > >         } else {
> > > > >             storeobj.update(eventupdate, timestamp);
> > > > >         }
> > > > >         storeobj = storeobj.computeAnalytics();
> > > > >
> > > > >         store.put(objectkey, storeobj);
> > > > >         context.commit();
> > > > >     }
> > > > >
> > > > >     // Every 5 seconds
> > > > >     punctuate(long timestamp) {
> > > > >         long startTime = System.currentTimeMillis();
> > > > >         long totalTimeTakenToProcessRecords = 0;
> > > > >         long totalTimeTakenToStoreRecords = 0;
> > > > >         long counter = 0;
> > > > >         KeyValueIterator<Key, AnalyticeObj> iter = this.visitStore.all();
> > > > >         while (iter.hasNext()) {
> > > > >             KeyValue<Key, AnalyticeObj> entry = iter.next();
> > > > >
> > > > >             if (AnalyticeObj.hasExpired(timestamp)) {
> > > > >                 store.remove(entry.key);
> > > > >             } else {
> > > > >                 long processStartTime = System.currentTimeMillis();
> > > > >                 AnalyticeObj storeobj = entry.value.computeAnalytics(timestamp);
> > > > >                 totalTimeTakenToProcessRecords +=
> > > > >                     System.currentTimeMillis() - processStartTime;
> > > > >
> > > > >                 long storeStartTime = System.currentTimeMillis();
> > > > >                 store.put(entry.key, storeobj);
> > > > >                 totalTimeTakenToStoreRecords +=
> > > > >                     System.currentTimeMillis() - storeStartTime;
> > > > >             }
> > > > >             counter++;
> > > > >         }
> > > > >         logger.info(" Time Metrics for punctuate"
> > > > >             + " for TimeStamp :: " + timestamp
> > > > >             + " processed Records :: " + counter
> > > > >             + " totalTimeTakenToProcessRecords :: " + totalTimeTakenToProcessRecords
> > > > >             + " totalTimeTakenToStoreRecord :: " + totalTimeTakenToStoreRecords
> > > > >             + " total time Taken to retrieve Records :: "
> > > > >             + (System.currentTimeMillis()
> > > > >                 - (startTime + totalTimeTakenToProcessRecords
> > > > >                     + totalTimeTakenToStoreRecords))
> > > > >             + " Total time Taken :: " + (System.currentTimeMillis() - startTime));
> > > > >     }
> > > > > }
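The timing pattern in the punctuate above — time each phase separately and derive the "retrieve" time as total minus process minus store — can be checked in isolation without any Kafka dependency. Below is a minimal, self-contained sketch of that pattern; the class name `PunctuateTimingSketch` and the `TreeMap` standing in for the persistent state store are my own illustrative choices, not anything from the thread:

```java
import java.util.Map;
import java.util.TreeMap;

// Minimal sketch of the per-phase timing pattern used in the punctuate
// pseudocode above. A TreeMap stands in for the state store so the
// arithmetic can be verified without a running Streams application.
public class PunctuateTimingSketch {

    // Returns {counter, processTime, storeTime, retrieveTime, totalTime}.
    public static long[] timePhases(Map<String, Long> store) {
        long startTime = System.currentTimeMillis();
        long processTime = 0;
        long storeTime = 0;
        long counter = 0;
        // Iterate over a snapshot, as the real code iterates the store
        // while writing updated values back into it.
        for (Map.Entry<String, Long> entry : new TreeMap<>(store).entrySet()) {
            long processStart = System.currentTimeMillis();
            long updated = entry.getValue() + 1;      // stand-in for computeAnalytics()
            processTime += System.currentTimeMillis() - processStart;

            long storeStart = System.currentTimeMillis();
            store.put(entry.getKey(), updated);       // stand-in for store.put()
            storeTime += System.currentTimeMillis() - storeStart;
            counter++;
        }
        long total = System.currentTimeMillis() - startTime;
        // Everything not attributed to a phase: iteration plus overhead.
        long retrieveTime = total - processTime - storeTime;
        return new long[]{counter, processTime, storeTime, retrieveTime, total};
    }

    public static void main(String[] args) {
        Map<String, Long> store = new TreeMap<>();
        for (int i = 0; i < 2500; i++) store.put("key-" + i, 0L);
        long[] m = timePhases(store);
        System.out.println("records=" + m[0] + " process=" + m[1]
                + " store=" + m[2] + " retrieve=" + m[3] + " total=" + m[4]);
    }
}
```

One caveat with this derivation: the "retrieve" figure absorbs all unattributed time (iterator advancement, deserialization, GC pauses), so a large value there does not by itself prove the store lookups are slow — which is why RocksDB's own statistics, as suggested earlier in the thread, are the better instrument.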