This is an automated email from the ASF dual-hosted git repository. azagrebin pushed a commit to branch release-1.8 in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.8 by this push: new 60d9b96 [FLINK-11193][State] Fix RocksDB timer service factory configuration option to be settable per job 60d9b96 is described below commit 60d9b96456f142f8d18d5882016840a00159403e Author: Aitozi <1059789...@qq.com> AuthorDate: Wed Sep 4 22:59:09 2019 +0800 [FLINK-11193][State] Fix RocksDB timer service factory configuration option to be settable per job This closes #8479. --- .../streaming/state/RocksDBStateBackend.java | 32 +++++++++++++++----- .../state/RocksDBStateBackendConfigTest.java | 34 ++++++++++++++++++++++ 2 files changed, 58 insertions(+), 8 deletions(-) diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java index c5604e5..cfac3b2 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java @@ -147,7 +147,8 @@ public class RocksDBStateBackend extends AbstractStateBackend implements Configu private TernaryBoolean enableTtlCompactionFilter; /** This determines the type of priority queue state. */ - private final PriorityQueueStateType priorityQueueStateType; + @Nullable + private PriorityQueueStateType priorityQueueStateType; /** The default rocksdb metrics options. 
*/ private final RocksDBNativeMetricOptions defaultMetricOptions; @@ -265,8 +266,6 @@ public class RocksDBStateBackend extends AbstractStateBackend implements Configu this.checkpointStreamBackend = checkNotNull(checkpointStreamBackend); this.enableIncrementalCheckpointing = enableIncrementalCheckpointing; this.numberOfTransferingThreads = UNDEFINED_NUMBER_OF_TRANSFERING_THREADS; - // for now, we use still the heap-based implementation as default - this.priorityQueueStateType = PriorityQueueStateType.HEAP; this.defaultMetricOptions = new RocksDBNativeMetricOptions(); this.enableTtlCompactionFilter = TernaryBoolean.UNDEFINED; } @@ -314,10 +313,11 @@ public class RocksDBStateBackend extends AbstractStateBackend implements Configu this.enableTtlCompactionFilter = original.enableTtlCompactionFilter .resolveUndefined(config.getBoolean(TTL_COMPACT_FILTER_ENABLED)); - final String priorityQueueTypeString = config.getString(TIMER_SERVICE_FACTORY); - - this.priorityQueueStateType = priorityQueueTypeString.length() > 0 ? - PriorityQueueStateType.valueOf(priorityQueueTypeString.toUpperCase()) : original.priorityQueueStateType; + if (null == original.priorityQueueStateType) { + this.priorityQueueStateType = config.getEnum(PriorityQueueStateType.class, TIMER_SERVICE_FACTORY); + } else { + this.priorityQueueStateType = original.priorityQueueStateType; + } // configure local directories if (original.localRocksDbDirectories != null) { @@ -507,7 +507,7 @@ public class RocksDBStateBackend extends AbstractStateBackend implements Configu keyGroupRange, executionConfig, localRecoveryConfig, - priorityQueueStateType, + getPriorityQueueStateType(), ttlTimeProvider, metricGroup, stateHandles, @@ -716,6 +716,22 @@ public class RocksDBStateBackend extends AbstractStateBackend implements Configu enableTtlCompactionFilter = TernaryBoolean.TRUE; } + /** + * Gets the type of the priority queue state. It will fallback to the default value, if it is not explicitly set. 
+ * @return The type of the priority queue state. + */ + public PriorityQueueStateType getPriorityQueueStateType() { + return priorityQueueStateType == null ? + PriorityQueueStateType.valueOf(TIMER_SERVICE_FACTORY.defaultValue()) : priorityQueueStateType; + } + + /** + * Sets the type of the priority queue state. It will fallback to the default value, if it is not explicitly set. + */ + public void setPriorityQueueStateType(PriorityQueueStateType priorityQueueStateType) { + this.priorityQueueStateType = checkNotNull(priorityQueueStateType); + } + // ------------------------------------------------------------------------ // Parametrize with RocksDB Options // ------------------------------------------------------------------------ diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java index c00ffdb..d4ef2e3 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java @@ -32,6 +32,8 @@ import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.io.disk.iomanager.IOManager; import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.operators.testutils.MockEnvironment; +import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder; import org.apache.flink.runtime.query.KvStateRegistry; import org.apache.flink.runtime.state.AbstractKeyedStateBackend; import org.apache.flink.runtime.state.KeyGroupRange; @@ -60,6 +62,7 @@ import java.io.IOException; import java.util.Collections; 
import static org.hamcrest.CoreMatchers.anyOf; +import static org.hamcrest.CoreMatchers.instanceOf; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.startsWith; import static org.junit.Assert.assertArrayEquals; @@ -172,6 +175,37 @@ public class RocksDBStateBackendConfigTest { keyedBackend.dispose(); } + /** + * Validates that user custom configuration from code should override the flink-conf.yaml. + */ + @Test + public void testConfigureTimerServiceLoadingFromApplication() throws Exception { + final MockEnvironment env = new MockEnvironmentBuilder().build(); + + // priorityQueueStateType of the job backend + final RocksDBStateBackend backend = new RocksDBStateBackend(tempFolder.newFolder().toURI().toString()); + backend.setPriorityQueueStateType(RocksDBStateBackend.PriorityQueueStateType.HEAP); + + // priorityQueueStateType in the cluster config + final Configuration configFromConfFile = new Configuration(); + configFromConfFile.setString( + RocksDBOptions.TIMER_SERVICE_FACTORY, + RocksDBStateBackend.PriorityQueueStateType.ROCKSDB.toString()); + + // configure final backend from job and cluster config + final RocksDBStateBackend configuredRocksDBStateBackend = backend.configure( + configFromConfFile, + Thread.currentThread().getContextClassLoader()); + final RocksDBKeyedStateBackend<Integer> keyedBackend = createKeyedStateBackend(configuredRocksDBStateBackend, env); + + // priorityQueueStateType of the job backend should be preserved + assertThat(keyedBackend.getPriorityQueueFactory(), instanceOf(HeapPriorityQueueSetFactory.class)); + + keyedBackend.close(); + keyedBackend.dispose(); + env.close(); + } + @Test public void testStoragePathWithFilePrefix() throws Exception { final File folder = tempFolder.newFolder();