Hey Yu,
1. Memory and other configuration
There's not much configuration going on, it's all in the Java class WordCount.
Specifically, memory-related there's this one:
rocksDBStateBackend.setPredefinedOptions(PredefinedOptions.FLASH_SSD_OPTIMIZED);
I quickly tried commenting out that line; it doesn't seem to change anything.
2. I'm not sure what I should look for.
For 'taskmanager.memory.managed.fraction' I tried
configuration.setDouble("taskmanager.memory.managed.fraction", 0.8);
But using a debugger, I don't see that setting being used. Maybe it's not used
in StreamExecutionEnvironment.createLocalEnvironment?
3. There are no timers, so I don't think setting this parameter matters. Anyway, I
tried this:
configuration.setString(RocksDBOptions.TIMER_SERVICE_FACTORY, "HEAP");
final StreamExecutionEnvironment env =
StreamExecutionEnvironment.createLocalEnvironment(PARALLELISM, configuration);
No changes in the performance (tried with parallelism 5 and without managed
memory).
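As a rough illustration of why parallelism could matter here: assuming the local environment gives the single TaskManager a fixed managed memory budget and that budget is split evenly across slots, the per-slot share available to RocksDB shrinks as parallelism grows. The 128 MiB budget, the class name and the method name below are assumptions made up for this sketch, not values read from this setup.

```java
// Sketch only: the even split across slots and the 128 MiB budget are assumptions.
final class ManagedMemoryShare {

    /** Returns the managed memory share of one slot, assuming an even split. */
    static long perSlotBytes(long totalManagedBytes, int slots) {
        if (slots <= 0) {
            throw new IllegalArgumentException("slots must be positive");
        }
        return totalManagedBytes / slots;
    }

    public static void main(String[] args) {
        long assumedBudget = 128L * 1024 * 1024; // hypothetical budget, not measured
        for (int parallelism = 3; parallelism <= 5; parallelism++) {
            long share = perSlotBytes(assumedBudget, parallelism);
            System.out.println("parallelism " + parallelism + ": "
                + share / (1024 * 1024) + " MiB per slot");
        }
    }
}
```

If the budget really is fixed like this, raising the fraction would not help in a local run, and only an absolute size setting would change the per-slot share.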
Regards,
Juha
________________________________
From: Yu Li <[email protected]>
Sent: Thursday, June 25, 2020 12:20 PM
To: Andrey Zagrebin <[email protected]>
Cc: Juha Mynttinen <[email protected]>; Yun Tang <[email protected]>; user
<[email protected]>
Subject: Re: Performance issue associated with managed RocksDB memory
Thanks for the ping Andrey.
Hi Juha,
Thanks for reporting the issue. I'd like to check the below things before
further digging into it:
1. Could you let us know your configurations (especially memory related ones)
when running the tests?
2. Did you watch the memory consumption before / after turning
`state.backend.rocksdb.memory.managed` off? If not, could you check it out and
let us know the result?
2.1 Furthermore, if the memory consumption is much higher when turning
managed memory off, could you try tuning up the managed memory fraction
accordingly through `taskmanager.memory.managed.fraction` [1] and check the
result?
3. With `state.backend.rocksdb.memory.managed` on and nothing else changed,
could you try to set `state.backend.rocksdb.timer-service.factory` to `HEAP`
and check out the result? (side note: starting from 1.10.0 release timers are
stored in RocksDB by default when using RocksDBStateBackend [2])
What's more, you may find these documents [3] [4] useful for tuning the memory of
the RocksDB backend.
Thanks.
Best Regards,
Yu
[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/config.html#taskmanager-memory-managed-fraction
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/release-notes/flink-1.10.html#state
[3]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/state/large_state_tuning.html#tuning-rocksdb-memory
[4]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/state/state_backends.html#memory-management
On Thu, 25 Jun 2020 at 15:37, Andrey Zagrebin
<[email protected]> wrote:
Hi Juha,
Thanks for sharing the testing program to expose the problem.
This indeed looks suboptimal if X does not leave space for the window operator.
I am adding Yu and Yun who might have a better idea about what could be
improved about sharing the RocksDB memory among operators.
Best,
Andrey
On Thu, Jun 25, 2020 at 9:10 AM Juha Mynttinen
<[email protected]> wrote:
Hey,
Here's a simple test. It's basically the WordCount example from Flink, but
using RocksDB as the state backend and having a stateful operator. The
javadocs explain how to use it.
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.examples.wordcount;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.MultipleParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.contrib.streaming.state.PredefinedOptions;
import org.apache.flink.contrib.streaming.state.RocksDBOptions;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Collector;
import java.nio.file.Files;
import java.nio.file.Path;
/**
* Works fast in the following cases.
* <ul>
* <li>{@link #USE_MANAGED_MEMORY} is {@code false}</li>
* <li>{@link #USE_MANAGED_MEMORY} is {@code true} and {@link #PARALLELISM} is 1 to 4.</li>
* </ul>
* <p>
* Some results:
* </p>
* <ul>
* <li>USE_MANAGED_MEMORY false parallelism 3: 3088 ms</li>
* <li>USE_MANAGED_MEMORY false parallelism 4: 2971 ms</li>
* <li>USE_MANAGED_MEMORY false parallelism 5: 2994 ms</li>
* <li>USE_MANAGED_MEMORY true parallelism 3: 4337 ms</li>
* <li>USE_MANAGED_MEMORY true parallelism 4: 2808 ms</li>
* <li>USE_MANAGED_MEMORY true parallelism 5: 126050 ms</li>
* </ul>
* <p>
*/
public class WordCount {
/**
* The parallelism of the job.
*/
private static final int PARALLELISM = 5;
/**
* Whether to use managed memory. If true, the config is left unchanged.
* If false, managed memory is disabled for RocksDB.
*/
private static final boolean USE_MANAGED_MEMORY = true;
/**
* The source synthesizes this many events.
*/
public static final int EVENT_COUNT = 1_000_000;
/**
* The value of each event is its sequence number modulo {@code MAX_VALUE}.
* Essentially controls the count of unique keys.
*/
public static final int MAX_VALUE = 1_000;
// *************************************************************************
// PROGRAM
// *************************************************************************
public static void main(String[] args) throws Exception {
// Checking input parameters
final MultipleParameterTool params =
MultipleParameterTool.fromArgs(args);
// set up the execution environment
Configuration configuration = new Configuration();
if (!USE_MANAGED_MEMORY) {
configuration.setBoolean(RocksDBOptions.USE_MANAGED_MEMORY,
USE_MANAGED_MEMORY);
}
final StreamExecutionEnvironment env =
StreamExecutionEnvironment.createLocalEnvironment(PARALLELISM,
configuration);
Path tempDirPath = Files.createTempDirectory("example");
String checkpointDataUri = "file://" + tempDirPath.toString();
RocksDBStateBackend rocksDBStateBackend = new
RocksDBStateBackend(checkpointDataUri, true);
rocksDBStateBackend.setPredefinedOptions(PredefinedOptions.FLASH_SSD_OPTIMIZED);
env.setStateBackend((StateBackend) rocksDBStateBackend);
// make parameters available in the web interface
env.getConfig().setGlobalJobParameters(params);
// get input data
DataStream<Long> text = env.addSource(new ExampleCountSource());
text.keyBy(v -> v)
.flatMap(new ValuesCounter())
.addSink(new DiscardingSink<>());
long before = System.currentTimeMillis();
env.execute("Streaming WordCount");
long duration = System.currentTimeMillis() - before;
System.out.println("Done " + duration + " ms, parallelism " +
PARALLELISM);
}
// *************************************************************************
// USER FUNCTIONS
// *************************************************************************
private static class ValuesCounter extends RichFlatMapFunction<Long,
Tuple2<Long, Long>> {
private ValueState<Long> state;
@Override
public void flatMap(Long value, Collector<Tuple2<Long, Long>>
out) throws Exception {
Long oldCount = state.value();
if (oldCount == null) {
oldCount = 0L;
}
long newCount = oldCount + 1;
state.update(newCount);
out.collect(Tuple2.of(value, newCount));
}
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
ValueStateDescriptor<Long> descriptor = new
ValueStateDescriptor<>("valueState", BasicTypeInfo.LONG_TYPE_INFO);
state = getRuntimeContext().getState(descriptor);
}
}
public static class ExampleCountSource implements SourceFunction<Long>,
CheckpointedFunction {
private long count = 0L;
private volatile boolean isRunning = true;
private transient ListState<Long> checkpointedCount;
public void run(SourceContext<Long> ctx) {
while (isRunning && count < EVENT_COUNT) {
// this synchronized block ensures that state checkpointing,
// internal state updates and emission of elements are an
// atomic operation
synchronized (ctx.getCheckpointLock()) {
ctx.collect(count % MAX_VALUE);
count++;
}
}
}
public void cancel() {
isRunning = false;
}
public void initializeState(FunctionInitializationContext context)
throws Exception {
this.checkpointedCount = context
.getOperatorStateStore()
.getListState(new ListStateDescriptor<>("count",
Long.class));
if (context.isRestored()) {
for (Long count : this.checkpointedCount.get()) {
this.count = count;
}
}
}
public void snapshotState(FunctionSnapshotContext context) throws
Exception {
this.checkpointedCount.clear();
this.checkpointedCount.add(count);
}
}
}
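For comparison, the same counting logic can be sketched without Flink or RocksDB, which gives a feel for how small the workload is: with the constants above, every one of the 1,000 keys is updated 1,000 times. This is only a plain-Java approximation of what ValuesCounter computes; the class and method names are made up for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java sketch of the job's counting logic; no Flink or RocksDB involved.
final class PlainCount {

    /** Counts how often each value (sequence number modulo maxValue) occurs. */
    static Map<Long, Long> countValues(int eventCount, int maxValue) {
        Map<Long, Long> counts = new HashMap<>();
        for (long i = 0; i < eventCount; i++) {
            // Same key derivation as ExampleCountSource: count % MAX_VALUE.
            counts.merge(i % maxValue, 1L, Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        long before = System.currentTimeMillis();
        Map<Long, Long> counts = countValues(1_000_000, 1_000);
        long duration = System.currentTimeMillis() - before;
        System.out.println("Keys: " + counts.size()
            + ", updates per key: " + counts.get(0L)
            + ", took " + duration + " ms");
    }
}
```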
Regards,
Juha
--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/