Thanks Yun!
I used this option, and it helped a lot:
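import java.util
import org.apache.flink.contrib.streaming.state.{DefaultConfigurableOptionsFactory, RocksDBStateBackend}
import org.rocksdb.ColumnFamilyOptions

val be = new RocksDBStateBackend("file:///tmp")

class MyConfig extends DefaultConfigurableOptionsFactory {
  override def createColumnOptions(
      currentOptions: ColumnFamilyOptions,
      handlesToClose: util.Collection[AutoCloseable]): ColumnFamilyOptions = {
    // Apply the defaults first, then tune for point lookups (argument is the block cache size in MB).
    super.createColumnOptions(currentOptions, handlesToClose)
      .optimizeForPointLookup(2000)
  }
}

be.setRocksDBOptions(new MyConfig)
be.getMemoryConfiguration.setUseManagedMemory(false)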
But now I can't use the RocksDBSharedResources, because
setCacheIndexAndFilterBlocks seems to stop the hash index from working
properly, and the performance is bad again.
It only works when I use be.getMemoryConfiguration.setUseManagedMemory(false)
and skip setCacheIndexAndFilterBlocks :(
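
Regarding the original observation that moving the random part to the front of the key fixes throughput: one possible contributor (an assumption, not verified against RocksDB internals) is that byte-wise key comparisons have to scan the whole shared part before they reach the first differing character. The plain-Scala sketch below only illustrates that effect. `SharedPrefixCost`, `charsInspected`, and the sample strings are hypothetical stand-ins for `randStr` and `r.nextString(8)`, and it measures nothing inside Flink or RocksDB.

```scala
object SharedPrefixCost {
  /** Number of characters a lexicographic comparison inspects before it can decide. */
  def charsInspected(a: String, b: String): Int = {
    val limit = math.min(a.length, b.length)
    var i = 0
    // Skip over the part that both keys have in common.
    while (i < limit && a.charAt(i) == b.charAt(i)) i += 1
    // The first differing position is inspected too, unless one key ran out first.
    math.min(i + 1, limit)
  }

  def main(args: Array[String]): Unit = {
    val common = "x" * 4992                  // stands in for randStr
    val (s1, s2) = ("aaaaaaaa", "baaaaaaa")  // stand in for two r.nextString(8) values

    // Shared part first: the comparison walks through all 4992 common characters.
    println(charsInspected(common + s1, common + s2)) // 4993
    // Random part first: the comparison is decided at the first character.
    println(charsInspected(s1 + common, s2 + common)) // 1
  }
}
```

If this is indeed part of the problem, it would be consistent with the reordered key being fast even without the hash index, but that remains a guess.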
On Fri, Sep 25, 2020 at 9:56 AM Yun Tang <[email protected]> wrote:
> Hi
>
> If you want to improve the performance of point lookups, you could try
> using an additional hash index. This feature requires a prefix extractor;
> however, the original interface is not exposed directly in the Java API.
>
> You could try calling
> columnFamilyOptions.optimizeForPointLookup(blockCacheSizeMb), which
> uses the NoopTransform prefix extractor by default [1].
> Please also consider using this feature after Flink-1.10.2 due to an
> internal RocksDB bug [2].
>
> [1]
> https://github.com/dataArtisans/frocksdb/blob/c724d41fab7f9f09f9676dfccc6d210a191da4d6/options/options.cc#L477
> [2] https://issues.apache.org/jira/browse/FLINK-17800
>
> Best
> Yun Tang
>
>
> ------------------------------
> *From:* ירון שני <[email protected]>
> *Sent:* Wednesday, September 23, 2020 23:56
> *To:* [email protected] <[email protected]>
> *Subject:* Poor performance with large keys using RocksDB and MapState
>
> Hello,
> I have a poor throughput issue, and I think I managed to reproduce it
> using the following code:
>
> val conf = new Configuration()
> conf.set(TaskManagerOptions.MANAGED_MEMORY_SIZE, MemorySize.ofMebiBytes(6 * 1000))
> conf.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.ofMebiBytes(8 * 1000))
> conf.set(TaskManagerOptions.NETWORK_MEMORY_MIN, MemorySize.ofMebiBytes(256))
> conf.set(RocksDBConfigurableOptions.BLOCK_SIZE, new MemorySize(8 * 1000))
>
> val be = new RocksDBStateBackend("file:///tmp")
> val env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf)
>   .setStateBackend(be)
>
> env.setParallelism(3)
> env.getConfig.enableObjectReuse()
>
> val r = new scala.util.Random(31)
> val randStr = r.nextString(4992)
> val s = env.fromElements(1).process((value: Int,
>     ctx: _root_.org.apache.flink.streaming.api.functions.ProcessFunction[Int, _root_.scala.Predef.String]#Context,
>     out: _root_.org.apache.flink.util.Collector[_root_.scala.Predef.String]) => {
>   for (a <- 1 to 1000 * 1000 * 10) {
>     out.collect(randStr + r.nextString(8))
>   }
> }).keyBy(a => a).process(new ProcessFunction[String, String] {
>   private var someState: MapState[String, String] = _
>
>   override def open(parameters: Configuration): Unit = {
>     someState = getRuntimeContext.getMapState(
>       new MapStateDescriptor[String, String]("someState",
>         createTypeInformation[String], createTypeInformation[String])
>     )
>   }
>
>   override def processElement(value: _root_.scala.Predef.String,
>       ctx: _root_.org.apache.flink.streaming.api.functions.ProcessFunction[_root_.scala.Predef.String, _root_.scala.Predef.String]#Context,
>       out: _root_.org.apache.flink.util.Collector[_root_.scala.Predef.String]): Unit = {
>     if (!someState.contains(value)) {
>       someState.put(value, value)
>     }
>   }
> })
>
> env.execute()
>
> This has really poor throughput.
> Now, changing
> out.collect(randStr + r.nextString(8))
>
> to
> out.collect(r.nextString(8) + randStr)
> solves the issue.
> Is there an easy way to fix this?
> I tried to use a hash index, but it requires a RocksDB option called "prefix
> extractor", which I don't know how to fill in yet, and I have no idea whether
> it would fix the problem.
> If anyone has encountered this before, I could really use some advice/help.
> Thanks!