[GitHub] [flink] GJL commented on a change in pull request #8912: [FLINK-12709] [runtime] Implement new generation restart strategy loader which also respects legacy…

2019-09-25 Thread GitBox
GJL commented on a change in pull request #8912: [FLINK-12709] [runtime] 
Implement new generation restart strategy loader which also respects legacy…
URL: https://github.com/apache/flink/pull/8912#discussion_r328023916
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartBackoffTimeStrategyFactoryLoaderTest.java
 ##
 @@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.failover.flip1;
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestartStrategyOptions;
+import org.apache.flink.runtime.executiongraph.restart.NoOrFixedIfCheckpointingEnabledRestartStrategyFactory;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import static junit.framework.TestCase.assertEquals;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.IsInstanceOf.instanceOf;
+
+/**
+ * Unit tests for {@link RestartBackoffTimeStrategyFactoryLoader}.
+ */
+public class RestartBackoffTimeStrategyFactoryLoaderTest extends TestLogger {
+
+   private static final RestartStrategies.RestartStrategyConfiguration DEFAULT_RESTART_STRATEGY_CONFIGURATION =
 
 Review comment:
   Maybe rename this to `DEFAULT_JOB_LEVEL_RESTART_CONFIGURATION`. If you don't want to change it, that's also fine.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] GJL commented on a change in pull request #8912: [FLINK-12709] [runtime] Implement new generation restart strategy loader which also respects legacy…

2019-09-25 Thread GitBox
GJL commented on a change in pull request #8912: [FLINK-12709] [runtime] 
Implement new generation restart strategy loader which also respects legacy…
URL: https://github.com/apache/flink/pull/8912#discussion_r328023492
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartBackoffTimeStrategyFactoryLoader.java
 ##
 @@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.failover.flip1;
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestartStrategyOptions;
+import org.apache.flink.runtime.executiongraph.restart.NoOrFixedIfCheckpointingEnabledRestartStrategyFactory;
+import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
+
+import java.util.Optional;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A utility class to load {@link RestartBackoffTimeStrategy.Factory} from the configuration.
+ * It respects the configs for the legacy {@link RestartStrategy}.
+ */
+public final class RestartBackoffTimeStrategyFactoryLoader {
+
+   private RestartBackoffTimeStrategyFactoryLoader() {
+   }
+
+   /**
+    * Creates {@link RestartBackoffTimeStrategy.Factory} from the given configuration.
+    *
+    * The strategy factory is decided in order as follows:
+    * <ol>
+    *     <li>Strategy set within job graph, i.e. {@link RestartStrategies.RestartStrategyConfiguration},
+    * unless the config is {@link RestartStrategies.FallbackRestartStrategyConfiguration}.</li>
+    *     <li>Strategy set in the cluster(server-side) config (flink-conf.yaml),
+    * unless the strategy is not specified</li>
+    *     <li>{@link FixedDelayRestartBackoffTimeStrategy.FixedDelayRestartBackoffTimeStrategyFactory} if
+    * checkpointing is enabled. Otherwise {@link NoRestartBackoffTimeStrategy.NoRestartBackoffTimeStrategyFactory}</li>
+    * </ol>
+    *
+    * @param jobRestartStrategyConfiguration restart configuration given within the job graph
+    * @param clusterConfiguration cluster(server-side) configuration
+    * @param isCheckpointingEnabled if checkpointing is enabled for the job
+    * @return new version restart strategy factory
+    */
+   public static RestartBackoffTimeStrategy.Factory createRestartStrategyFactory(
+           final RestartStrategies.RestartStrategyConfiguration jobRestartStrategyConfiguration,
+           final Configuration clusterConfiguration,
+           final boolean isCheckpointingEnabled) {
+
+       checkNotNull(jobRestartStrategyConfiguration);
+       checkNotNull(clusterConfiguration);
+
+       return getJobRestartStrategyFactory(jobRestartStrategyConfiguration)
+           .orElse(getClusterRestartStrategyFactory(clusterConfiguration)
+               .orElse(getDefaultRestartStrategyFactory(isCheckpointingEnabled)));
+   }
+
+   private static Optional<RestartBackoffTimeStrategy.Factory> getJobRestartStrategyFactory(
+           final RestartStrategies.RestartStrategyConfiguration restartStrategyConfiguration) {
+
+       if (restartStrategyConfiguration instanceof RestartStrategies.NoRestartStrategyConfiguration) {
+           return Optional.of(NoRestartBackoffTimeStrategy.NoRestartBackoffTimeStrategyFactory.INSTANCE);
+       } else if (restartStrategyConfiguration instanceof RestartStrategies.FixedDelayRestartStrategyConfiguration) {
+           final RestartStrategies.FixedDelayRestartStrategyConfiguration fixedDelayConfig =
+               (RestartStrategies.FixedDelayRestartStrategyConfiguration) restartStrategyConfiguration;
+
+           return Optional.of(new FixedDelayRestartBackoffTimeStrategy.FixedDelayRestartBackoffTimeStrategyFactory(
+               fixedDelayConfig.getRestartAttempts(),
+               fixedDelayConfig.getDelayBetweenAttemptsInterval().toMilliseconds()));
+   

[GitHub] [flink] GJL commented on a change in pull request #8912: [FLINK-12709] [runtime] Implement new generation restart strategy loader which also respects legacy…

2019-09-25 Thread GitBox
GJL commented on a change in pull request #8912: [FLINK-12709] [runtime] 
Implement new generation restart strategy loader which also respects legacy…
URL: https://github.com/apache/flink/pull/8912#discussion_r328023089
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartBackoffTimeStrategyFactoryLoader.java
 ##
 @@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.failover.flip1;
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestartStrategyOptions;
+import org.apache.flink.runtime.executiongraph.restart.NoOrFixedIfCheckpointingEnabledRestartStrategyFactory;
+import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
+
+import java.util.Optional;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A utility class to load {@link RestartBackoffTimeStrategy.Factory} from the configuration.
+ * It respects the configs for the legacy {@link RestartStrategy}.
+ */
+public final class RestartBackoffTimeStrategyFactoryLoader {
+
+   private RestartBackoffTimeStrategyFactoryLoader() {
+   }
+
+   /**
+    * Creates {@link RestartBackoffTimeStrategy.Factory} from the given configuration.
+    *
+    * The strategy factory is decided in order as follows:
+    * <ol>
+    *     <li>Strategy set within job graph, i.e. {@link RestartStrategies.RestartStrategyConfiguration},
+    * unless the config is {@link RestartStrategies.FallbackRestartStrategyConfiguration}.</li>
+    *     <li>Strategy set in the cluster(server-side) config (flink-conf.yaml),
+    * unless the strategy is not specified</li>
+    *     <li>{@link FixedDelayRestartBackoffTimeStrategy.FixedDelayRestartBackoffTimeStrategyFactory} if
+    * checkpointing is enabled. Otherwise {@link NoRestartBackoffTimeStrategy.NoRestartBackoffTimeStrategyFactory}</li>
+    * </ol>
+    *
+    * @param jobRestartStrategyConfiguration restart configuration given within the job graph
+    * @param clusterConfiguration cluster(server-side) configuration
+    * @param isCheckpointingEnabled if checkpointing is enabled for the job
+    * @return new version restart strategy factory
+    */
+   public static RestartBackoffTimeStrategy.Factory createRestartStrategyFactory(
+           final RestartStrategies.RestartStrategyConfiguration jobRestartStrategyConfiguration,
+           final Configuration clusterConfiguration,
+           final boolean isCheckpointingEnabled) {
+
+       checkNotNull(jobRestartStrategyConfiguration);
+       checkNotNull(clusterConfiguration);
+
+       return getJobRestartStrategyFactory(jobRestartStrategyConfiguration)
+           .orElse(getClusterRestartStrategyFactory(clusterConfiguration)
+               .orElse(getDefaultRestartStrategyFactory(isCheckpointingEnabled)));
+   }
+
+   private static Optional<RestartBackoffTimeStrategy.Factory> getJobRestartStrategyFactory(
+           final RestartStrategies.RestartStrategyConfiguration restartStrategyConfiguration) {
+
+       if (restartStrategyConfiguration instanceof RestartStrategies.NoRestartStrategyConfiguration) {
+           return Optional.of(NoRestartBackoffTimeStrategy.NoRestartBackoffTimeStrategyFactory.INSTANCE);
+       } else if (restartStrategyConfiguration instanceof RestartStrategies.FixedDelayRestartStrategyConfiguration) {
+           final RestartStrategies.FixedDelayRestartStrategyConfiguration fixedDelayConfig =
 
 Review comment:
   I think this branch and the one before it are not tested.
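
   A minimal sketch of what covering both branches could look like, together with the job, cluster, default fallback order described in the Javadoc. It does not use the Flink classes: `JobConfig`, `NoRestart`, `FixedDelay`, `Fallback`, and the string results are all stand-ins invented for illustration, and the real test would assert on the actual factory types instead.

```java
import java.util.Optional;

/** Stand-in model of the loader; none of these types are Flink classes. */
final class RestartFallbackSketch {

    interface JobConfig {}

    static final class NoRestart implements JobConfig {}

    static final class Fallback implements JobConfig {}

    static final class FixedDelay implements JobConfig {
        final int attempts;
        final long delayMs;

        FixedDelay(int attempts, long delayMs) {
            this.attempts = attempts;
            this.delayMs = delayMs;
        }
    }

    // Job level: an empty result means "defer to the next level".
    static Optional<String> fromJob(JobConfig config) {
        if (config instanceof NoRestart) {
            return Optional.of("none");
        } else if (config instanceof FixedDelay) {
            FixedDelay fixed = (FixedDelay) config;
            return Optional.of("fixed-delay(" + fixed.attempts + ", " + fixed.delayMs + "ms)");
        }
        return Optional.empty();
    }

    // Cluster level: null models "strategy not specified in the cluster config".
    static Optional<String> fromCluster(String clusterStrategy) {
        return Optional.ofNullable(clusterStrategy);
    }

    // Default level: depends only on whether checkpointing is enabled.
    static String defaultFor(boolean checkpointingEnabled) {
        return checkpointingEnabled ? "fixed-delay(default)" : "none";
    }

    // Same order as the Javadoc; orElseGet keeps the lower levels lazy.
    static String resolve(JobConfig job, String cluster, boolean checkpointingEnabled) {
        return fromJob(job)
            .orElseGet(() -> fromCluster(cluster)
                .orElseGet(() -> defaultFor(checkpointingEnabled)));
    }

    public static void main(String[] args) {
        // One call per branch, including the two job-level branches.
        System.out.println(resolve(new NoRestart(), "failure-rate", true));
        System.out.println(resolve(new FixedDelay(3, 1000L), null, false));
        System.out.println(resolve(new Fallback(), "failure-rate", true));
        System.out.println(resolve(new Fallback(), null, true));
        System.out.println(resolve(new Fallback(), null, false));
    }
}
```

   The first two calls in `main` are the ones that correspond to the untested branches: each passes a job-level configuration that must win even when a cluster-level or default strategy is available.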




[GitHub] [flink] GJL commented on a change in pull request #8912: [FLINK-12709] [runtime] Implement new generation restart strategy loader which also respects legacy…

2019-09-25 Thread GitBox
GJL commented on a change in pull request #8912: [FLINK-12709] [runtime] 
Implement new generation restart strategy loader which also respects legacy…
URL: https://github.com/apache/flink/pull/8912#discussion_r328023154
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartBackoffTimeStrategyFactoryLoader.java
 ##
 @@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.failover.flip1;
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestartStrategyOptions;
+import org.apache.flink.runtime.executiongraph.restart.NoOrFixedIfCheckpointingEnabledRestartStrategyFactory;
+import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
+
+import java.util.Optional;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A utility class to load {@link RestartBackoffTimeStrategy.Factory} from the configuration.
+ * It respects the configs for the legacy {@link RestartStrategy}.
+ */
+public final class RestartBackoffTimeStrategyFactoryLoader {
+
+   private RestartBackoffTimeStrategyFactoryLoader() {
+   }
+
+   /**
+    * Creates {@link RestartBackoffTimeStrategy.Factory} from the given configuration.
+    *
+    * The strategy factory is decided in order as follows:
+    * <ol>
+    *     <li>Strategy set within job graph, i.e. {@link RestartStrategies.RestartStrategyConfiguration},
+    * unless the config is {@link RestartStrategies.FallbackRestartStrategyConfiguration}.</li>
+    *     <li>Strategy set in the cluster(server-side) config (flink-conf.yaml),
+    * unless the strategy is not specified</li>
+    *     <li>{@link FixedDelayRestartBackoffTimeStrategy.FixedDelayRestartBackoffTimeStrategyFactory} if
+    * checkpointing is enabled. Otherwise {@link NoRestartBackoffTimeStrategy.NoRestartBackoffTimeStrategyFactory}</li>
+    * </ol>
+    *
+    * @param jobRestartStrategyConfiguration restart configuration given within the job graph
+    * @param clusterConfiguration cluster(server-side) configuration
+    * @param isCheckpointingEnabled if checkpointing is enabled for the job
+    * @return new version restart strategy factory
+    */
+   public static RestartBackoffTimeStrategy.Factory createRestartStrategyFactory(
+           final RestartStrategies.RestartStrategyConfiguration jobRestartStrategyConfiguration,
+           final Configuration clusterConfiguration,
+           final boolean isCheckpointingEnabled) {
+
+       checkNotNull(jobRestartStrategyConfiguration);
+       checkNotNull(clusterConfiguration);
+
+       return getJobRestartStrategyFactory(jobRestartStrategyConfiguration)
+           .orElse(getClusterRestartStrategyFactory(clusterConfiguration)
+               .orElse(getDefaultRestartStrategyFactory(isCheckpointingEnabled)));
+   }
+
+   private static Optional<RestartBackoffTimeStrategy.Factory> getJobRestartStrategyFactory(
+           final RestartStrategies.RestartStrategyConfiguration restartStrategyConfiguration) {
+
+       if (restartStrategyConfiguration instanceof RestartStrategies.NoRestartStrategyConfiguration) {
+           return Optional.of(NoRestartBackoffTimeStrategy.NoRestartBackoffTimeStrategyFactory.INSTANCE);
+       } else if (restartStrategyConfiguration instanceof RestartStrategies.FixedDelayRestartStrategyConfiguration) {
+           final RestartStrategies.FixedDelayRestartStrategyConfiguration fixedDelayConfig =
+               (RestartStrategies.FixedDelayRestartStrategyConfiguration) restartStrategyConfiguration;
+
+           return Optional.of(new FixedDelayRestartBackoffTimeStrategy.FixedDelayRestartBackoffTimeStrategyFactory(
+               fixedDelayConfig.getRestartAttempts(),
+               fixedDelayConfig.getDelayBetweenAttemptsInterval().toMilliseconds()));
+   

[GitHub] [flink] GJL commented on a change in pull request #8912: [FLINK-12709] [runtime] Implement new generation restart strategy loader which also respects legacy…

2019-09-25 Thread GitBox
GJL commented on a change in pull request #8912: [FLINK-12709] [runtime] 
Implement new generation restart strategy loader which also respects legacy…
URL: https://github.com/apache/flink/pull/8912#discussion_r328022763
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartBackoffTimeStrategyFactoryLoader.java
 ##
 @@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.failover.flip1;
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestartStrategyOptions;
+import org.apache.flink.runtime.executiongraph.restart.NoOrFixedIfCheckpointingEnabledRestartStrategyFactory;
+import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
+
+import java.util.Optional;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A utility class to load {@link RestartBackoffTimeStrategy.Factory} from the configuration.
+ * It respects the configs for the legacy {@link RestartStrategy}.
+ */
+public final class RestartBackoffTimeStrategyFactoryLoader {
+
+   private RestartBackoffTimeStrategyFactoryLoader() {
+   }
+
+   /**
+    * Creates {@link RestartBackoffTimeStrategy.Factory} from the given configuration.
+    *
+    * The strategy factory is decided in order as follows:
+    * <ol>
+    *     <li>Strategy set within job graph, i.e. {@link RestartStrategies.RestartStrategyConfiguration},
+    * unless the config is {@link RestartStrategies.FallbackRestartStrategyConfiguration}.</li>
+    *     <li>Strategy set in the cluster(server-side) config (flink-conf.yaml),
+    * unless the strategy is not specified</li>
+    *     <li>{@link FixedDelayRestartBackoffTimeStrategy.FixedDelayRestartBackoffTimeStrategyFactory} if
+    * checkpointing is enabled. Otherwise {@link NoRestartBackoffTimeStrategy.NoRestartBackoffTimeStrategyFactory}</li>
+    * </ol>
+    *
+    * @param jobRestartStrategyConfiguration restart configuration given within the job graph
+    * @param clusterConfiguration cluster(server-side) configuration
+    * @param isCheckpointingEnabled if checkpointing is enabled for the job
+    * @return new version restart strategy factory
+    */
+   public static RestartBackoffTimeStrategy.Factory createRestartStrategyFactory(
+           final RestartStrategies.RestartStrategyConfiguration jobRestartStrategyConfiguration,
+           final Configuration clusterConfiguration,
+           final boolean isCheckpointingEnabled) {
+
+       checkNotNull(jobRestartStrategyConfiguration);
+       checkNotNull(clusterConfiguration);
+
+       return getJobRestartStrategyFactory(jobRestartStrategyConfiguration)
+           .orElse(getClusterRestartStrategyFactory(clusterConfiguration)
+               .orElse(getDefaultRestartStrategyFactory(isCheckpointingEnabled)));
+   }
+
+   private static Optional<RestartBackoffTimeStrategy.Factory> getJobRestartStrategyFactory(
+           final RestartStrategies.RestartStrategyConfiguration restartStrategyConfiguration) {
+
+       if (restartStrategyConfiguration instanceof RestartStrategies.NoRestartStrategyConfiguration) {
 
 Review comment:
   I would add the following imports:
   
   ```
   import org.apache.flink.api.common.restartstrategy.RestartStrategies.FailureRateRestartStrategyConfiguration;
   import org.apache.flink.api.common.restartstrategy.RestartStrategies.FallbackRestartStrategyConfiguration;
   import org.apache.flink.api.common.restartstrategy.RestartStrategies.FixedDelayRestartStrategyConfiguration;
   import org.apache.flink.api.common.restartstrategy.RestartStrategies.NoRestartStrategyConfiguration;
   ```
   
   
   



[GitHub] [flink] GJL commented on a change in pull request #8912: [FLINK-12709] [runtime] Implement new generation restart strategy loader which also respects legacy…

2019-09-25 Thread GitBox
GJL commented on a change in pull request #8912: [FLINK-12709] [runtime] 
Implement new generation restart strategy loader which also respects legacy…
URL: https://github.com/apache/flink/pull/8912#discussion_r328021824
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/FixedDelayRestartBackoffTimeStrategy.java
 ##
 @@ -57,6 +58,11 @@ public long getBackoffTime() {
        return backoffTimeMS;
    }
 
+   @VisibleForTesting
+   int getMaxNumberRestartAttempts() {
 
 Review comment:
   I think it's reasonable to make this part of the public API, i.e., remove 
`@VisibleForTesting` and add `public`. Generally speaking, if a value is 
exposed by `toString()`, the value should be accessible by a getter or other 
methods.
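
   A minimal sketch of that guideline, assuming a stand-in class rather than the real `FixedDelayRestartBackoffTimeStrategy`. The class name `FixedDelaySketch`, its constructor, and the `toString()` format are assumptions for illustration; only the two getter names and the `backoffTimeMS` field come from the diff above.

```java
/** Stand-in for a fixed-delay strategy; not the Flink class. */
final class FixedDelaySketch {

    private final int maxNumberRestartAttempts;
    private final long backoffTimeMS;

    FixedDelaySketch(int maxNumberRestartAttempts, long backoffTimeMS) {
        this.maxNumberRestartAttempts = maxNumberRestartAttempts;
        this.backoffTimeMS = backoffTimeMS;
    }

    public long getBackoffTime() {
        return backoffTimeMS;
    }

    // Public getter: callers read the value directly instead of parsing toString().
    public int getMaxNumberRestartAttempts() {
        return maxNumberRestartAttempts;
    }

    @Override
    public String toString() {
        // Assumed format. Every value printed here also has a getter above.
        return "FixedDelaySketch(maxNumberRestartAttempts=" + maxNumberRestartAttempts
            + ", backoffTimeMS=" + backoffTimeMS + ")";
    }

    public static void main(String[] args) {
        FixedDelaySketch strategy = new FixedDelaySketch(3, 1000L);
        System.out.println(strategy.getMaxNumberRestartAttempts());
        System.out.println(strategy);
    }
}
```

   With the getter public, a test can assert on `getMaxNumberRestartAttempts()` without `@VisibleForTesting` and without depending on the string format.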




[GitHub] [flink] GJL commented on a change in pull request #8912: [FLINK-12709] [runtime] Implement new generation restart strategy loader which also respects legacy…

2019-09-19 Thread GitBox
GJL commented on a change in pull request #8912: [FLINK-12709] [runtime] 
Implement new generation restart strategy loader which also respects legacy…
URL: https://github.com/apache/flink/pull/8912#discussion_r326166365
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartBackoffTimeStrategyFactoryLoader.java
 ##
 @@ -0,0 +1,312 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.failover.flip1;
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestartStrategyOptions;
+import org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy;
+import org.apache.flink.runtime.executiongraph.restart.NoOrFixedIfCheckpointingEnabledRestartStrategyFactory;
+import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
+import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A utility class to load {@link RestartBackoffTimeStrategy.Factory} from the configuration.
+ * It respects the configs for the legacy {@link RestartStrategy}.
+ */
+public final class RestartBackoffTimeStrategyFactoryLoader {
+
+   private static final Logger LOG = LoggerFactory.getLogger(RestartBackoffTimeStrategyFactoryLoader.class);
+
+   private static final String CREATE_METHOD = "createFactory";
+
+   private RestartBackoffTimeStrategyFactoryLoader() {
+   }
+
+   /**
+    * Creates a {@link RestartBackoffTimeStrategy.Factory}.
+    * If new version restart strategy is specified, will directly use it.
+    * Otherwise will decide based on legacy restart strategy configs.
+    *
+    * @param jobRestartStrategyConfiguration restart configuration given within the job graph
+    * @param clusterConfiguration cluster(server-side) configuration, usually represented as jobmanager config
+    * @param isCheckpointingEnabled if checkpointing was enabled for the job
+    * @return new version restart strategy factory
+    */
+   public static RestartBackoffTimeStrategy.Factory createRestartStrategyFactory(
+           final RestartStrategies.RestartStrategyConfiguration jobRestartStrategyConfiguration,
+           final Configuration clusterConfiguration,
+           final boolean isCheckpointingEnabled) {
+
+       checkNotNull(jobRestartStrategyConfiguration);
+       checkNotNull(clusterConfiguration);
+
+       final String restartStrategyClassName = clusterConfiguration.getString(
+           RestartStrategyOptions.RESTART_STRATEGY_CLASS_NAME);
+
+       if (restartStrategyClassName != null) {
+           // create new version restart strategy directly if it is specified in cluster config
+           return createRestartStrategyFactoryInternal(clusterConfiguration);
+       } else {
+           // adapt the legacy restart strategy configs for new version restart strategy
+           final Configuration adaptedConfiguration = getAdaptedConfiguration(
+               jobRestartStrategyConfiguration,
+               clusterConfiguration,
+               isCheckpointingEnabled);
+
+           // create new version restart strategy from the adapted config
+           return createRestartStrategyFactoryInternal(adaptedConfiguration);
+       }
+   }
+
+   private static RestartBackoffTimeStrategy.Factory createRestartStrategyFactoryInternal(
+           final Configuration configuration) {
+
+       final String restartStrategyClassName = configuration.getString(
+   

[GitHub] [flink] GJL commented on a change in pull request #8912: [FLINK-12709] [runtime] Implement new generation restart strategy loader which also respects legacy…

2019-09-19 Thread GitBox
GJL commented on a change in pull request #8912: [FLINK-12709] [runtime] 
Implement new generation restart strategy loader which also respects legacy…
URL: https://github.com/apache/flink/pull/8912#discussion_r326162784
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartBackoffTimeStrategyFactoryLoader.java
 ##
 @@ -0,0 +1,393 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.failover.flip1;
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.configuration.AkkaOptions;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestartBackoffTimeStrategyOptions;
+import 
org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy;
+import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.concurrent.TimeUnit;
+
+import scala.concurrent.duration.Duration;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A utility class to load {@link RestartBackoffTimeStrategy.Factory} from the 
configuration.
+ */
+public class RestartBackoffTimeStrategyFactoryLoader {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(RestartBackoffTimeStrategyFactoryLoader.class);
+
+   private static final String CREATE_METHOD = "createFactory";
+
+   /**
+* Creates proper {@link RestartBackoffTimeStrategy.Factory}.
+* If new version restart strategy is specified, will directly use it.
+* Otherwise will decide based on legacy restart strategy configs.
+*
+* @param jobRestartStrategyConfiguration restart configuration given 
within the job graph
+* @param clusterConfiguration cluster(server-side) configuration, 
usually represented as jobmanager config
+* @param isCheckpointingEnabled if checkpointing was enabled for the 
job
+* @return new version restart strategy factory
+*/
+   public static RestartBackoffTimeStrategy.Factory 
createRestartStrategyFactory(
+   final RestartStrategies.RestartStrategyConfiguration 
jobRestartStrategyConfiguration,
+   final Configuration clusterConfiguration,
+   final boolean isCheckpointingEnabled) throws Exception {
+
+   checkNotNull(jobRestartStrategyConfiguration);
+   checkNotNull(clusterConfiguration);
+
+   final String restartStrategyClassName = 
clusterConfiguration.getString(
+   
RestartBackoffTimeStrategyOptions.RESTART_BACKOFF_TIME_STRATEGY_CLASS_NAME);
+
+   if (restartStrategyClassName != null) {
+   // create new restart strategy directly if it is 
specified in cluster config
+   return 
createRestartStrategyFactoryInternal(clusterConfiguration);
+   } else {
+   // adapt the legacy restart strategy configs as new 
restart strategy configs
+   final Configuration adaptedConfiguration = 
getAdaptedConfiguration(
+   jobRestartStrategyConfiguration,
+   clusterConfiguration,
+   isCheckpointingEnabled);
+
+   // create new restart strategy from the adapted config
+   return 
createRestartStrategyFactoryInternal(adaptedConfiguration);
+   }
+   }
+
+   /**
+* Decides the {@link RestartBackoffTimeStrategy} to use and its params 
based on legacy configs,
+* and records its class name and the params into an adapted 
configuration.
+*
+* The decision making is as follows:
+* 
+* Use strategy of {@link 

[GitHub] [flink] GJL commented on a change in pull request #8912: [FLINK-12709] [runtime] Implement new generation restart strategy loader which also respects legacy…

2019-09-19 Thread GitBox
GJL commented on a change in pull request #8912: [FLINK-12709] [runtime] 
Implement new generation restart strategy loader which also respects legacy…
URL: https://github.com/apache/flink/pull/8912#discussion_r326166365
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartBackoffTimeStrategyFactoryLoader.java
 ##
 @@ -0,0 +1,312 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.failover.flip1;
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestartStrategyOptions;
+import 
org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy;
+import 
org.apache.flink.runtime.executiongraph.restart.NoOrFixedIfCheckpointingEnabledRestartStrategyFactory;
+import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
+import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A utility class to load {@link RestartBackoffTimeStrategy.Factory} from the 
configuration.
+ * It respects the configs for the legacy {@link RestartStrategy}.
+ */
+public final class RestartBackoffTimeStrategyFactoryLoader {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(RestartBackoffTimeStrategyFactoryLoader.class);
+
+   private static final String CREATE_METHOD = "createFactory";
+
+   private RestartBackoffTimeStrategyFactoryLoader() {
+   }
+
+   /**
+* Creates a {@link RestartBackoffTimeStrategy.Factory}.
+* If new version restart strategy is specified, will directly use it.
+* Otherwise will decide based on legacy restart strategy configs.
+*
+* @param jobRestartStrategyConfiguration restart configuration given 
within the job graph
+* @param clusterConfiguration cluster(server-side) configuration, 
usually represented as jobmanager config
+* @param isCheckpointingEnabled if checkpointing was enabled for the 
job
+* @return new version restart strategy factory
+*/
+   public static RestartBackoffTimeStrategy.Factory 
createRestartStrategyFactory(
+   final RestartStrategies.RestartStrategyConfiguration 
jobRestartStrategyConfiguration,
+   final Configuration clusterConfiguration,
+   final boolean isCheckpointingEnabled) {
+
+   checkNotNull(jobRestartStrategyConfiguration);
+   checkNotNull(clusterConfiguration);
+
+   final String restartStrategyClassName = 
clusterConfiguration.getString(
+   RestartStrategyOptions.RESTART_STRATEGY_CLASS_NAME);
+
+   if (restartStrategyClassName != null) {
+   // create new version restart strategy directly if it 
is specified in cluster config
+   return 
createRestartStrategyFactoryInternal(clusterConfiguration);
+   } else {
+   // adapt the legacy restart strategy configs for new 
version restart strategy
+   final Configuration adaptedConfiguration = 
getAdaptedConfiguration(
+   jobRestartStrategyConfiguration,
+   clusterConfiguration,
+   isCheckpointingEnabled);
+
+   // create new version restart strategy from the adapted 
config
+   return 
createRestartStrategyFactoryInternal(adaptedConfiguration);
+   }
+   }
+
+   private static RestartBackoffTimeStrategy.Factory 
createRestartStrategyFactoryInternal(
+   final Configuration configuration) {
+
+   final String restartStrategyClassName = configuration.getString(
+   

[GitHub] [flink] GJL commented on a change in pull request #8912: [FLINK-12709] [runtime] Implement new generation restart strategy loader which also respects legacy…

2019-09-12 Thread GitBox
GJL commented on a change in pull request #8912: [FLINK-12709] [runtime] 
Implement new generation restart strategy loader which also respects legacy…
URL: https://github.com/apache/flink/pull/8912#discussion_r323601246
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartBackoffTimeStrategyFactoryLoader.java
 ##
 @@ -0,0 +1,393 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.failover.flip1;
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.configuration.AkkaOptions;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestartBackoffTimeStrategyOptions;
+import 
org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy;
+import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.concurrent.TimeUnit;
+
+import scala.concurrent.duration.Duration;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A utility class to load {@link RestartBackoffTimeStrategy.Factory} from the 
configuration.
+ */
+public class RestartBackoffTimeStrategyFactoryLoader {
 
 Review comment:
   Since there are static members only, I'd make this class final with a 
private constructor.
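
   A minimal sketch of that pattern, using a hypothetical stand-in class 
(`FactoryLoaderSketch` and `createMethodName` are illustrative names, not part 
of this PR):

```java
// Hypothetical stand-in for a static-only helper; not the class under review.
final class FactoryLoaderSketch {

    private static final String CREATE_METHOD = "createFactory";

    // Private constructor: the class hosts static members only,
    // so nobody outside should be able to instantiate it.
    private FactoryLoaderSketch() {
    }

    // Illustrative static accessor standing in for the real static API.
    static String createMethodName() {
        return CREATE_METHOD;
    }
}
```

   `final` rules out subclassing and the private constructor rules out 
`new FactoryLoaderSketch()` from other classes, so the type can only be used 
through its static methods.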


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] GJL commented on a change in pull request #8912: [FLINK-12709] [runtime] Implement new generation restart strategy loader which also respects legacy…

2019-09-12 Thread GitBox
GJL commented on a change in pull request #8912: [FLINK-12709] [runtime] 
Implement new generation restart strategy loader which also respects legacy…
URL: https://github.com/apache/flink/pull/8912#discussion_r323600583
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartBackoffTimeStrategyFactoryLoader.java
 ##
 @@ -0,0 +1,393 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.failover.flip1;
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.configuration.AkkaOptions;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestartBackoffTimeStrategyOptions;
+import 
org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy;
+import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.concurrent.TimeUnit;
+
+import scala.concurrent.duration.Duration;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A utility class to load {@link RestartBackoffTimeStrategy.Factory} from the 
configuration.
+ */
+public class RestartBackoffTimeStrategyFactoryLoader {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(RestartBackoffTimeStrategyFactoryLoader.class);
+
+   private static final String CREATE_METHOD = "createFactory";
+
+   /**
+* Creates proper {@link RestartBackoffTimeStrategy.Factory}.
+* If new version restart strategy is specified, will directly use it.
+* Otherwise will decide based on legacy restart strategy configs.
+*
+* @param jobRestartStrategyConfiguration restart configuration given 
within the job graph
+* @param clusterConfiguration cluster(server-side) configuration, 
usually represented as jobmanager config
+* @param isCheckpointingEnabled if checkpointing was enabled for the 
job
+* @return new version restart strategy factory
+*/
+   public static RestartBackoffTimeStrategy.Factory 
createRestartStrategyFactory(
+   final RestartStrategies.RestartStrategyConfiguration 
jobRestartStrategyConfiguration,
+   final Configuration clusterConfiguration,
+   final boolean isCheckpointingEnabled) throws Exception {
+
+   checkNotNull(jobRestartStrategyConfiguration);
+   checkNotNull(clusterConfiguration);
+
+   final String restartStrategyClassName = 
clusterConfiguration.getString(
+   
RestartBackoffTimeStrategyOptions.RESTART_BACKOFF_TIME_STRATEGY_CLASS_NAME);
+
+   if (restartStrategyClassName != null) {
+   // create new restart strategy directly if it is 
specified in cluster config
+   return 
createRestartStrategyFactoryInternal(clusterConfiguration);
+   } else {
+   // adapt the legacy restart strategy configs as new 
restart strategy configs
+   final Configuration adaptedConfiguration = 
getAdaptedConfiguration(
+   jobRestartStrategyConfiguration,
+   clusterConfiguration,
+   isCheckpointingEnabled);
+
+   // create new restart strategy from the adapted config
+   return 
createRestartStrategyFactoryInternal(adaptedConfiguration);
+   }
+   }
+
+   /**
+* Decides the {@link RestartBackoffTimeStrategy} to use and its params 
based on legacy configs,
+* and records its class name and the params into an adapted 
configuration.
+*
+* The decision making is as follows:
+* 
+* Use strategy of {@link 

[GitHub] [flink] GJL commented on a change in pull request #8912: [FLINK-12709] [runtime] Implement new generation restart strategy loader which also respects legacy…

2019-09-12 Thread GitBox
GJL commented on a change in pull request #8912: [FLINK-12709] [runtime] 
Implement new generation restart strategy loader which also respects legacy…
URL: https://github.com/apache/flink/pull/8912#discussion_r323195327
 
 

 ##
 File path: 
flink-core/src/main/java/org/apache/flink/configuration/RestartBackoffTimeStrategyOptions.java
 ##
 @@ -25,6 +25,16 @@
  */
 @PublicEvolving
 public class RestartBackoffTimeStrategyOptions {
+
+   /**
+* Class name of the RestartBackoffTimeStrategy implementation to use.
+*/
+   @PublicEvolving
+   public static final ConfigOption<String> 
RESTART_BACKOFF_TIME_STRATEGY_CLASS_NAME = ConfigOptions
+   .key("restart-backoff-time-strategy.class-name")
 
 Review comment:
   With the introduction of the new scheduler, users will not be able to use 
custom implementations of the `RestartStrategy` interface. Strictly speaking 
this is a breaking change. However, it is not clear to me if there are any 
Flink users at all that require a custom `RestartStrategy`. I think it makes 
sense to start a discussion on the dev mailing list. Until it is clear that 
users require this level of customization, I think we should not provide users 
a way to write their own back off time strategy.




[GitHub] [flink] GJL commented on a change in pull request #8912: [FLINK-12709] [runtime] Implement new generation restart strategy loader which also respects legacy…

2019-09-11 Thread GitBox
GJL commented on a change in pull request #8912: [FLINK-12709] [runtime] 
Implement new generation restart strategy loader which also respects legacy…
URL: https://github.com/apache/flink/pull/8912#discussion_r323199864
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartBackoffTimeStrategyFactoryLoader.java
 ##
 @@ -0,0 +1,393 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.failover.flip1;
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.configuration.AkkaOptions;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestartBackoffTimeStrategyOptions;
+import 
org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy;
+import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.concurrent.TimeUnit;
+
+import scala.concurrent.duration.Duration;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A utility class to load {@link RestartBackoffTimeStrategy.Factory} from the 
configuration.
+ */
+public class RestartBackoffTimeStrategyFactoryLoader {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(RestartBackoffTimeStrategyFactoryLoader.class);
+
+   private static final String CREATE_METHOD = "createFactory";
+
+   /**
+* Creates proper {@link RestartBackoffTimeStrategy.Factory}.
+* If new version restart strategy is specified, will directly use it.
+* Otherwise will decide based on legacy restart strategy configs.
+*
+* @param jobRestartStrategyConfiguration restart configuration given 
within the job graph
+* @param clusterConfiguration cluster(server-side) configuration, 
usually represented as jobmanager config
+* @param isCheckpointingEnabled if checkpointing was enabled for the 
job
+* @return new version restart strategy factory
+*/
+   public static RestartBackoffTimeStrategy.Factory 
createRestartStrategyFactory(
+   final RestartStrategies.RestartStrategyConfiguration 
jobRestartStrategyConfiguration,
+   final Configuration clusterConfiguration,
+   final boolean isCheckpointingEnabled) throws Exception {
+
+   checkNotNull(jobRestartStrategyConfiguration);
+   checkNotNull(clusterConfiguration);
+
+   final String restartStrategyClassName = 
clusterConfiguration.getString(
+   
RestartBackoffTimeStrategyOptions.RESTART_BACKOFF_TIME_STRATEGY_CLASS_NAME);
+
+   if (restartStrategyClassName != null) {
+   // create new restart strategy directly if it is 
specified in cluster config
+   return 
createRestartStrategyFactoryInternal(clusterConfiguration);
+   } else {
+   // adapt the legacy restart strategy configs as new 
restart strategy configs
+   final Configuration adaptedConfiguration = 
getAdaptedConfiguration(
+   jobRestartStrategyConfiguration,
+   clusterConfiguration,
+   isCheckpointingEnabled);
+
+   // create new restart strategy from the adapted config
+   return 
createRestartStrategyFactoryInternal(adaptedConfiguration);
+   }
+   }
+
+   /**
+* Decides the {@link RestartBackoffTimeStrategy} to use and its params 
based on legacy configs,
+* and records its class name and the params into an adapted 
configuration.
+*
+* The decision making is as follows:
+* 
+* Use strategy of {@link 

[GitHub] [flink] GJL commented on a change in pull request #8912: [FLINK-12709] [runtime] Implement new generation restart strategy loader which also respects legacy…

2019-09-11 Thread GitBox
GJL commented on a change in pull request #8912: [FLINK-12709] [runtime] 
Implement new generation restart strategy loader which also respects legacy…
URL: https://github.com/apache/flink/pull/8912#discussion_r323197187
 
 

 ##
 File path: 
flink-core/src/main/java/org/apache/flink/configuration/RestartBackoffTimeStrategyOptions.java
 ##
 @@ -46,7 +57,7 @@
.withDescription("Time interval in milliseconds for measuring 
failure rate.");
 
/**
-* Backoff time between two consecutive restart attempts in 
FailureRateRestartBackoffTimeStrategy.
+* Backoff time (milli-seconds) between two consecutive restart 
attempts in FailureRateRestartBackoffTimeStrategy.
 */
@PublicEvolving
public static final ConfigOption 
RESTART_BACKOFF_TIME_STRATEGY_FAILURE_RATE_FAILURE_RATE_BACKOFF_TIME = 
ConfigOptions
 
 Review comment:
   I know this discussion is out of scope for this PR, but do we need new config 
keys? For backwards compatibility we should respect the [existing config 
keys](https://github.com/apache/flink/blob/0a405251b297109fde1f9a155eff14be4d943887/flink-core/src/main/java/org/apache/flink/configuration/RestartStrategyOptions.java) 
anyway.
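
   To illustrate what respecting an existing key could look like, here is a 
rough sketch. It uses a plain `Map` instead of Flink's `Configuration`, and 
the class, method, and both key names are made up for the example; they are 
not the keys from `RestartStrategyOptions`:

```java
import java.util.Map;

// Hypothetical sketch: a plain Map stands in for Flink's Configuration.
final class LegacyKeyFallbackSketch {

    // Both key names are invented for illustration only.
    static final String NEW_KEY = "new-style.backoff-time";
    static final String LEGACY_KEY = "legacy-style.delay";

    private LegacyKeyFallbackSketch() {
    }

    // Prefers the new key, falls back to the legacy key, then to the default.
    static long resolveBackoffMillis(Map<String, String> config, long defaultMillis) {
        String raw = config.get(NEW_KEY);
        if (raw == null) {
            raw = config.get(LEGACY_KEY);
        }
        return raw == null ? defaultMillis : Long.parseLong(raw.trim());
    }
}
```

   If the existing keys have to be honored in any case, a lookup like this 
would be needed for every new key, which is part of why I am asking whether 
the new keys are worth introducing.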




[GitHub] [flink] GJL commented on a change in pull request #8912: [FLINK-12709] [runtime] Implement new generation restart strategy loader which also respects legacy…

2019-09-11 Thread GitBox
GJL commented on a change in pull request #8912: [FLINK-12709] [runtime] 
Implement new generation restart strategy loader which also respects legacy…
URL: https://github.com/apache/flink/pull/8912#discussion_r323195327
 
 

 ##
 File path: 
flink-core/src/main/java/org/apache/flink/configuration/RestartBackoffTimeStrategyOptions.java
 ##
 @@ -25,6 +25,16 @@
  */
 @PublicEvolving
 public class RestartBackoffTimeStrategyOptions {
+
+   /**
+* Class name of the RestartBackoffTimeStrategy implementation to use.
+*/
+   @PublicEvolving
+   public static final ConfigOption<String> 
RESTART_BACKOFF_TIME_STRATEGY_CLASS_NAME = ConfigOptions
+   .key("restart-backoff-time-strategy.class-name")
 
 Review comment:
   With the introduction of the new scheduler, users will not be able to use 
custom implementations of the `RestartStrategy` interface. Strictly speaking 
this is a breaking change. However, it is not clear to me if there are any 
Flink users at all that require a custom `RestartStrategy`. I think it makes 
sense to start a discussion on the dev mailing list. Until it is clear that 
users require this level of customization, I think we should not provide users a 
way to write their own back off time strategy.




[GitHub] [flink] GJL commented on a change in pull request #8912: [FLINK-12709] [runtime] Implement new generation restart strategy loader which also respects legacy…

2019-09-11 Thread GitBox
GJL commented on a change in pull request #8912: [FLINK-12709] [runtime] 
Implement new generation restart strategy loader which also respects legacy…
URL: https://github.com/apache/flink/pull/8912#discussion_r322764167
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartBackoffTimeStrategyFactoryLoader.java
 ##
 @@ -0,0 +1,393 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.failover.flip1;
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.configuration.AkkaOptions;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestartBackoffTimeStrategyOptions;
+import org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy;
+import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.concurrent.TimeUnit;
+
+import scala.concurrent.duration.Duration;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A utility class to load {@link RestartBackoffTimeStrategy.Factory} from the configuration.
+ */
+public class RestartBackoffTimeStrategyFactoryLoader {
+
+    private static final Logger LOG = LoggerFactory.getLogger(RestartBackoffTimeStrategyFactoryLoader.class);
+
+    private static final String CREATE_METHOD = "createFactory";
+
+    /**
+     * Creates proper {@link RestartBackoffTimeStrategy.Factory}.
+     * If new version restart strategy is specified, will directly use it.
+     * Otherwise will decide based on legacy restart strategy configs.
+     *
+     * @param jobRestartStrategyConfiguration restart configuration given within the job graph
+     * @param clusterConfiguration cluster(server-side) configuration, usually represented as jobmanager config
+     * @param isCheckpointingEnabled if checkpointing was enabled for the job
+     * @return new version restart strategy factory
+     */
+    public static RestartBackoffTimeStrategy.Factory createRestartStrategyFactory(
+            final RestartStrategies.RestartStrategyConfiguration jobRestartStrategyConfiguration,
+            final Configuration clusterConfiguration,
+            final boolean isCheckpointingEnabled) throws Exception {
+
+        checkNotNull(jobRestartStrategyConfiguration);
+        checkNotNull(clusterConfiguration);
+
+        final String restartStrategyClassName = clusterConfiguration.getString(
+            RestartBackoffTimeStrategyOptions.RESTART_BACKOFF_TIME_STRATEGY_CLASS_NAME);
+
+        if (restartStrategyClassName != null) {
+            // create new restart strategy directly if it is specified in cluster config
+            return createRestartStrategyFactoryInternal(clusterConfiguration);
+        } else {
+            // adapt the legacy restart strategy configs as new restart strategy configs
+            final Configuration adaptedConfiguration = getAdaptedConfiguration(
+                jobRestartStrategyConfiguration,
+                clusterConfiguration,
+                isCheckpointingEnabled);
+
+            // create new restart strategy from the adapted config
+            return createRestartStrategyFactoryInternal(adaptedConfiguration);
+        }
+    }
+
+    /**
+     * Decides the {@link RestartBackoffTimeStrategy} to use and its params based on legacy configs,
+     * and records its class name and the params into a adapted configuration.
+     *
+     * The decision making is as follows:
+     *
+     * Use strategy of {@link 

[GitHub] [flink] GJL commented on a change in pull request #8912: [FLINK-12709] [runtime] Implement new generation restart strategy loader which also respects legacy…

2019-09-11 Thread GitBox
GJL commented on a change in pull request #8912: [FLINK-12709] [runtime] 
Implement new generation restart strategy loader which also respects legacy…
URL: https://github.com/apache/flink/pull/8912#discussion_r322781627
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartBackoffTimeStrategyFactoryLoaderTest.java
 ##
 @@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.failover.flip1;
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestartBackoffTimeStrategyOptions;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.IsInstanceOf.instanceOf;
+
+/**
+ * Unit tests for {@link RestartBackoffTimeStrategyFactoryLoader}.
+ */
+public class RestartBackoffTimeStrategyFactoryLoaderTest extends TestLogger {
+
+    private static final RestartStrategies.RestartStrategyConfiguration DEFAULT_RESTART_STRATEGY_CONFIGURATION =
+        new RestartStrategies.FallbackRestartStrategyConfiguration();
+
+    @Test
+    public void testNewStrategySpecified() throws Exception {
+        // specify RestartBackoffTimeStrategy directly in cluster config
+        final Configuration conf = new Configuration();
+        conf.setString(
+            RestartBackoffTimeStrategyOptions.RESTART_BACKOFF_TIME_STRATEGY_CLASS_NAME,
+            TestRestartBackoffTimeStrategy.class.getName());
+
+        // the RestartStrategyConfiguration should not take effect as the loader will
+        // directly create the factory from the config of the new version strategy
+        final RestartBackoffTimeStrategy.Factory factory =
+            RestartBackoffTimeStrategyFactoryLoader.createRestartStrategyFactory(
+                new RestartStrategies.FailureRateRestartStrategyConfiguration(
+                    1,
+                    Time.milliseconds(1000),
+                    Time.milliseconds(1000)),
+                conf,
+                true);
+
+        assertThat(
+            factory,
+            instanceOf(TestRestartBackoffTimeStrategy.TestRestartBackoffTimeStrategyFactory.class));
+    }
+
+    @Test
+    public void testInvalidNewStrategySpecified() throws Exception {
+        final Configuration conf = new Configuration();
+        conf.setString(
+            RestartBackoffTimeStrategyOptions.RESTART_BACKOFF_TIME_STRATEGY_CLASS_NAME,
+            InvalidTestRestartBackoffTimeStrategy.class.getName());
+
+        final RestartBackoffTimeStrategy.Factory factory =
+            RestartBackoffTimeStrategyFactoryLoader.createRestartStrategyFactory(
+                DEFAULT_RESTART_STRATEGY_CONFIGURATION,
+                conf,
+                true);
+
+        assertThat(
+            factory,
+            instanceOf(NoRestartBackoffTimeStrategy.NoRestartBackoffTimeStrategyFactory.class));
+    }
+
+    @Test
+    public void testNoStrategySpecifiedWhenCheckpointingEnabled() throws Exception {
+        final RestartBackoffTimeStrategy.Factory factory =
+            RestartBackoffTimeStrategyFactoryLoader.createRestartStrategyFactory(
+                DEFAULT_RESTART_STRATEGY_CONFIGURATION,
+                new Configuration(),
+                true);
+
+        assertThat(
+            factory,
+            instanceOf(FixedDelayRestartBackoffTimeStrategy.FixedDelayRestartBackoffTimeStrategyFactory.class));
+    }
+
+    @Test
+    public void 

[GitHub] [flink] GJL commented on a change in pull request #8912: [FLINK-12709] [runtime] Implement new generation restart strategy loader which also respects legacy…

2019-09-11 Thread GitBox
GJL commented on a change in pull request #8912: [FLINK-12709] [runtime] 
Implement new generation restart strategy loader which also respects legacy…
URL: https://github.com/apache/flink/pull/8912#discussion_r323197187
 
 

 ##
 File path: 
flink-core/src/main/java/org/apache/flink/configuration/RestartBackoffTimeStrategyOptions.java
 ##
 @@ -46,7 +57,7 @@
         .withDescription("Time interval in milliseconds for measuring failure rate.");
 
     /**
-     * Backoff time between two consecutive restart attempts in FailureRateRestartBackoffTimeStrategy.
+     * Backoff time (milli-seconds) between two consecutive restart attempts in FailureRateRestartBackoffTimeStrategy.
      */
     @PublicEvolving
     public static final ConfigOption RESTART_BACKOFF_TIME_STRATEGY_FAILURE_RATE_FAILURE_RATE_BACKOFF_TIME = ConfigOptions
 
 Review comment:
   I know this discussion is out of scope for this PR, but do we need new 
config keys? For backwards compatibility we should respect the existing [config 
keys](https://github.com/apache/flink/blob/0a405251b297109fde1f9a155eff14be4d943887/flink-core/src/main/java/org/apache/flink/configuration/RestartStrategyOptions.java) 
anyway.
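
One way to avoid separate keys would be to let a single option fall back to the legacy key; Flink's `ConfigOption` offers `withDeprecatedKeys(...)` for this purpose. The lookup order can be sketched without any Flink dependency as follows. `FallbackOption` is a hypothetical helper and the new key name is invented for illustration; `restart-strategy.failure-rate.delay` is the existing legacy key.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Hypothetical helper: one option that also honours legacy keys. */
final class FallbackOption {
    private final String key;
    private final String[] legacyKeys;

    FallbackOption(String key, String... legacyKeys) {
        this.key = key;
        this.legacyKeys = legacyKeys;
    }

    /** Returns the value of the current key, else of the first legacy key that is set. */
    Optional<String> resolve(Map<String, String> conf) {
        if (conf.containsKey(key)) {
            return Optional.of(conf.get(key));
        }
        for (String legacyKey : legacyKeys) {
            if (conf.containsKey(legacyKey)) {
                return Optional.of(conf.get(legacyKey));
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        FallbackOption delay = new FallbackOption(
            "restart-backoff-time-strategy.failure-rate.backoff-time", // invented name
            "restart-strategy.failure-rate.delay");                    // existing legacy key
        Map<String, String> conf = new HashMap<>();
        conf.put("restart-strategy.failure-rate.delay", "10 s");
        System.out.println(delay.resolve(conf).orElse("<unset>"));
    }
}
```

With this order, existing deployments keep working unchanged, and the new key only takes precedence once a user sets it explicitly.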




[GitHub] [flink] GJL commented on a change in pull request #8912: [FLINK-12709] [runtime] Implement new generation restart strategy loader which also respects legacy…

2019-09-11 Thread GitBox
GJL commented on a change in pull request #8912: [FLINK-12709] [runtime] 
Implement new generation restart strategy loader which also respects legacy…
URL: https://github.com/apache/flink/pull/8912#discussion_r322763247
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartBackoffTimeStrategyFactoryLoader.java
 ##
 @@ -0,0 +1,393 @@

[GitHub] [flink] GJL commented on a change in pull request #8912: [FLINK-12709] [runtime] Implement new generation restart strategy loader which also respects legacy…

2019-09-11 Thread GitBox
GJL commented on a change in pull request #8912: [FLINK-12709] [runtime] 
Implement new generation restart strategy loader which also respects legacy…
URL: https://github.com/apache/flink/pull/8912#discussion_r322765647
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartBackoffTimeStrategyFactoryLoader.java
 ##
 @@ -0,0 +1,393 @@

[GitHub] [flink] GJL commented on a change in pull request #8912: [FLINK-12709] [runtime] Implement new generation restart strategy loader which also respects legacy…

2019-09-11 Thread GitBox
GJL commented on a change in pull request #8912: [FLINK-12709] [runtime] 
Implement new generation restart strategy loader which also respects legacy…
URL: https://github.com/apache/flink/pull/8912#discussion_r323195327
 
 

 ##
 File path: 
flink-core/src/main/java/org/apache/flink/configuration/RestartBackoffTimeStrategyOptions.java
 ##
 @@ -25,6 +25,16 @@
  */
 @PublicEvolving
 public class RestartBackoffTimeStrategyOptions {
+
+    /**
+     * Class name of the RestartBackoffTimeStrategy implementation to use.
+     */
+    @PublicEvolving
+    public static final ConfigOption<String> RESTART_BACKOFF_TIME_STRATEGY_CLASS_NAME = ConfigOptions
+        .key("restart-backoff-time-strategy.class-name")
 
 Review comment:
   With the introduction of the new scheduler, users will not be able to use 
custom implementations of the `RestartStrategy` interface. Strictly speaking, 
this is a breaking change. However, it is not clear to me whether any Flink 
users actually require a custom `RestartStrategy`. I think it makes sense to 
start a discussion on the dev mailing list. Until it is clear that users 
require this level of customization, we should not offer users a way to write 
their own backoff time strategy.
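
For readers unfamiliar with what such a custom implementation would involve, the following is a rough sketch. The `BackoffTimeStrategy` interface is a hypothetical stand-in that is assumed to mirror the shape of `RestartBackoffTimeStrategy` (`canRestart`, `getBackoffTime`, `notifyFailure`). `CappedExponentialBackoff` and its parameters are invented for illustration and are not part of this PR.

```java
/** Hypothetical stand-in for the strategy interface discussed in this PR. */
interface BackoffTimeStrategy {
    boolean canRestart();

    long getBackoffTime();

    void notifyFailure(Throwable cause);
}

/** Doubles the backoff after every failure, up to a cap, and gives up after maxAttempts. */
final class CappedExponentialBackoff implements BackoffTimeStrategy {
    private final long initialBackoffMs;
    private final long maxBackoffMs;
    private final int maxAttempts;
    private int failures;

    CappedExponentialBackoff(long initialBackoffMs, long maxBackoffMs, int maxAttempts) {
        this.initialBackoffMs = initialBackoffMs;
        this.maxBackoffMs = maxBackoffMs;
        this.maxAttempts = maxAttempts;
    }

    @Override
    public boolean canRestart() {
        // One restart attempt is allowed per failure, up to maxAttempts.
        return failures <= maxAttempts;
    }

    @Override
    public long getBackoffTime() {
        long backoff = Math.min(initialBackoffMs, maxBackoffMs);
        // Double once per failure after the first; jump to the cap instead of overflowing.
        for (int i = 1; i < failures && backoff < maxBackoffMs; i++) {
            backoff = backoff > maxBackoffMs / 2 ? maxBackoffMs : backoff * 2;
        }
        return backoff;
    }

    @Override
    public void notifyFailure(Throwable cause) {
        failures++;
    }
}
```

Even a small strategy like this carries state and policy decisions (when to give up, how to cap the delay), which is the kind of surface that would have to be supported once users can plug in their own classes.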

