[GitHub] [flink] azagrebin commented on a change in pull request #8479: [FLINK-11193][State Backends]Use user passed configuration overriding default configuration loading from file
azagrebin commented on a change in pull request #8479: [FLINK-11193][State Backends]Use user passed configuration overriding default configuration loading from file
URL: https://github.com/apache/flink/pull/8479#discussion_r323107972

## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java ##
@@ -507,7 +507,7 @@ public CheckpointStorage createCheckpointStorage(JobID jobId) throws IOException
             keyGroupRange,
             executionConfig,
             localRecoveryConfig,
-            priorityQueueStateType,
+            getPriorityQueueStateType(priorityQueueStateType),

Review comment:
`priorityQueueStateType` is a class field. Do we actually need to pass it to `getPriorityQueueStateType` if it is already available there?

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org

With regards,
Apache Git Services
azagrebin commented on a change in pull request #8479: [FLINK-11193][State Backends]Use user passed configuration overriding default configuration loading from file
URL: https://github.com/apache/flink/pull/8479#discussion_r323107474

## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java ##
@@ -716,6 +716,16 @@ public void enableTtlCompactionFilter() {
         enableTtlCompactionFilter = TernaryBoolean.TRUE;
     }
+    /**
+     * Gets the PriorityQueueStateType. It will fallback to the default value, if it is not explicitly set.

Review comment:
nit: `Gets the type of the priority queue state. It will ..`
azagrebin commented on a change in pull request #8479: [FLINK-11193][State Backends]Use user passed configuration overriding default configuration loading from file
URL: https://github.com/apache/flink/pull/8479#discussion_r323108356

## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java ##
@@ -716,6 +716,16 @@ public void enableTtlCompactionFilter() {
         enableTtlCompactionFilter = TernaryBoolean.TRUE;
     }
+    /**
+     * Gets the PriorityQueueStateType. It will fallback to the default value, if it is not explicitly set.
+     * @param origin
+     * @return

Review comment:
I think an empty `@return` doc does not add any value. I would either remove it or add: `the type of the priority queue state`
azagrebin commented on a change in pull request #8479: [FLINK-11193][State Backends]Use user passed configuration overriding default configuration loading from file
URL: https://github.com/apache/flink/pull/8479#discussion_r321239949

## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java ##
@@ -314,10 +315,12 @@ private RocksDBStateBackend(RocksDBStateBackend original, Configuration config,
         this.enableTtlCompactionFilter = original.enableTtlCompactionFilter
             .resolveUndefined(config.getBoolean(TTL_COMPACT_FILTER_ENABLED));
-        final String priorityQueueTypeString = config.getString(TIMER_SERVICE_FACTORY);
-
-        this.priorityQueueStateType = priorityQueueTypeString.length() > 0 ?
-            PriorityQueueStateType.valueOf(priorityQueueTypeString.toUpperCase()) : original.priorityQueueStateType;
+        if (original.priorityQueueStateType == PriorityQueueStateType.UNDEFINED) {
+            String priorityQueueTypeString = config.getString(TIMER_SERVICE_FACTORY);
+            this.priorityQueueStateType = PriorityQueueStateType.valueOf(priorityQueueTypeString.toUpperCase());

Review comment:
The shortcut for getting enum values is `config.getEnum(...)`.
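To illustrate what such an enum shortcut saves the caller, here is a minimal self-contained sketch. It does not call Flink's `Configuration`; a plain `Map<String, String>` stands in for it, and the class `EnumOptionSketch`, its `getEnum` helper, and the two-value enum are hypothetical names used only for illustration. The actual `config.getEnum(...)` signature should be checked against the Flink version in use.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

// Hypothetical stand-in, NOT Flink's Configuration API.
final class EnumOptionSketch {

    // Illustrative enum; the real PriorityQueueStateType lives in RocksDBStateBackend.
    enum PriorityQueueStateType { HEAP, ROCKSDB }

    // Looks up a string option and converts it to an enum constant, case-insensitively.
    // Falls back to defaultValue when the key is missing or blank.
    static <T extends Enum<T>> T getEnum(
            Class<T> enumClass, Map<String, String> config, String key, T defaultValue) {
        String raw = config.get(key);
        if (raw == null || raw.trim().isEmpty()) {
            return defaultValue;
        }
        try {
            return Enum.valueOf(enumClass, raw.trim().toUpperCase(Locale.ROOT));
        } catch (IllegalArgumentException e) {
            // Re-throw with a message that names the key and the accepted values.
            throw new IllegalArgumentException(
                "Value '" + raw + "' for key '" + key + "' must be one of "
                    + Arrays.toString(enumClass.getEnumConstants()), e);
        }
    }

    public static void main(String[] args) {
        Map<String, String> config = new HashMap<>();
        config.put("state.backend.rocksdb.timer-service.factory", "rocksdb");
        System.out.println(getEnum(
            PriorityQueueStateType.class,
            config,
            "state.backend.rocksdb.timer-service.factory",
            PriorityQueueStateType.HEAP));
    }
}
```

Keeping the upper-casing and the `valueOf` call in one helper means each call site reads as a single lookup, and the error message for a bad value is produced in one place.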
azagrebin commented on a change in pull request #8479: [FLINK-11193][State Backends]Use user passed configuration overriding default configuration loading from file
URL: https://github.com/apache/flink/pull/8479#discussion_r321285268

## File path: flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java ##
@@ -172,6 +173,40 @@ public void testConfigureTimerService() throws Exception {
         keyedBackend.dispose();
     }
+    /**
+     * Validates that user custom configuration from code should override the flink-conf.yaml.
+     * @throws Exception
+     */
+    @Test
+    public void testConfigureTimerServiceLoadingFromApplication() throws Exception {
+        final Environment env = getMockEnvironment(tempFolder.newFolder());
+
+        RocksDBStateBackend backend = new RocksDBStateBackend(tempFolder.newFolder().toURI().toString());
+        Configuration config = new Configuration();
+        config.setString(
+            RocksDBOptions.TIMER_SERVICE_FACTORY,
+            RocksDBStateBackend.PriorityQueueStateType.ROCKSDB.toString());
+        backend = backend.configure(config, Thread.currentThread().getContextClassLoader());
+
+        RocksDBKeyedStateBackend keyedBackend = createKeyedStateBackend(backend, env);
+
+        Assert.assertEquals(RocksDBPriorityQueueSetFactory.class, keyedBackend.getPriorityQueueFactory().getClass());
+
+        Configuration configFromConfFile = new Configuration();
+        configFromConfFile.setString(
+            RocksDBOptions.TIMER_SERVICE_FACTORY,
+            RocksDBStateBackend.PriorityQueueStateType.HEAP.toString());
+
+        StateBackend loadedBackend = StateBackendLoader.fromApplicationOrConfigOrDefault(backend, configFromConfFile,
+            Thread.currentThread().getContextClassLoader(), null);
+
+        assertTrue(loadedBackend instanceof RocksDBStateBackend);
+        final RocksDBStateBackend rocksDBStateBackend = (RocksDBStateBackend) loadedBackend;
+
+        RocksDBKeyedStateBackend keyedBackend2 = createKeyedStateBackend(rocksDBStateBackend, env);
+        Assert.assertEquals(RocksDBPriorityQueueSetFactory.class, keyedBackend2.getPriorityQueueFactory().getClass());

Review comment:
`keyedBackend(2).dispose();` at the end
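One way to make sure the dispose call runs even when an assertion fails is a `try`/`finally` around the checks. The sketch below is only an illustration under stated assumptions: `FakeKeyedBackend` is a hypothetical stand-in for `RocksDBKeyedStateBackend`, and `runAndDispose` is a made-up helper, not something from the Flink test utilities.

```java
// Hypothetical stand-ins; none of these types come from Flink.
final class DisposeSketch {

    // Minimal fake with the one method the review comment cares about.
    static final class FakeKeyedBackend {
        private boolean disposed;

        void dispose() {
            disposed = true;
        }

        boolean isDisposed() {
            return disposed;
        }
    }

    // Runs the given checks and always disposes the backend afterwards,
    // whether the checks pass or throw.
    static void runAndDispose(FakeKeyedBackend backend, Runnable checks) {
        try {
            checks.run();
        } finally {
            backend.dispose();
        }
    }

    public static void main(String[] args) {
        FakeKeyedBackend backend = new FakeKeyedBackend();
        runAndDispose(backend, () -> { /* assertions would go here */ });
        System.out.println("disposed: " + backend.isDisposed());
    }
}
```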
azagrebin commented on a change in pull request #8479: [FLINK-11193][State Backends]Use user passed configuration overriding default configuration loading from file
URL: https://github.com/apache/flink/pull/8479#discussion_r321286841

## File path: flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java ##
@@ -172,6 +173,40 @@ public void testConfigureTimerService() throws Exception {
         keyedBackend.dispose();
     }
+    /**
+     * Validates that user custom configuration from code should override the flink-conf.yaml.
+     * @throws Exception
+     */
+    @Test
+    public void testConfigureTimerServiceLoadingFromApplication() throws Exception {
+        final Environment env = getMockEnvironment(tempFolder.newFolder());
+
+        RocksDBStateBackend backend = new RocksDBStateBackend(tempFolder.newFolder().toURI().toString());
+        Configuration config = new Configuration();
+        config.setString(
+            RocksDBOptions.TIMER_SERVICE_FACTORY,
+            RocksDBStateBackend.PriorityQueueStateType.ROCKSDB.toString());
+        backend = backend.configure(config, Thread.currentThread().getContextClassLoader());
+
+        RocksDBKeyedStateBackend keyedBackend = createKeyedStateBackend(backend, env);
+
+        Assert.assertEquals(RocksDBPriorityQueueSetFactory.class, keyedBackend.getPriorityQueueFactory().getClass());

Review comment:
In these two lines we duplicate the check from `testConfigureTimerService`. I would suggest we test only one thing per test.
azagrebin commented on a change in pull request #8479: [FLINK-11193][State Backends]Use user passed configuration overriding default configuration loading from file
URL: https://github.com/apache/flink/pull/8479#discussion_r321246226

## File path: flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java ##
@@ -172,6 +173,40 @@ public void testConfigureTimerService() throws Exception {
         keyedBackend.dispose();
     }
+    /**
+     * Validates that user custom configuration from code should override the flink-conf.yaml.
+     * @throws Exception
+     */
+    @Test
+    public void testConfigureTimerServiceLoadingFromApplication() throws Exception {
+        final Environment env = getMockEnvironment(tempFolder.newFolder());
+
+        RocksDBStateBackend backend = new RocksDBStateBackend(tempFolder.newFolder().toURI().toString());

Review comment:
Also, there should be a test that expects a failure in the case of `UNDEFINED`, if we choose to go with it.
azagrebin commented on a change in pull request #8479: [FLINK-11193][State Backends]Use user passed configuration overriding default configuration loading from file
URL: https://github.com/apache/flink/pull/8479#discussion_r321244762

## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java ##
@@ -716,6 +719,11 @@ public void enableTtlCompactionFilter() {
         enableTtlCompactionFilter = TernaryBoolean.TRUE;
     }
+    private PriorityQueueStateType resolvePriorityQueueStateType(PriorityQueueStateType origin) {

Review comment:
I am wondering if it would be more user-friendly to add a setter/getter for this, like we have for some other things. We could also use this getter, with the resolution logic, in `createKeyedStateBackend`.
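A minimal sketch of the setter/getter idea, assuming a nullable field and a fixed fallback, could look like the following. The class `TimerTypeAccessorSketch`, the constant `DEFAULT_TYPE`, and the choice of `HEAP` as the fallback are assumptions made for illustration; they are not taken from the pull request or from Flink's defaults.

```java
import java.util.Objects;

// Hypothetical stand-in for the relevant part of a state backend class.
final class TimerTypeAccessorSketch {

    // Illustrative enum; the real one is RocksDBStateBackend.PriorityQueueStateType.
    enum PriorityQueueStateType { HEAP, ROCKSDB }

    // Assumed fallback for this sketch only.
    static final PriorityQueueStateType DEFAULT_TYPE = PriorityQueueStateType.HEAP;

    // null means "not explicitly set by the user".
    private PriorityQueueStateType priorityQueueStateType;

    // Setter for application code; rejects null so "unset" stays an internal state.
    void setPriorityQueueStateType(PriorityQueueStateType type) {
        this.priorityQueueStateType = Objects.requireNonNull(type, "type");
    }

    // Getter with the resolution logic: the explicit value wins, otherwise the default.
    PriorityQueueStateType getPriorityQueueStateType() {
        return priorityQueueStateType == null ? DEFAULT_TYPE : priorityQueueStateType;
    }

    public static void main(String[] args) {
        TimerTypeAccessorSketch backend = new TimerTypeAccessorSketch();
        System.out.println(backend.getPriorityQueueStateType());
        backend.setPriorityQueueStateType(PriorityQueueStateType.ROCKSDB);
        System.out.println(backend.getPriorityQueueStateType());
    }
}
```

With the resolution inside the getter, a caller such as `createKeyedStateBackend` would not need to pass the field into a separate resolve method.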
azagrebin commented on a change in pull request #8479: [FLINK-11193][State Backends]Use user passed configuration overriding default configuration loading from file
URL: https://github.com/apache/flink/pull/8479#discussion_r321243716

## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java ##
@@ -314,10 +315,12 @@ private RocksDBStateBackend(RocksDBStateBackend original, Configuration config,
         this.enableTtlCompactionFilter = original.enableTtlCompactionFilter
             .resolveUndefined(config.getBoolean(TTL_COMPACT_FILTER_ENABLED));
-        final String priorityQueueTypeString = config.getString(TIMER_SERVICE_FACTORY);
-
-        this.priorityQueueStateType = priorityQueueTypeString.length() > 0 ?
-            PriorityQueueStateType.valueOf(priorityQueueTypeString.toUpperCase()) : original.priorityQueueStateType;
+        if (original.priorityQueueStateType == PriorityQueueStateType.UNDEFINED) {
+            String priorityQueueTypeString = config.getString(TIMER_SERVICE_FACTORY);
+            this.priorityQueueStateType = PriorityQueueStateType.valueOf(priorityQueueTypeString.toUpperCase());

Review comment:
I think introducing `UNDEFINED` makes sense in general, but I am wondering if making the field `@Nullable` and leaving it initialized to `null` is actually simpler in this case.
If we use `UNDEFINED`, I think we also have to check for it explicitly here, and then test that a user who configures `UNDEFINED` gets an error at once.
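The explicit check mentioned for the `UNDEFINED` variant can be sketched as follows. This is an illustration under assumptions, not the pull request's code: `UndefinedSentinelSketch` and its `resolve` method are hypothetical, and a plain string stands in for the value read from `flink-conf.yaml`.

```java
import java.util.Locale;

// Hypothetical sketch of the UNDEFINED-sentinel variant.
final class UndefinedSentinelSketch {

    // Illustrative enum with an internal sentinel value.
    enum PriorityQueueStateType { HEAP, ROCKSDB, UNDEFINED }

    // Resolves the effective type: a value set in application code wins;
    // otherwise the configured string is parsed. A configured "UNDEFINED"
    // is rejected immediately, because the sentinel is not a valid user choice.
    static PriorityQueueStateType resolve(
            PriorityQueueStateType fromApplication, String fromConfigFile) {
        if (fromApplication != PriorityQueueStateType.UNDEFINED) {
            return fromApplication;
        }
        PriorityQueueStateType parsed =
            PriorityQueueStateType.valueOf(fromConfigFile.toUpperCase(Locale.ROOT));
        if (parsed == PriorityQueueStateType.UNDEFINED) {
            throw new IllegalArgumentException(
                "UNDEFINED is an internal marker and cannot be configured");
        }
        return parsed;
    }

    public static void main(String[] args) {
        System.out.println(resolve(PriorityQueueStateType.ROCKSDB, "heap"));
        System.out.println(resolve(PriorityQueueStateType.UNDEFINED, "heap"));
    }
}
```

The extra branch and the extra test are the cost of the sentinel; with a nullable field there is no user-visible value that needs rejecting.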
azagrebin commented on a change in pull request #8479: [FLINK-11193][State Backends]Use user passed configuration overriding default configuration loading from file
URL: https://github.com/apache/flink/pull/8479#discussion_r321273489

## File path: flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java ##
@@ -172,6 +173,40 @@ public void testConfigureTimerService() throws Exception {
         keyedBackend.dispose();
     }
+    /**
+     * Validates that user custom configuration from code should override the flink-conf.yaml.
+     * @throws Exception
+     */
+    @Test
+    public void testConfigureTimerServiceLoadingFromApplication() throws Exception {
+        final Environment env = getMockEnvironment(tempFolder.newFolder());

Review comment:
We try to avoid using Mockito mocks in new code. We have, e.g., `MockEnvironmentBuilder`, which we could try here to achieve similar behaviour.
azagrebin commented on a change in pull request #8479: [FLINK-11193][State Backends]Use user passed configuration overriding default configuration loading from file
URL: https://github.com/apache/flink/pull/8479#discussion_r321245108

## File path: flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java ##
@@ -144,7 +145,7 @@ public void testConfigureTimerService() throws Exception {
         Assert.assertEquals("state.backend.rocksdb.timer-service.factory", RocksDBOptions.TIMER_SERVICE_FACTORY.key());
         // Fix the option value string and ensure all are covered
-        Assert.assertEquals(2, RocksDBStateBackend.PriorityQueueStateType.values().length);
+        Assert.assertEquals(3, RocksDBStateBackend.PriorityQueueStateType.values().length);

Review comment:
Also add an `UNDEFINED` assertion, if we choose to go with it:
`Assert.assertEquals("UNDEFINED", RocksDBStateBackend.PriorityQueueStateType.UNDEFINED.toString());`