This is an automated email from the ASF dual-hosted git repository.

azagrebin pushed a commit to branch release-1.8
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.8 by this push:
     new 60d9b96  [FLINK-11193][State] Fix Rockdb timer service factory 
configuration option to be settable per job
60d9b96 is described below

commit 60d9b96456f142f8d18d5882016840a00159403e
Author: Aitozi <1059789...@qq.com>
AuthorDate: Wed Sep 4 22:59:09 2019 +0800

    [FLINK-11193][State] Fix Rockdb timer service factory configuration option 
to be settable per job
    
    This closes #8479.
---
 .../streaming/state/RocksDBStateBackend.java       | 32 +++++++++++++++-----
 .../state/RocksDBStateBackendConfigTest.java       | 34 ++++++++++++++++++++++
 2 files changed, 58 insertions(+), 8 deletions(-)

diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
index c5604e5..cfac3b2 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
@@ -147,7 +147,8 @@ public class RocksDBStateBackend extends 
AbstractStateBackend implements Configu
        private TernaryBoolean enableTtlCompactionFilter;
 
        /** This determines the type of priority queue state. */
-       private final PriorityQueueStateType priorityQueueStateType;
+       @Nullable
+       private PriorityQueueStateType priorityQueueStateType;
 
        /** The default rocksdb metrics options. */
        private final RocksDBNativeMetricOptions defaultMetricOptions;
@@ -265,8 +266,6 @@ public class RocksDBStateBackend extends 
AbstractStateBackend implements Configu
                this.checkpointStreamBackend = 
checkNotNull(checkpointStreamBackend);
                this.enableIncrementalCheckpointing = 
enableIncrementalCheckpointing;
                this.numberOfTransferingThreads = 
UNDEFINED_NUMBER_OF_TRANSFERING_THREADS;
-               // for now, we use still the heap-based implementation as 
default
-               this.priorityQueueStateType = PriorityQueueStateType.HEAP;
                this.defaultMetricOptions = new RocksDBNativeMetricOptions();
                this.enableTtlCompactionFilter = TernaryBoolean.UNDEFINED;
        }
@@ -314,10 +313,11 @@ public class RocksDBStateBackend extends 
AbstractStateBackend implements Configu
                this.enableTtlCompactionFilter = 
original.enableTtlCompactionFilter
                        
.resolveUndefined(config.getBoolean(TTL_COMPACT_FILTER_ENABLED));
 
-               final String priorityQueueTypeString = 
config.getString(TIMER_SERVICE_FACTORY);
-
-               this.priorityQueueStateType = priorityQueueTypeString.length() 
> 0 ?
-                       
PriorityQueueStateType.valueOf(priorityQueueTypeString.toUpperCase()) : 
original.priorityQueueStateType;
+               if (null == original.priorityQueueStateType) {
+                       this.priorityQueueStateType = 
config.getEnum(PriorityQueueStateType.class, TIMER_SERVICE_FACTORY);
+               } else {
+                       this.priorityQueueStateType = 
original.priorityQueueStateType;
+               }
 
                // configure local directories
                if (original.localRocksDbDirectories != null) {
@@ -507,7 +507,7 @@ public class RocksDBStateBackend extends 
AbstractStateBackend implements Configu
                        keyGroupRange,
                        executionConfig,
                        localRecoveryConfig,
-                       priorityQueueStateType,
+                       getPriorityQueueStateType(),
                        ttlTimeProvider,
                        metricGroup,
                        stateHandles,
@@ -716,6 +716,22 @@ public class RocksDBStateBackend extends 
AbstractStateBackend implements Configu
                enableTtlCompactionFilter = TernaryBoolean.TRUE;
        }
 
+       /**
+        * Gets the type of the priority queue state. It will fallback to the 
default value, if it is not explicitly set.
+        * @return The type of the priority queue state.
+        */
+       public PriorityQueueStateType getPriorityQueueStateType() {
+               return priorityQueueStateType == null ?
+                       
PriorityQueueStateType.valueOf(TIMER_SERVICE_FACTORY.defaultValue()) : 
priorityQueueStateType;
+       }
+
+       /**
+        * Sets the type of the priority queue state. It will fallback to the 
default value, if it is not explicitly set.
+        */
+       public void setPriorityQueueStateType(PriorityQueueStateType 
priorityQueueStateType) {
+               this.priorityQueueStateType = 
checkNotNull(priorityQueueStateType);
+       }
+
        // 
------------------------------------------------------------------------
        //  Parametrize with RocksDB Options
        // 
------------------------------------------------------------------------
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
index c00ffdb..d4ef2e3 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
@@ -32,6 +32,8 @@ import 
org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.operators.testutils.MockEnvironment;
+import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
 import org.apache.flink.runtime.query.KvStateRegistry;
 import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
 import org.apache.flink.runtime.state.KeyGroupRange;
@@ -60,6 +62,7 @@ import java.io.IOException;
 import java.util.Collections;
 
 import static org.hamcrest.CoreMatchers.anyOf;
+import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.startsWith;
 import static org.junit.Assert.assertArrayEquals;
@@ -172,6 +175,37 @@ public class RocksDBStateBackendConfigTest {
                keyedBackend.dispose();
        }
 
+       /**
+        * Validates that user custom configuration from code should override 
the flink-conf.yaml.
+        */
+       @Test
+       public void testConfigureTimerServiceLoadingFromApplication() throws 
Exception {
+               final MockEnvironment env = new 
MockEnvironmentBuilder().build();
+
+               // priorityQueueStateType of the job backend
+               final RocksDBStateBackend backend = new 
RocksDBStateBackend(tempFolder.newFolder().toURI().toString());
+               
backend.setPriorityQueueStateType(RocksDBStateBackend.PriorityQueueStateType.HEAP);
+
+               // priorityQueueStateType in the cluster config
+               final Configuration configFromConfFile = new Configuration();
+               configFromConfFile.setString(
+                       RocksDBOptions.TIMER_SERVICE_FACTORY,
+                       
RocksDBStateBackend.PriorityQueueStateType.ROCKSDB.toString());
+
+               // configure final backend from job and cluster config
+               final RocksDBStateBackend configuredRocksDBStateBackend = 
backend.configure(
+                       configFromConfFile,
+                       Thread.currentThread().getContextClassLoader());
+               final RocksDBKeyedStateBackend<Integer> keyedBackend = 
createKeyedStateBackend(configuredRocksDBStateBackend, env);
+
+               // priorityQueueStateType of the job backend should be preserved
+               assertThat(keyedBackend.getPriorityQueueFactory(), 
instanceOf(HeapPriorityQueueSetFactory.class));
+
+               keyedBackend.close();
+               keyedBackend.dispose();
+               env.close();
+       }
+
        @Test
        public void testStoragePathWithFilePrefix() throws Exception {
                final File folder = tempFolder.newFolder();

Reply via email to