hi,

@Override
public UV get(UK userKey) throws IOException, RocksDBException {
    byte[] rawKeyBytes =
        serializeCurrentKeyWithGroupAndNamespacePlusUserKey(userKey, userKeySerializer);
    byte[] rawValueBytes = backend.db.get(columnFamily, rawKeyBytes);

    return (rawValueBytes == null
        ? null
        : deserializeUserValue(dataInputView, rawValueBytes, userValueSerializer));
}

@Override
public void put(UK userKey, UV userValue) throws IOException, RocksDBException {

    byte[] rawKeyBytes =
        serializeCurrentKeyWithGroupAndNamespacePlusUserKey(userKey, userKeySerializer);
    byte[] rawValueBytes = serializeValueNullSensitive(userValue, userValueSerializer);

   backend.db.put(columnFamily, writeOptions, rawKeyBytes, rawValueBytes);
}

Tracing through the source code, I found that RocksDBMapState has to serialize and deserialize on every get and put, which is probably why these calls are slow.
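
To make the cost concrete, here is a minimal sketch (not Flink code). It assumes a map state whose value is a whole list that is deserialized on every get and serialized again on every put, the way bufferEvent reads and writes elementQueueState. The class name SerializedMapStateSketch, the byte counters, and the use of Integer as the event type are made up for illustration only.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a byte-oriented map state; this is NOT RocksDBMapState.
class SerializedMapStateSketch {
    // Values are kept only as bytes, as a byte-oriented store would keep them.
    private final Map<Long, byte[]> store = new HashMap<>();
    long bytesWritten = 0; // total bytes serialized by put
    long bytesRead = 0;    // total bytes deserialized by get

    // Every get deserializes the complete list stored under the key.
    List<Integer> get(long key) {
        byte[] raw = store.get(key);
        if (raw == null) {
            return null;
        }
        bytesRead += raw.length;
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(raw))) {
            int size = in.readInt();
            List<Integer> result = new ArrayList<>(size);
            for (int i = 0; i < size; i++) {
                result.add(in.readInt());
            }
            return result;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Every put serializes the complete list again, even if only one element is new.
    void put(long key, List<Integer> value) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(buffer)) {
            out.writeInt(value.size());
            for (int v : value) {
                out.writeInt(v);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        byte[] raw = buffer.toByteArray();
        bytesWritten += raw.length;
        store.put(key, raw);
    }

    // Same read-modify-write shape as bufferEvent in the quoted mail below.
    void bufferEvent(long timestamp, int event) {
        List<Integer> elements = get(timestamp);
        if (elements == null) {
            elements = new ArrayList<>();
        }
        elements.add(event);
        put(timestamp, elements);
    }
}
```

In this sketch, buffering k events under the same timestamp serializes lists of size 1, 2, ..., k, so the bytes written grow roughly quadratically in k. Whether that is the dominant cost in the real job depends on how many events share a timestamp and on the event serializer, so treat it as an illustration rather than a diagnosis.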

Peihui He <[email protected]> wrote on Thu, Nov 5, 2020 at 11:05 AM:

> hi
>
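> I am using Flink 1.11.2 CEP for some pattern matching. As soon as I enable RocksDB as the state backend, backpressure appears and CPU usage is 10 times what it was before.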
>
> private void bufferEvent(IN event, long currentTime) throws Exception {
>     long currentTs = System.currentTimeMillis();
>     List<IN> elementsForTimestamp = elementQueueState.get(currentTime);
>     if (elementsForTimestamp == null) {
>         this.bufferEventGetNullhistogram.update(System.currentTimeMillis() - currentTs);
>         elementsForTimestamp = new ArrayList<>();
>     } else {
>         this.bufferEventGethistogram.update(System.currentTimeMillis() - currentTs);
>     }
>     elementsForTimestamp.add(event);
>     long secondCurrentTs = System.currentTimeMillis();
>     elementQueueState.put(currentTime, elementsForTimestamp);
>     this.bufferEventPuthistogram.update(System.currentTimeMillis() - secondCurrentTs);
>     this.bufferEventhistogram.update(System.currentTimeMillis() - currentTs);
> }
>
> I overrode CepOperator and added some metrics:
>
> this.bufferEventhistogram = metrics.histogram("buffer_event_delay", new
>         DescriptiveStatisticsHistogram(1000));
> this.bufferEventGethistogram = metrics.histogram("buffer_event_get_delay", new
>         DescriptiveStatisticsHistogram(1000));
> this.bufferEventGetNullhistogram = metrics.histogram("buffer_event_get_null_delay", new
>         DescriptiveStatisticsHistogram(1000));
> this.bufferEventPuthistogram = metrics.histogram("buffer_event_put_delay", new
>         DescriptiveStatisticsHistogram(1000));
>
> These show that get and put are slow; a whole bufferEvent call can reach 200 ms.
> Judging by the RocksDB metrics, there are not many flushes or compactions.
>
> [image: image.png]
> [image: image.png]
>
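> I also tried the tuning described in https://www.jianshu.com/p/2e61c2c83c57 , but it did not help much and the backpressure remained.
> I also looked at a similar problem, http://apache-flink.147419.n8.nabble.com/rocksDB-td1785.html , but my SST files are small.
> Could anyone explain why get and put are so slow, and is there a good way to optimize this? Thanks.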
>
> Best Wishes.
>
