[GitHub] [flink] azagrebin commented on a change in pull request #8479: [FLINK-11193][State Backends]Use user passed configuration overriding default configuration loading from file

2019-09-20 Thread GitBox
azagrebin commented on a change in pull request #8479: [FLINK-11193][State 
Backends]Use user passed configuration overriding default configuration loading 
from file
URL: https://github.com/apache/flink/pull/8479#discussion_r323107972
 
 

 ##
 File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
 ##
 @@ -507,7 +507,7 @@ public CheckpointStorage createCheckpointStorage(JobID jobId) throws IOException
    keyGroupRange,
    executionConfig,
    localRecoveryConfig,
-   priorityQueueStateType,
+   getPriorityQueueStateType(priorityQueueStateType),
 
 Review comment:
   `priorityQueueStateType` is a class field. Do we actually need to pass it to
`getPriorityQueueStateType` if it is already available there?
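
One way to read this suggestion, as a minimal sketch: because the field is visible inside the class, the getter can take no argument and resolve the default itself. `BackendSketch` is a hypothetical stand-in and not the real `RocksDBStateBackend`; the `HEAP` default and the use of `null` for "not set" are assumptions made only for illustration.

```java
class BackendSketch {
    enum PriorityQueueStateType { HEAP, ROCKSDB }

    // Assumed default, for illustration only.
    static final PriorityQueueStateType DEFAULT_TYPE = PriorityQueueStateType.HEAP;

    // null means "not explicitly set" in this sketch.
    private PriorityQueueStateType priorityQueueStateType;

    void setPriorityQueueStateType(PriorityQueueStateType type) {
        this.priorityQueueStateType = type;
    }

    // Reads the field directly, so callers do not have to pass it in.
    PriorityQueueStateType getPriorityQueueStateType() {
        return priorityQueueStateType == null ? DEFAULT_TYPE : priorityQueueStateType;
    }
}
```

The call site in the diff would then shrink to a plain `getPriorityQueueStateType()`.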


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] azagrebin commented on a change in pull request #8479: [FLINK-11193][State Backends]Use user passed configuration overriding default configuration loading from file

2019-09-20 Thread GitBox
URL: https://github.com/apache/flink/pull/8479#discussion_r323107474
 
 

 ##
 File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
 ##
 @@ -716,6 +716,16 @@ public void enableTtlCompactionFilter() {
        enableTtlCompactionFilter = TernaryBoolean.TRUE;
    }
 
+   /**
+    * Gets the PriorityQueueStateType. It will fallback to the default value, if it is not explicitly set.
 
 Review comment:
   nit: `Gets the type of the priority queue state. It will ..`




[GitHub] [flink] azagrebin commented on a change in pull request #8479: [FLINK-11193][State Backends]Use user passed configuration overriding default configuration loading from file

2019-09-20 Thread GitBox
URL: https://github.com/apache/flink/pull/8479#discussion_r323108356
 
 

 ##
 File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
 ##
 @@ -716,6 +716,16 @@ public void enableTtlCompactionFilter() {
        enableTtlCompactionFilter = TernaryBoolean.TRUE;
    }
 
+   /**
+    * Gets the PriorityQueueStateType. It will fallback to the default value, if it is not explicitly set.
+    * @param origin
+    * @return
 
 Review comment:
   I think an empty `@return` tag does not add any value. I would either remove it
or add:
   `the type of the priority queue state`




[GitHub] [flink] azagrebin commented on a change in pull request #8479: [FLINK-11193][State Backends]Use user passed configuration overriding default configuration loading from file

2019-09-05 Thread GitBox
URL: https://github.com/apache/flink/pull/8479#discussion_r321239949
 
 

 ##
 File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
 ##
 @@ -314,10 +315,12 @@ private RocksDBStateBackend(RocksDBStateBackend original, Configuration config,
    this.enableTtlCompactionFilter = original.enableTtlCompactionFilter
        .resolveUndefined(config.getBoolean(TTL_COMPACT_FILTER_ENABLED));
 
-   final String priorityQueueTypeString = config.getString(TIMER_SERVICE_FACTORY);
-
-   this.priorityQueueStateType = priorityQueueTypeString.length() > 0 ?
-       PriorityQueueStateType.valueOf(priorityQueueTypeString.toUpperCase()) : original.priorityQueueStateType;
+   if (original.priorityQueueStateType == PriorityQueueStateType.UNDEFINED) {
+       String priorityQueueTypeString = config.getString(TIMER_SERVICE_FACTORY);
+       this.priorityQueueStateType = PriorityQueueStateType.valueOf(priorityQueueTypeString.toUpperCase());
 
 Review comment:
   A shortcut for getting enum values is `config.getEnum(...)`.
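
To illustrate what such a shortcut replaces, here is a self-contained sketch. `SimpleConfig` and its `getEnum` are hypothetical stand-ins, not Flink's `Configuration`; the comment does not show the real method's signature, so the parameter list below is an assumption.

```java
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

class EnumConfigSketch {
    enum PriorityQueueStateType { HEAP, ROCKSDB }

    // Hypothetical stand-in for a configuration object; not Flink's Configuration.
    static final class SimpleConfig {
        private final Map<String, String> values = new HashMap<>();

        void setString(String key, String value) {
            values.put(key, value);
        }

        // Looks up the key and maps the string to an enum constant, ignoring case.
        // Falls back to defaultValue when the key is missing or empty.
        <T extends Enum<T>> T getEnum(Class<T> enumClass, String key, T defaultValue) {
            String raw = values.get(key);
            if (raw == null || raw.isEmpty()) {
                return defaultValue;
            }
            return Enum.valueOf(enumClass, raw.toUpperCase(Locale.ROOT));
        }
    }
}
```

A helper of this shape keeps the `valueOf(...toUpperCase())` conversion in one place instead of repeating it at every call site.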




[GitHub] [flink] azagrebin commented on a change in pull request #8479: [FLINK-11193][State Backends]Use user passed configuration overriding default configuration loading from file

2019-09-05 Thread GitBox
URL: https://github.com/apache/flink/pull/8479#discussion_r321285268
 
 

 ##
 File path: flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
 ##
 @@ -172,6 +173,40 @@ public void testConfigureTimerService() throws Exception {
        keyedBackend.dispose();
    }
 
+   /**
+    * Validates that user custom configuration from code should override the flink-conf.yaml.
+    * @throws Exception
+    */
+   @Test
+   public void testConfigureTimerServiceLoadingFromApplication() throws Exception {
+       final Environment env = getMockEnvironment(tempFolder.newFolder());
+
+       RocksDBStateBackend backend = new RocksDBStateBackend(tempFolder.newFolder().toURI().toString());
+       Configuration config = new Configuration();
+       config.setString(
+           RocksDBOptions.TIMER_SERVICE_FACTORY,
+           RocksDBStateBackend.PriorityQueueStateType.ROCKSDB.toString());
+       backend = backend.configure(config, Thread.currentThread().getContextClassLoader());
+
+       RocksDBKeyedStateBackend keyedBackend = createKeyedStateBackend(backend, env);
+
+       Assert.assertEquals(RocksDBPriorityQueueSetFactory.class, keyedBackend.getPriorityQueueFactory().getClass());
+
+       Configuration configFromConfFile = new Configuration();
+       configFromConfFile.setString(
+           RocksDBOptions.TIMER_SERVICE_FACTORY,
+           RocksDBStateBackend.PriorityQueueStateType.HEAP.toString());
+
+       StateBackend loadedBackend = StateBackendLoader.fromApplicationOrConfigOrDefault(backend, configFromConfFile,
+           Thread.currentThread().getContextClassLoader(), null);
+
+       assertTrue(loadedBackend instanceof RocksDBStateBackend);
+       final RocksDBStateBackend rocksDBStateBackend = (RocksDBStateBackend) loadedBackend;
+
+       RocksDBKeyedStateBackend keyedBackend2 = createKeyedStateBackend(rocksDBStateBackend, env);
+       Assert.assertEquals(RocksDBPriorityQueueSetFactory.class, keyedBackend2.getPriorityQueueFactory().getClass());
 
 Review comment:
   Please call `dispose()` on both `keyedBackend` and `keyedBackend2` at the end of the test.
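
A minimal sketch of the cleanup pattern, using a hypothetical `FakeKeyedBackend` in place of `RocksDBKeyedStateBackend`. The `try`/`finally` goes slightly beyond what the comment asks for (it only says "at the end"); it is shown as one option so that disposal still happens when an assertion fails.

```java
class DisposeSketch {
    // Hypothetical stand-in for a keyed state backend that holds native resources.
    static final class FakeKeyedBackend {
        private boolean disposed;

        void dispose() {
            disposed = true;
        }

        boolean isDisposed() {
            return disposed;
        }
    }

    // Runs the check and disposes the backend even if the check throws.
    static void checkAndDispose(FakeKeyedBackend backend, Runnable check) {
        try {
            check.run();
        } finally {
            backend.dispose();
        }
    }
}
```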




[GitHub] [flink] azagrebin commented on a change in pull request #8479: [FLINK-11193][State Backends]Use user passed configuration overriding default configuration loading from file

2019-09-05 Thread GitBox
URL: https://github.com/apache/flink/pull/8479#discussion_r321286841
 
 

 ##
 File path: flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
 ##
 @@ -172,6 +173,40 @@ public void testConfigureTimerService() throws Exception {
        keyedBackend.dispose();
    }
 
+   /**
+    * Validates that user custom configuration from code should override the flink-conf.yaml.
+    * @throws Exception
+    */
+   @Test
+   public void testConfigureTimerServiceLoadingFromApplication() throws Exception {
+       final Environment env = getMockEnvironment(tempFolder.newFolder());
+
+       RocksDBStateBackend backend = new RocksDBStateBackend(tempFolder.newFolder().toURI().toString());
+       Configuration config = new Configuration();
+       config.setString(
+           RocksDBOptions.TIMER_SERVICE_FACTORY,
+           RocksDBStateBackend.PriorityQueueStateType.ROCKSDB.toString());
+       backend = backend.configure(config, Thread.currentThread().getContextClassLoader());
+
+       RocksDBKeyedStateBackend keyedBackend = createKeyedStateBackend(backend, env);
+
+       Assert.assertEquals(RocksDBPriorityQueueSetFactory.class, keyedBackend.getPriorityQueueFactory().getClass());
 
 Review comment:
   These two lines duplicate the check from `testConfigureTimerService`.
   I would suggest testing only one thing per test.




[GitHub] [flink] azagrebin commented on a change in pull request #8479: [FLINK-11193][State Backends]Use user passed configuration overriding default configuration loading from file

2019-09-05 Thread GitBox
URL: https://github.com/apache/flink/pull/8479#discussion_r321246226
 
 

 ##
 File path: flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
 ##
 @@ -172,6 +173,40 @@ public void testConfigureTimerService() throws Exception {
        keyedBackend.dispose();
    }
 
+   /**
+    * Validates that user custom configuration from code should override the flink-conf.yaml.
+    * @throws Exception
+    */
+   @Test
+   public void testConfigureTimerServiceLoadingFromApplication() throws Exception {
+       final Environment env = getMockEnvironment(tempFolder.newFolder());
+
+       RocksDBStateBackend backend = new RocksDBStateBackend(tempFolder.newFolder().toURI().toString());
 
 Review comment:
   Also, if we choose to go with `UNDEFINED`, there should be a test that expects a
failure when the user configures it.
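
As a rough sketch of what such a test would exercise: the parser below rejects `UNDEFINED` when it arrives as a user-supplied value. The class, the method names, and the choice of `IllegalArgumentException` are all assumptions; the PR may signal the error differently.

```java
import java.util.Locale;

class UndefinedRejectionSketch {
    enum PriorityQueueStateType { HEAP, ROCKSDB, UNDEFINED }

    // Parses a user-supplied value. UNDEFINED is treated as internal-only
    // and rejected here (assumed behaviour, not confirmed by the PR).
    static PriorityQueueStateType parseUserValue(String raw) {
        PriorityQueueStateType type =
            PriorityQueueStateType.valueOf(raw.toUpperCase(Locale.ROOT));
        if (type == PriorityQueueStateType.UNDEFINED) {
            throw new IllegalArgumentException(
                "UNDEFINED is not a valid timer service factory");
        }
        return type;
    }

    // Returns true when parsing the value fails with IllegalArgumentException.
    static boolean failsFor(String raw) {
        try {
            parseUserValue(raw);
            return false;
        } catch (IllegalArgumentException e) {
            return true;
        }
    }
}
```

In a JUnit test the same check would typically be written with an expected-exception assertion rather than the `failsFor` helper used here.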




[GitHub] [flink] azagrebin commented on a change in pull request #8479: [FLINK-11193][State Backends]Use user passed configuration overriding default configuration loading from file

2019-09-05 Thread GitBox
URL: https://github.com/apache/flink/pull/8479#discussion_r321244762
 
 

 ##
 File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
 ##
 @@ -716,6 +719,11 @@ public void enableTtlCompactionFilter() {
        enableTtlCompactionFilter = TernaryBoolean.TRUE;
    }
 
+   private PriorityQueueStateType resolvePriorityQueueStateType(PriorityQueueStateType origin) {
 
 Review comment:
   I am wondering whether it would be more user-friendly to add a setter and getter for this,
as we have for some other options.
   We could also use this getter, with the resolution logic, in
`createKeyedStateBackend`.




[GitHub] [flink] azagrebin commented on a change in pull request #8479: [FLINK-11193][State Backends]Use user passed configuration overriding default configuration loading from file

2019-09-05 Thread GitBox
URL: https://github.com/apache/flink/pull/8479#discussion_r321243716
 
 

 ##
 File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
 ##
 @@ -314,10 +315,12 @@ private RocksDBStateBackend(RocksDBStateBackend original, Configuration config,
    this.enableTtlCompactionFilter = original.enableTtlCompactionFilter
        .resolveUndefined(config.getBoolean(TTL_COMPACT_FILTER_ENABLED));
 
-   final String priorityQueueTypeString = config.getString(TIMER_SERVICE_FACTORY);
-
-   this.priorityQueueStateType = priorityQueueTypeString.length() > 0 ?
-       PriorityQueueStateType.valueOf(priorityQueueTypeString.toUpperCase()) : original.priorityQueueStateType;
+   if (original.priorityQueueStateType == PriorityQueueStateType.UNDEFINED) {
+       String priorityQueueTypeString = config.getString(TIMER_SERVICE_FACTORY);
+       this.priorityQueueStateType = PriorityQueueStateType.valueOf(priorityQueueTypeString.toUpperCase());
 
 Review comment:
   I think introducing `UNDEFINED` makes sense in general, but I am wondering whether
making the field `@Nullable` and leaving it initialized to `null` is actually simpler in
this case. If we use `UNDEFINED`, I think we also have to check for it here explicitly
and then test that a user who configures `UNDEFINED` gets an error at
once.
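
The nullable alternative could look roughly like the sketch below. `NullableResolutionSketch` is a hypothetical stand-in for the backend and its reconfiguring copy constructor; the second constructor takes the already-parsed configuration value as a plain argument, which is a simplification.

```java
class NullableResolutionSketch {
    // No UNDEFINED constant: there is nothing extra a user could configure by mistake.
    enum PriorityQueueStateType { HEAP, ROCKSDB }

    // null means the application did not set a type explicitly.
    private final PriorityQueueStateType priorityQueueStateType;

    NullableResolutionSketch(PriorityQueueStateType explicitType) {
        this.priorityQueueStateType = explicitType;
    }

    // Mirrors the copy constructor in the diff: a value set in code wins,
    // otherwise the value read from the configuration is used.
    NullableResolutionSketch(NullableResolutionSketch original,
                             PriorityQueueStateType fromConfig) {
        this.priorityQueueStateType = original.priorityQueueStateType != null
            ? original.priorityQueueStateType
            : fromConfig;
    }

    PriorityQueueStateType getType() {
        return priorityQueueStateType;
    }
}
```

With this shape the enum stays at two constants, so the `values().length` assertion in `testConfigureTimerService` would not need to change.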




[GitHub] [flink] azagrebin commented on a change in pull request #8479: [FLINK-11193][State Backends]Use user passed configuration overriding default configuration loading from file

2019-09-05 Thread GitBox
URL: https://github.com/apache/flink/pull/8479#discussion_r321273489
 
 

 ##
 File path: flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
 ##
 @@ -172,6 +173,40 @@ public void testConfigureTimerService() throws Exception {
        keyedBackend.dispose();
    }
 
+   /**
+    * Validates that user custom configuration from code should override the flink-conf.yaml.
+    * @throws Exception
+    */
+   @Test
+   public void testConfigureTimerServiceLoadingFromApplication() throws Exception {
+       final Environment env = getMockEnvironment(tempFolder.newFolder());
 
 Review comment:
   We try to avoid using Mockito mocks in new code. We have, for example,
`MockEnvironmentBuilder`, which we could try here to achieve similar behaviour.




[GitHub] [flink] azagrebin commented on a change in pull request #8479: [FLINK-11193][State Backends]Use user passed configuration overriding default configuration loading from file

2019-09-05 Thread GitBox
URL: https://github.com/apache/flink/pull/8479#discussion_r321245108
 
 

 ##
 File path: flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
 ##
 @@ -144,7 +145,7 @@ public void testConfigureTimerService() throws Exception {
    Assert.assertEquals("state.backend.rocksdb.timer-service.factory", RocksDBOptions.TIMER_SERVICE_FACTORY.key());
 
    // Fix the option value string and ensure all are covered
-   Assert.assertEquals(2, RocksDBStateBackend.PriorityQueueStateType.values().length);
+   Assert.assertEquals(3, RocksDBStateBackend.PriorityQueueStateType.values().length);
 
 Review comment:
   Also add an `UNDEFINED` assertion if we choose to go with it:
   `Assert.assertEquals("UNDEFINED", RocksDBStateBackend.PriorityQueueStateType.UNDEFINED.toString());`

