Hi Sophie,

Thanks for the help!
I'm using version 2.3.0.

The repartition topic with huge lag is the one created by the first
reduce(), named "XX-KSTREAM-STATE-STORE-0030-repartition". All other
internal topics have almost zero lag. In my case, how can I find out
whether RocksDB is causing the lag? One thing I noticed: when the
source traffic is about 30K/sec, there is no lag anywhere in the
system, but when traffic goes up to 100K/sec, the lag becomes huge. As
you mentioned, if memory usage is high, should I set any RocksDB
memory-related configs to a higher value? Thanks a lot!
My topology is like below:

final KStream<String, byte[]> source = builder.stream(inputTopic);
KStream<String, Event> deserializedStream = source.mapValues( ... );

KStream<Windowed<String>, Event> dedupedStream =
deserializedStream.selectKey( ... )
.groupByKey(Grouped.with(Serdes.String(), new JsonSerde<>(Event.class)))
.windowedBy(TimeWindows.of(Duration.ofMinutes(60)).grace(Duration.ZERO))
.reduce((value1, value2) -> value2)
.suppress(untilWindowCloses(Suppressed.BufferConfig.unbounded()))
.toStream();

dedupedStream.selectKey( ... )
.mapValues( ... )
.filter(...)
.groupByKey(Grouped.with(Serdes.String(), new MessagetSerde()))
.reduce((value1, value2) -> {
    long count1 = value1.getCount();
    long count2 = value2.getCount();
    value2.setCount(count1 + count2);
    return value2;
})
.toStream()
.selectKey( ... )
.to(outputTopic);
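In case it helps the discussion, this is the direction I'm considering for bounding RocksDB memory -- a sketch only, based on the shared-cache plus WriteBufferManager approach from the RocksDB Java API (available in the rocksdbjni bundled with Streams 2.3). The class name and the 512MB/128MB totals are placeholders I made up, not something we run today:

```java
import java.util.Map;
import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.Cache;
import org.rocksdb.LRUCache;
import org.rocksdb.Options;
import org.rocksdb.WriteBufferManager;

// Sketch: cap total RocksDB memory across all stores by sharing one block
// cache and charging memtable usage against it via a WriteBufferManager.
public static class BoundedMemoryConfigSetter implements RocksDBConfigSetter {
    // Shared by every RocksDB instance this Streams client creates.
    // Both limits below are illustrative placeholders.
    private static final long TOTAL_OFF_HEAP = 512 * 1024L * 1024L;
    private static final long TOTAL_MEMTABLE = 128 * 1024L * 1024L;
    private static final Cache CACHE = new LRUCache(TOTAL_OFF_HEAP);
    private static final WriteBufferManager WBM =
        new WriteBufferManager(TOTAL_MEMTABLE, CACHE);

    @Override
    public void setConfig(final String storeName, final Options options,
                          final Map<String, Object> configs) {
        // Memtables draw from the shared budget instead of growing per store.
        options.setWriteBufferManager(WBM);
        BlockBasedTableConfig tableConfig =
            (BlockBasedTableConfig) options.tableFormatConfig();
        tableConfig.setBlockCache(CACHE);
        // Keep index/filter blocks in the cache so they count against the
        // budget, and pin L0 blocks so hot lookups don't evict them.
        tableConfig.setCacheIndexAndFilterBlocks(true);
        tableConfig.setPinL0FilterAndIndexBlocksInCache(true);
        options.setTableFormatConfig(tableConfig);
    }

    @Override
    public void close(final String storeName, final Options options) {
        // The cache and manager are static and shared; closing them here
        // would break the other stores still using them.
    }
}
```

With this, every store competes for one fixed pool rather than each RocksDB instance allocating its own cache and write buffers.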

On Mon, Oct 14, 2019 at 3:53 PM Sophie Blee-Goldman <sop...@confluent.io> wrote:
>
> Out of curiosity, which version are you using?
>
> There's nothing that really jumps out at me as problematic in your
> RocksDBConfigSetter, but note that I think you may need to increase
> the number of threads in the "LOW priority thread pool" in addition to
> setting the maxBackgroundCompactions -- this can be done as
>
> options.setEnv(Env.getDefault().setBackgroundThreads(n,
> Env.COMPACTION_POOL));
>
> Is your disk throughput possibly the bottleneck? Note that if the
> repartition topic is followed by a subtopology doing heavy processing,
> this will likely show up as lag like you describe. Also, if you have a
> large number of stateful tasks (large number of stateful operations,
> and/or large number of partitions) each one will have its own separate
> rocksdb instance, and the memory usage could be quite high (which can
> cause rocks to page in/out things like index blocks which always need
> to be read before a lookup) -- I'd recommend also setting
>
> tableConfig.setPinL0FilterAndIndexBlocksInCache(true);
>
>
> On Sun, Oct 13, 2019 at 6:40 PM Xiyuan Hu <xiyuan.h...@gmail.com> wrote:
>
> > Hi,
> >
> > I'm running a Kafka Streams app with a windowing function. I noticed
> > that an internal -repartition topic has huge lag while system CPU
> > usage is low and the app is stable (join rate is almost 0).
> >
> > The repartition topic is an internal topic created automatically by
> > the application. Bytes in per sec for this topic is about 65MB/sec
> > while bytes out is only 15MB/sec. I have tried a couple of configs
> > to customize RocksDB, but none of them increased the bytes-out value.
> >
> > I changed the default RocksDB block cache size to 125MB and the
> > block size to 125KB, and also set the max write buffer number to 3.
> > But it didn't help.
> >
> > May I know what I missed here? What's the best way to find out why
> > an internal repartition topic has huge lag?
> >
> > Thanks for all the help!!
> >
> > My RocksDB config:
> > public static class CustomRocksDBConfig implements RocksDBConfigSetter {
> >     private org.rocksdb.Cache cache =
> >         new org.rocksdb.LRUCache(125 * 1024L * 1024L);
> >
> >     @Override
> >     public void setConfig(final String storeName, final Options options,
> >                           final Map<String, Object> configs) {
> >         int n = Runtime.getRuntime().availableProcessors();
> >         options.setMaxBackgroundCompactions(n);
> >         options.setWriteBufferSize(125 * 1024 * 1024);
> >         BlockBasedTableConfig tableConfig =
> >             (BlockBasedTableConfig) options.tableFormatConfig();
> >         tableConfig.setBlockCache(cache);
> >         tableConfig.setBlockCacheSize(125 * 1024 * 1024L);
> >         tableConfig.setBlockSize(125 * 1024L);
> >         tableConfig.setCacheIndexAndFilterBlocks(true);
> >         options.setTableFormatConfig(tableConfig);
> >         options.setMaxWriteBufferNumber(3);
> >     }
> >
> >     @Override
> >     public void close(final String storeName, final Options options) {
> >         cache.close();
> >     }
> > }
> >
> > Thanks
> > Kathy
> >
