Hi Sophie,

Thanks for the help! I'm using version 2.3.0.
The repartition topic with the huge lag is the one created by the first
reduce(), named "XX-KSTREAM-STATE-STORE-0030-repartition". All other
internal topics have almost zero lag. In my case, how could I find out
whether RocksDB is causing the lag? One thing I noticed: when the source
traffic is around 30K/sec there is no lag anywhere in the system, but once
the traffic goes up to 100K/sec the lag becomes huge. As you mentioned, if
the memory usage is high, should I set any RocksDB memory-related configs
to higher values? I've also pasted a sketch of how I'd fold your
suggestions into my config setter at the very bottom of this mail, below
the quoted thread. Thanks a lot!

My topology is like below:

final KStream<String, byte[]> source = builder.stream(inputTopic);

KStream<String, Event> deserializedStream = source.mapValues( ... );

KStream<Windowed<String>, Event> dedupedStream = deserializedStream
    .selectKey( ... )
    .groupByKey(Grouped.with(Serdes.String(), new JsonSerde<>(Event.class)))
    .windowedBy(TimeWindows.of(Duration.ofMinutes(60)).grace(Duration.ZERO))
    .reduce((value1, value2) -> value2)
    .suppress(untilWindowCloses(Suppressed.BufferConfig.unbounded()))
    .toStream();

dedupedStream
    .selectKey( ... )
    .mapValues( ... )
    .filter( ... )
    .groupByKey(Grouped.with(Serdes.String(), new MessagetSerde()))
    .reduce((value1, value2) -> {
        long count1 = value1.getCount();
        long count2 = value2.getCount();
        value2.setCount(count1 + count2);
        return value2;
    })
    .toStream()
    .selectKey( ... )
    .to(outputTopic);

On Mon, Oct 14, 2019 at 3:53 PM Sophie Blee-Goldman <sop...@confluent.io>
wrote:
>
> Out of curiosity, which version are you using?
>
> There's nothing that really jumps out at me as problematic in your
> RocksDBConfigSetter, but note that I think you may need to increase the
> number of threads in the "LOW priority thread pool" in addition to
> setting maxBackgroundCompactions -- this can be done as
>
> options.setEnv(Env.getDefault().setBackgroundThreads(n,
> Env.COMPACTION_POOL));
>
> Is your disk throughput possibly the bottleneck? Note that if the
> repartition topic is followed by a subtopology doing heavy processing,
> this will likely show up as lag like you describe. Also, if you have a
> large number of stateful tasks (a large number of stateful operations
> and/or a large number of partitions), each one will have its own separate
> RocksDB instance, and the memory usage could be quite high (which can
> cause RocksDB to page in/out things like index blocks, which always need
> to be read before a lookup) -- I'd recommend also setting
>
> tableConfig.setPinL0FilterAndIndexBlocksInCache(true);
>
>
> On Sun, Oct 13, 2019 at 6:40 PM Xiyuan Hu <xiyuan.h...@gmail.com> wrote:
>
> > Hi,
> >
> > I'm running a Kafka Streams app with a windowing function. I noticed
> > that the internal -repartition topic has a huge lag while the system
> > CPU usage is low and the app is stable (join rate is almost 0).
> >
> > The repartition topic is an internal topic created automatically by
> > the application. The bytes in per sec for this topic is about 65MB/sec
> > while the bytes out is only about 15MB/sec. I have tried a couple of
> > configs to customize RocksDB, but none of them could increase the
> > bytes-out value.
> >
> > I changed the default RocksDB block cache size to 125MB and the block
> > size to 125MB as well, and also set the max write buffer number to 3.
> > But it didn't help.
> >
> > What am I missing here? What's the best way to find out why an
> > internal repartition topic has such a huge lag?
> >
> > Thanks for all the help!!
> >
> > My RocksDB config:
> >
> > public static class CustomRocksDBConfig implements RocksDBConfigSetter {
> >
> >     private org.rocksdb.Cache cache =
> >         new org.rocksdb.LRUCache(125 * 1024L * 1024L);
> >
> >     @Override
> >     public void setConfig(final String storeName, final Options options,
> >                           final Map<String, Object> configs) {
> >         int n = Runtime.getRuntime().availableProcessors();
> >         options.setMaxBackgroundCompactions(n);
> >         options.setWriteBufferSize(125 * 1024 * 1024);
> >         BlockBasedTableConfig tableConfig =
> >             (BlockBasedTableConfig) options.tableFormatConfig();
> >         tableConfig.setBlockCache(cache);
> >         tableConfig.setBlockCacheSize(125 * 1024 * 1024L);
> >         tableConfig.setBlockSize(125 * 1024L);
> >         tableConfig.setCacheIndexAndFilterBlocks(true);
> >         options.setTableFormatConfig(tableConfig);
> >         options.setMaxWriteBufferNumber(3);
> >     }
> >
> >     @Override
> >     public void close(final String storeName, final Options options) {
> >         // Release the cache's native memory when the store is closed.
> >         cache.close();
> >     }
> > }
> >
> > Thanks
> > Kathy
> >
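For reference, here is a rough sketch of how I'd fold your two suggestions
into my existing config setter (same 125MB cache and write buffer settings,
n = availableProcessors(), and assuming the RocksDB version bundled with
2.3.0 still exposes Env.COMPACTION_POOL). I dropped the separate
setBlockCacheSize() call since the shared cache object is passed in
directly; please tell me if any of this looks off:

import java.util.Map;

import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.Env;
import org.rocksdb.Options;

// Still a static nested class inside my streams setup class, same as before.
public static class CustomRocksDBConfig implements RocksDBConfigSetter {

    // Same shared 125MB LRU block cache as before.
    private final org.rocksdb.Cache cache =
        new org.rocksdb.LRUCache(125 * 1024L * 1024L);

    @Override
    public void setConfig(final String storeName, final Options options,
                          final Map<String, Object> configs) {
        final int n = Runtime.getRuntime().availableProcessors();

        // Allow up to n concurrent compactions AND give the LOW priority
        // (compaction) thread pool n threads, as you suggested.
        options.setMaxBackgroundCompactions(n);
        options.setEnv(Env.getDefault().setBackgroundThreads(n, Env.COMPACTION_POOL));

        options.setWriteBufferSize(125 * 1024 * 1024L);
        options.setMaxWriteBufferNumber(3);

        final BlockBasedTableConfig tableConfig =
            (BlockBasedTableConfig) options.tableFormatConfig();
        tableConfig.setBlockCache(cache);
        tableConfig.setBlockSize(125 * 1024L);
        // Keep index/filter blocks in the block cache, and pin the L0 ones
        // so they can't be evicted out from under a lookup.
        tableConfig.setCacheIndexAndFilterBlocks(true);
        tableConfig.setPinL0FilterAndIndexBlocksInCache(true);
        options.setTableFormatConfig(tableConfig);
    }

    @Override
    public void close(final String storeName, final Options options) {
        // Release the cache's native memory when the store is closed.
        cache.close();
    }
}

My understanding is that pinning the L0 filter/index blocks mainly avoids
the extra reads before each lookup when memory is tight; is that the part
you'd expect to help most at the 100K/sec traffic level?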