Hi Sophie,

Thanks for the information! After setting cache.max.bytes.buffering to zero, commenting out my customized RocksDB config, and falling back to the default RocksDB config, I do see the repartition topic's bytes-out go up. However, I noticed that some nodes hit an I/O exception: "An I/O error has occurred while writing a response message entity to the container output stream." (org.glassfish.jersey.server.internal.process.MappableException: org.apache.catalina.connector.ClientAbortException: java.io.IOException: Broken pipe). Is this also related to the RocksDB reads and writes? Is there anything I should do to get rid of this exception?
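For anyone searching the archives later, the cache change discussed here is just a Streams config property set before building the topology. A minimal sketch (the application id and bootstrap servers are placeholders, not values from my deployment):

```java
import java.util.Properties;

// Minimal sketch of the Streams config change discussed in this thread.
// "dedup-app" and "localhost:9092" are placeholders, not real deployment values.
public class StreamsCacheConfigSketch {

    public static Properties build() {
        Properties props = new Properties();
        props.put("application.id", "dedup-app");         // placeholder
        props.put("bootstrap.servers", "localhost:9092"); // placeholder
        // Turn off the Streams record cache entirely -- the workaround for
        // the 2.3.0 caching-layer performance issue mentioned below.
        props.put("cache.max.bytes.buffering", "0");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(build().getProperty("cache.max.bytes.buffering")); // prints 0
    }
}
```

(Alternatively, as mentioned below, caching can be disabled for a single store via .withCachingDisabled on the Materialized passed to that store, leaving the rest of the topology cached.)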
Thanks a lot!

On Mon, Oct 14, 2019 at 6:10 PM Sophie Blee-Goldman <sop...@confluent.io> wrote:
>
> Ah ok, 2.3.0 has a known performance issue in the caching layer which tends
> to get worse the larger the cache size. That might explain what you're
> seeing with regards to the traffic correlation.
>
> It's fixed in 2.3.1, which should be released very soon, but until then you
> might want to try turning off the Streams cache (by setting
> cache.max.bytes.buffering to zero, although you can also use
> .withCachingDisabled to turn it off only for the large/problem store).
>
> Of course, any memory you reclaim by turning off the Streams cache can just
> go towards the rocksdb cache instead; just note that the rocksdb cache comes
> from off-heap memory, while the Streams cache would be taken from heap
> memory allocated to the jvm.
>
> On Mon, Oct 14, 2019 at 2:48 PM Xiyuan Hu <xiyuan.h...@gmail.com> wrote:
>
> > Hi Sophie,
> >
> > Thanks for the help!
> > I'm using version 2.3.0.
> >
> > The repartition topic with the huge lag is the one created during the
> > first reduce, named "XX-KSTREAM-STATE-STORE-0030-repartition". All
> > other internal topics have almost zero lag. For my case, how could I
> > find out whether RocksDB causes the lag? One thing I noticed is that when
> > the source traffic is about 30K/sec, I don't have any lag in the entire
> > system, but when the traffic goes up to 100K/sec, there is a huge lag. As
> > you mentioned, if the memory usage is high, should I set any RocksDB
> > memory-related config to a higher value? Thanks a lot!
> >
> > My topology is like below:
> >
> > final KStream<String, byte[]> source = builder.stream(inputTopic);
> > KStream<String, Event> deserializedStream = source.mapValues( ... });
> >
> > KStream<Windowed<String>, Event> dedupedStream =
> > deserializedStream.selectKey( ...
> > )
> > .groupByKey(Grouped.with(Serdes.String(), new JsonSerde<>(Event.class)))
> > .windowedBy(TimeWindows.of(Duration.ofMinutes(60)).grace(Duration.ZERO))
> > .reduce((value1, value2) -> value2)
> > .suppress(untilWindowCloses(Suppressed.BufferConfig.unbounded()))
> > .toStream();
> >
> > dedupedStream.selectKey( ... )
> > .mapValues( ... )
> > .filter(...)
> > .groupByKey(Grouped.with(Serdes.String(), new MessagetSerde()))
> > .reduce((value1, value2) -> {
> >     long count1 = value1.getCount();
> >     long count2 = value2.getCount();
> >     value2.setCount(count1 + count2);
> >     return value2;
> > })
> > .toStream()
> > .selectKey( ... )
> > .to(outputTopic);
> >
> > On Mon, Oct 14, 2019 at 3:53 PM Sophie Blee-Goldman <sop...@confluent.io>
> > wrote:
> > >
> > > Out of curiosity, which version are you using?
> > >
> > > There's nothing that really jumps out at me as problematic in your
> > > RocksDBConfigSetter, but note that I think you may need to increase
> > > the number of threads in the "LOW priority thread pool" in addition to
> > > setting the maxBackgroundCompactions -- this can be done as
> > >
> > > options.setEnv(Env.getDefault().setBackgroundThreads(n, Env.COMPACTION_POOL));
> > >
> > > Is your disk throughput possibly the bottleneck? Note that if the
> > > repartition topic is followed by a subtopology doing heavy processing,
> > > this will likely show up as lag like you describe.
> > > Also, if you have a large number of stateful tasks (a large number of
> > > stateful operations, and/or a large number of partitions), each one will
> > > have its own separate rocksdb instance, and the memory usage could be
> > > quite high (which can cause rocks to page in/out things like index
> > > blocks, which always need to be read before a lookup) -- I'd recommend
> > > also setting
> > >
> > > tableConfig.setPinL0FilterAndIndexBlocksInCache(true);
> > >
> > > On Sun, Oct 13, 2019 at 6:40 PM Xiyuan Hu <xiyuan.h...@gmail.com> wrote:
> > >
> > > > Hi,
> > > >
> > > > I'm running a Kafka Streams app with a windowing function. I noticed
> > > > that the internal -repartition topic has a huge lag while the system
> > > > CPU usage is low and the app is stable (join rate is almost 0).
> > > >
> > > > The repartition topic is an internal topic created automatically by
> > > > the application. Bytes in per sec for this topic is about 65MB/sec,
> > > > while bytes out for this topic is only 15MB/sec. I have tried a couple
> > > > of configs to customize RocksDB, but none of them could increase the
> > > > bytes-out value.
> > > >
> > > > I changed the default RocksDB block cache size to 125MB and the block
> > > > size as well. I also set the max write buffer number to 3. But it
> > > > didn't help.
> > > >
> > > > May I know what I missed here? What's the best way to find out why the
> > > > internal repartition topic has huge lags?
> > > >
> > > > Thanks for all the help!!
> > > >
> > > > My RocksDB config:
> > > >
> > > > public static class CustomRocksDBConfig implements RocksDBConfigSetter {
> > > >     private org.rocksdb.Cache cache =
> > > >         new org.rocksdb.LRUCache(125 * 1024L * 1024L);
> > > >
> > > >     @Override
> > > >     public void setConfig(final String storeName, final Options options,
> > > >                           final Map<String, Object> configs) {
> > > >         int n = Runtime.getRuntime().availableProcessors();
> > > >         options.setMaxBackgroundCompactions(n);
> > > >         options.setWriteBufferSize(125 * 1024 * 1024);
> > > >         BlockBasedTableConfig tableConfig =
> > > >             (BlockBasedTableConfig) options.tableFormatConfig();
> > > >         tableConfig.setBlockCache(cache);
> > > >         tableConfig.setBlockCacheSize(125 * 1024 * 1024L);
> > > >         tableConfig.setBlockSize(125 * 1024L);
> > > >         tableConfig.setCacheIndexAndFilterBlocks(true);
> > > >         options.setTableFormatConfig(tableConfig);
> > > >         options.setMaxWriteBufferNumber(3);
> > > >     }
> > > >
> > > >     public void close(final String storeName, final Options options) {
> > > >         cache.close();
> > > >     }
> > > > }
> > > >
> > > > Thanks
> > > > Kathy
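For completeness, here is how the quoted config setter might look with Sophie's two suggestions folded in (growing the LOW-priority compaction thread pool alongside maxBackgroundCompactions, and pinning L0 filter/index blocks in cache). This is a sketch only: the sizing values are kept from the original code above, and it assumes the RocksDB Java calls quoted in the thread (setBackgroundThreads with Env.COMPACTION_POOL, setPinL0FilterAndIndexBlocksInCache) exist in the rocksdbjni version on the classpath.

```java
import java.util.Map;
import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.Env;
import org.rocksdb.Options;

public class CustomRocksDBConfig implements RocksDBConfigSetter {
    private final org.rocksdb.Cache cache =
            new org.rocksdb.LRUCache(125 * 1024L * 1024L); // 125 MB block cache

    @Override
    public void setConfig(final String storeName, final Options options,
                          final Map<String, Object> configs) {
        final int n = Runtime.getRuntime().availableProcessors();
        // Allow up to n concurrent compactions...
        options.setMaxBackgroundCompactions(n);
        // ...and grow the shared LOW-priority pool to match, per Sophie's note.
        options.setEnv(Env.getDefault().setBackgroundThreads(n, Env.COMPACTION_POOL));
        options.setWriteBufferSize(125 * 1024L * 1024L); // 125 MB memtable
        options.setMaxWriteBufferNumber(3);

        final BlockBasedTableConfig tableConfig =
                (BlockBasedTableConfig) options.tableFormatConfig();
        tableConfig.setBlockCache(cache);
        tableConfig.setBlockSize(125 * 1024L); // 125 KB data blocks
        tableConfig.setCacheIndexAndFilterBlocks(true);
        // Keep L0 filter/index blocks pinned so lookups don't page them in and out.
        tableConfig.setPinL0FilterAndIndexBlocksInCache(true);
        options.setTableFormatConfig(tableConfig);
    }

    @Override
    public void close(final String storeName, final Options options) {
        cache.close(); // release the off-heap LRU cache when the store closes
    }
}
```

Note that the original's tableConfig.setBlockCacheSize(...) call is dropped here: passing an explicit cache object via setBlockCache(...) already fixes the cache size, so setting both is redundant.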