Hi Sophie,

Thanks for the information! After setting cache.max.bytes.buffering to
zero, commenting out my customized RocksDB config, and falling back to
the default RocksDB config, I do see the repartition topic's bytes-out
go up. But I noticed that some nodes hit an I/O exception: An I/O
error has occurred while writing a response message entity to the
container output stream.
(org.glassfish.jersey.server.internal.process.MappableException:
org.apache.catalina.connector.ClientAbortException:
java.io.IOException: Broken pipe). Is it also related to the RocksDB
reads and writes? Is there anything I should do to get rid of this
exception?
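
For reference, the cache change described above amounts to something
like the following minimal sketch. (The property name also exists as
the constant StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG; a plain
string literal is used here so the snippet stands alone without the
Kafka jars.)

```java
import java.util.Properties;

public class DisableStreamsCacheSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Setting the Streams record cache to 0 bytes disables caching
        // for every state store in the topology.
        props.put("cache.max.bytes.buffering", "0");
        System.out.println(props.getProperty("cache.max.bytes.buffering")); // prints: 0
    }
}
```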

Thanks a lot!
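
As a rough sanity check on the RocksDB memory math discussed further
down the thread: with a 125 MB block cache and up to 3 write buffers of
125 MB each per store, every RocksDB instance can pin roughly 500 MB of
off-heap memory, multiplied by the number of stateful tasks on a node.
A minimal sketch (the instance count of 24 is a hypothetical example,
not a figure from the thread):

```java
public class RocksDbMemorySketch {
    public static void main(String[] args) {
        long mb = 1024L * 1024L;
        long blockCache = 125 * mb;        // block cache size from the config below
        long writeBuffers = 3 * 125 * mb;  // max 3 write buffers of 125 MB each
        long perInstance = blockCache + writeBuffers;
        int instances = 24;                // hypothetical: stateful tasks on one node
        long totalOffHeap = perInstance * instances;
        System.out.println(perInstance / mb + " MB per store, ~"
                + totalOffHeap / (1024 * mb) + " GB off-heap total");
        // prints: 500 MB per store, ~11 GB off-heap total
    }
}
```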

On Mon, Oct 14, 2019 at 6:10 PM Sophie Blee-Goldman <sop...@confluent.io> wrote:
>
> Ah ok, 2.3.0 has a known performance issue in the caching layer which
> tends to get worse the larger the cache size. That might explain what
> you're seeing with regard to the traffic correlation.
>
> It's fixed in 2.3.1, which should be released very soon, but until then
> you might want to try turning off the Streams cache (by setting
> cache.max.bytes.buffering to zero, although you can also use
> .withCachingDisabled to turn it off only for the large/problem store)
>
> Of course any memory you reclaim by turning off the Streams cache can
> just go towards the rocksdb cache instead; just note that the rocksdb
> cache comes from off-heap memory while the Streams cache is taken from
> heap memory allocated to the jvm.
>
> On Mon, Oct 14, 2019 at 2:48 PM Xiyuan Hu <xiyuan.h...@gmail.com> wrote:
>
> > Hi Sophie,
> >
> > Thanks for the help!
> > I'm using version 2.3.0.
> >
> > The repartition topic with huge lag is the one created during the first
> > reduce method, named "XX-KSTREAM-STATE-STORE-0030-repartition". All
> > other internal topics have almost zero lag. In my case, how can I
> > find out whether RocksDB causes the lag? One thing I noticed is, when
> > the source traffic is about 30K/sec, I don't have any lag in the entire
> > system, but when the traffic goes up to 100K/sec, there is a huge lag.
> > As you mentioned, if the memory usage is high, should I set any RocksDB
> > memory-related config to a higher value? Thanks a lot!
> > My topology is like below:
> >
> > final KStream<String, byte[]> source = builder.stream(inputTopic);
> > KStream<String, Event> deserializedStream = source.mapValues( ... });
> >
> > KStream<Windowed<String>, Event> dedupedStream =
> > deserializedStream.selectKey( ... )
> > .groupByKey(Grouped.with(Serdes.String(), new JsonSerde<>(Event.class)))
> > .windowedBy(TimeWindows.of(Duration.ofMinutes(60)).grace(Duration.ZERO))
> > .reduce((value1, value2) -> value2)
> > .suppress(untilWindowCloses(Suppressed.BufferConfig.unbounded()))
> > .toStream();
> >
> > dedupedStream.selectKey( ... )
> > .mapValues( ... )
> > .filter(...)
> > .groupByKey(Grouped.with(Serdes.String(), new MessagetSerde()))
> > .reduce((value1, value2) -> {
> >     long count1 = value1.getCount();
> >     long count2 = value2.getCount();
> >     value2.setCount(count1 + count2);
> >     return value2;
> > }
> > )
> > .toStream()
> > .selectKey( ... )
> > .to(outputTopic);
> >
> > On Mon, Oct 14, 2019 at 3:53 PM Sophie Blee-Goldman <sop...@confluent.io>
> > wrote:
> > >
> > > Out of curiosity, which version are you using?
> > >
> > > There's nothing that really jumps out at me as problematic in your
> > > RocksDBConfigSetter, but note that I think you may need to increase
> > > the number of threads in the "LOW priority thread pool" in addition to
> > > setting the maxBackgroundCompactions -- this can be done as
> > >
> > > options.setEnv(Env.getDefault().setBackgroundThreads(n, Env.COMPACTION_POOL));
> > >
> > > Is your disk throughput possibly the bottleneck? Note that if the
> > > repartition topic is followed by a subtopology doing heavy processing,
> > > this will likely show up as lag like you describe. Also, if you have a
> > > large number of stateful tasks (a large number of stateful operations,
> > > and/or a large number of partitions), each one will have its own
> > > separate rocksdb instance, and the memory usage could be quite high
> > > (which can cause rocks to page in/out things like index blocks, which
> > > always need to be read before a lookup) -- I'd recommend also setting
> > >
> > > tableConfig.setPinL0FilterAndIndexBlocksInCache(true);
> > >
> > >
> > > On Sun, Oct 13, 2019 at 6:40 PM Xiyuan Hu <xiyuan.h...@gmail.com> wrote:
> > >
> > > > Hi,
> > > >
> > > > I'm running a Kafka Streams app with a windowing function. I noticed
> > > > that the internal -repartition topic has huge lag while the system CPU
> > > > usage is low and the app is stable (join rate is almost 0).
> > > >
> > > > The repartition topic is an internal topic created automatically by
> > > > the application. The bytes-in per sec for this topic is about
> > > > 65MB/sec, while the bytes-out is only 15MB/sec. I have tried a couple
> > > > of configs to customize RocksDB, but none of them could increase the
> > > > bytes-out value.
> > > >
> > > > I changed the default RocksDB block cache size to 125MB and the block
> > > > size to 125KB (as in the config below). I also set the max write
> > > > buffer number to 3. But it didn't help.
> > > >
> > > > May I know what I missed here? What's the best way to find out why
> > > > the internal repartition topic has huge lags?
> > > >
> > > > Thanks for all the help!!
> > > >
> > > > My RocksDB config:
> > > > public static class CustomRocksDBConfig implements RocksDBConfigSetter {
> > > >     private org.rocksdb.Cache cache = new org.rocksdb.LRUCache(125 * 1024L * 1024L);
> > > >
> > > >     @Override
> > > >     public void setConfig(final String storeName, final Options options,
> > > >                           final Map<String, Object> configs) {
> > > >         int n = Runtime.getRuntime().availableProcessors();
> > > >         options.setMaxBackgroundCompactions(n);
> > > >         options.setWriteBufferSize(125 * 1024 * 1024);
> > > >         BlockBasedTableConfig tableConfig = (BlockBasedTableConfig)
> > > >                 options.tableFormatConfig();
> > > >         tableConfig.setBlockCache(cache);
> > > >         tableConfig.setBlockCacheSize(125 * 1024 * 1024L);
> > > >         tableConfig.setBlockSize(125 * 1024L);
> > > >         tableConfig.setCacheIndexAndFilterBlocks(true);
> > > >         options.setTableFormatConfig(tableConfig);
> > > >         options.setMaxWriteBufferNumber(3);
> > > >     }
> > > >
> > > >     @Override
> > > >     public void close(final String storeName, final Options options) {
> > > >         // See #5 below.
> > > >         cache.close();
> > > >     }
> > > > }
> > > >
> > > > Thanks
> > > > Kathy
> > > >
> >
