[flink] 03/05: [FLINK-11533] [container] Make jobClassName argument optional

2019-03-01 Thread uce
This is an automated email from the ASF dual-hosted git repository.

uce pushed a commit to branch release-1.8
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 8af3250b418b5f47440460a8051aff7ef1a7c054
Author: Ufuk Celebi 
AuthorDate: Mon Feb 25 17:48:50 2019 +0100

[FLINK-11533] [container] Make jobClassName argument optional

[pr-review] Address comments
---
 .../entrypoint/ClassPathJobGraphRetriever.java | 20 +++-
 .../StandaloneJobClusterConfiguration.java | 22 +++---
 ...daloneJobClusterConfigurationParserFactory.java | 14 +++---
 .../entrypoint/StandaloneJobClusterEntryPoint.java | 19 ++-
 .../entrypoint/ClassPathJobGraphRetrieverTest.java |  8 
 ...neJobClusterConfigurationParserFactoryTest.java | 12 +---
 6 files changed, 56 insertions(+), 39 deletions(-)

diff --git 
a/flink-container/src/main/java/org/apache/flink/container/entrypoint/ClassPathJobGraphRetriever.java
 
b/flink-container/src/main/java/org/apache/flink/container/entrypoint/ClassPathJobGraphRetriever.java
index 5554168..092ef8b 100644
--- 
a/flink-container/src/main/java/org/apache/flink/container/entrypoint/ClassPathJobGraphRetriever.java
+++ 
b/flink-container/src/main/java/org/apache/flink/container/entrypoint/ClassPathJobGraphRetriever.java
@@ -29,7 +29,11 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
 import org.apache.flink.util.FlinkException;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 
 import static java.util.Objects.requireNonNull;
 
@@ -39,8 +43,10 @@ import static java.util.Objects.requireNonNull;
  */
 class ClassPathJobGraphRetriever implements JobGraphRetriever {
 
-   @Nonnull
-   private final String jobClassName;
+   private static final Logger LOG = 
LoggerFactory.getLogger(ClassPathJobGraphRetriever.class);
+   private static final String JAVA_CLASS_PATH = "java.class.path";
+   private static final String PATH_SEPARATOR = "path.separator";
+   private static final String DEFAULT_PATH_SEPARATOR = ":";
 
@Nonnull
private final JobID jobId;
@@ -51,15 +57,18 @@ class ClassPathJobGraphRetriever implements 
JobGraphRetriever {
@Nonnull
private final String[] programArguments;
 
+   @Nullable
+   private final String jobClassName;
+
ClassPathJobGraphRetriever(
-   @Nonnull String jobClassName,
@Nonnull JobID jobId,
@Nonnull SavepointRestoreSettings 
savepointRestoreSettings,
-   @Nonnull String[] programArguments) {
-   this.jobClassName = requireNonNull(jobClassName, 
"jobClassName");
+   @Nonnull String[] programArguments,
+   @Nullable String jobClassName) {
this.jobId = requireNonNull(jobId, "jobId");
this.savepointRestoreSettings = 
requireNonNull(savepointRestoreSettings, "savepointRestoreSettings");
this.programArguments = requireNonNull(programArguments, 
"programArguments");
+   this.jobClassName = jobClassName;
}
 
@Override
@@ -89,4 +98,5 @@ class ClassPathJobGraphRetriever implements JobGraphRetriever 
{
throw new FlinkException("Could not load the provided 
entrypoint class.", e);
}
}
+
 }
diff --git 
a/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfiguration.java
 
b/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfiguration.java
index 8d28b89..875a7c5 100644
--- 
a/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfiguration.java
+++ 
b/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfiguration.java
@@ -35,32 +35,27 @@ import static java.util.Objects.requireNonNull;
 final class StandaloneJobClusterConfiguration extends 
EntrypointClusterConfiguration {
 
@Nonnull
-   private final String jobClassName;
-
-   @Nonnull
private final SavepointRestoreSettings savepointRestoreSettings;
 
@Nonnull
private final JobID jobId;
 
+   @Nullable
+   private final String jobClassName;
+
StandaloneJobClusterConfiguration(
@Nonnull String configDir,
@Nonnull Properties dynamicProperties,
@Nonnull String[] args,
@Nullable String hostname,
int restPort,
-   @Nonnull String jobClassName,
@Nonnull SavepointRestoreSettings 
savepointRestoreSettings,
-   @Nonnull JobID jobId) {
+   @Nonnull JobID 

[flink] 03/05: [FLINK-11533] [container] Make jobClassName argument optional

2019-02-28 Thread uce
This is an automated email from the ASF dual-hosted git repository.

uce pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 34aa3f5ac9e5efb4d10680dd885ba1bf2f81b7f6
Author: Ufuk Celebi 
AuthorDate: Mon Feb 25 17:48:50 2019 +0100

[FLINK-11533] [container] Make jobClassName argument optional

[pr-review] Address comments
---
 .../entrypoint/ClassPathJobGraphRetriever.java | 20 +++-
 .../StandaloneJobClusterConfiguration.java | 22 +++---
 ...daloneJobClusterConfigurationParserFactory.java | 14 +++---
 .../entrypoint/StandaloneJobClusterEntryPoint.java | 19 ++-
 .../entrypoint/ClassPathJobGraphRetrieverTest.java |  8 
 ...neJobClusterConfigurationParserFactoryTest.java | 12 +---
 6 files changed, 56 insertions(+), 39 deletions(-)

diff --git 
a/flink-container/src/main/java/org/apache/flink/container/entrypoint/ClassPathJobGraphRetriever.java
 
b/flink-container/src/main/java/org/apache/flink/container/entrypoint/ClassPathJobGraphRetriever.java
index 5554168..092ef8b 100644
--- 
a/flink-container/src/main/java/org/apache/flink/container/entrypoint/ClassPathJobGraphRetriever.java
+++ 
b/flink-container/src/main/java/org/apache/flink/container/entrypoint/ClassPathJobGraphRetriever.java
@@ -29,7 +29,11 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
 import org.apache.flink.util.FlinkException;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 
 import static java.util.Objects.requireNonNull;
 
@@ -39,8 +43,10 @@ import static java.util.Objects.requireNonNull;
  */
 class ClassPathJobGraphRetriever implements JobGraphRetriever {
 
-   @Nonnull
-   private final String jobClassName;
+   private static final Logger LOG = 
LoggerFactory.getLogger(ClassPathJobGraphRetriever.class);
+   private static final String JAVA_CLASS_PATH = "java.class.path";
+   private static final String PATH_SEPARATOR = "path.separator";
+   private static final String DEFAULT_PATH_SEPARATOR = ":";
 
@Nonnull
private final JobID jobId;
@@ -51,15 +57,18 @@ class ClassPathJobGraphRetriever implements 
JobGraphRetriever {
@Nonnull
private final String[] programArguments;
 
+   @Nullable
+   private final String jobClassName;
+
ClassPathJobGraphRetriever(
-   @Nonnull String jobClassName,
@Nonnull JobID jobId,
@Nonnull SavepointRestoreSettings 
savepointRestoreSettings,
-   @Nonnull String[] programArguments) {
-   this.jobClassName = requireNonNull(jobClassName, 
"jobClassName");
+   @Nonnull String[] programArguments,
+   @Nullable String jobClassName) {
this.jobId = requireNonNull(jobId, "jobId");
this.savepointRestoreSettings = 
requireNonNull(savepointRestoreSettings, "savepointRestoreSettings");
this.programArguments = requireNonNull(programArguments, 
"programArguments");
+   this.jobClassName = jobClassName;
}
 
@Override
@@ -89,4 +98,5 @@ class ClassPathJobGraphRetriever implements JobGraphRetriever 
{
throw new FlinkException("Could not load the provided 
entrypoint class.", e);
}
}
+
 }
diff --git 
a/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfiguration.java
 
b/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfiguration.java
index 8d28b89..875a7c5 100644
--- 
a/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfiguration.java
+++ 
b/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfiguration.java
@@ -35,32 +35,27 @@ import static java.util.Objects.requireNonNull;
 final class StandaloneJobClusterConfiguration extends 
EntrypointClusterConfiguration {
 
@Nonnull
-   private final String jobClassName;
-
-   @Nonnull
private final SavepointRestoreSettings savepointRestoreSettings;
 
@Nonnull
private final JobID jobId;
 
+   @Nullable
+   private final String jobClassName;
+
StandaloneJobClusterConfiguration(
@Nonnull String configDir,
@Nonnull Properties dynamicProperties,
@Nonnull String[] args,
@Nullable String hostname,
int restPort,
-   @Nonnull String jobClassName,
@Nonnull SavepointRestoreSettings 
savepointRestoreSettings,
-   @Nonnull JobID jobId) {
+   @Nonnull JobID