[GitHub] [flink] wangyang0918 commented on a change in pull request #17554: [FLINK-24624][Kubernetes]Kill cluster when starting kubernetes session or application cluster failed

2021-11-24 Thread GitBox


wangyang0918 commented on a change in pull request #17554:
URL: https://github.com/apache/flink/pull/17554#discussion_r756606603



##
File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesClusterDescriptor.java
##
@@ -256,36 +244,51 @@ private String getWebMonitorAddress(Configuration 
configuration) throws Exceptio
 flinkConfig.get(JobManagerOptions.PORT));
 }
 
+final KubernetesJobManagerParameters kubernetesJobManagerParameters =
+new KubernetesJobManagerParameters(flinkConfig, 
clusterSpecification);
+
+final FlinkPod podTemplate =
+kubernetesJobManagerParameters
+.getPodTemplateFilePath()
+.map(
+file ->
+
KubernetesUtils.loadPodFromTemplateFile(
+client, file, 
Constants.MAIN_CONTAINER_NAME))
+.orElse(new FlinkPod.Builder().build());
+final KubernetesJobManagerSpecification kubernetesJobManagerSpec =
+
KubernetesJobManagerFactory.buildKubernetesJobManagerSpecification(
+podTemplate, kubernetesJobManagerParameters);
+
+client.createJobManagerComponent(kubernetesJobManagerSpec);
+
+return createClusterClientProvider(clusterId);
+}
+
+private ClusterClientProvider<String> safelyDeployCluster(
+SupplierWithException<ClusterClientProvider<String>, Exception> supplier)
+throws ClusterDeploymentException {
 try {
-final KubernetesJobManagerParameters 
kubernetesJobManagerParameters =
-new KubernetesJobManagerParameters(flinkConfig, 
clusterSpecification);
-
-final FlinkPod podTemplate =
-kubernetesJobManagerParameters
-.getPodTemplateFilePath()
-.map(
-file ->
-
KubernetesUtils.loadPodFromTemplateFile(
-client, file, 
Constants.MAIN_CONTAINER_NAME))
-.orElse(new FlinkPod.Builder().build());
-final KubernetesJobManagerSpecification kubernetesJobManagerSpec =
-
KubernetesJobManagerFactory.buildKubernetesJobManagerSpecification(
-podTemplate, kubernetesJobManagerParameters);
-
-client.createJobManagerComponent(kubernetesJobManagerSpec);
-
-return createClusterClientProvider(clusterId);
+
+ClusterClientProvider clusterClientProvider = 
supplier.get();
+
+try (ClusterClient clusterClient = 
clusterClientProvider.getClusterClient()) {

Review comment:
   Re-creating the client probably does not make sense for FLINK-24624, since 
it will always fail due to permission issues.
   
   After careful consideration, I lean towards having more discussion and 
keeping the current behavior. I still appreciate @Aitozi's work on this PR.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] wangyang0918 commented on a change in pull request #17554: [FLINK-24624][Kubernetes]Kill cluster when starting kubernetes session or application cluster failed

2021-11-24 Thread GitBox


wangyang0918 commented on a change in pull request #17554:
URL: https://github.com/apache/flink/pull/17554#discussion_r755797584



##
File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesClusterDescriptor.java
##
@@ -256,36 +244,51 @@ private String getWebMonitorAddress(Configuration 
configuration) throws Exceptio
 flinkConfig.get(JobManagerOptions.PORT));
 }
 
+final KubernetesJobManagerParameters kubernetesJobManagerParameters =
+new KubernetesJobManagerParameters(flinkConfig, 
clusterSpecification);
+
+final FlinkPod podTemplate =
+kubernetesJobManagerParameters
+.getPodTemplateFilePath()
+.map(
+file ->
+
KubernetesUtils.loadPodFromTemplateFile(
+client, file, 
Constants.MAIN_CONTAINER_NAME))
+.orElse(new FlinkPod.Builder().build());
+final KubernetesJobManagerSpecification kubernetesJobManagerSpec =
+
KubernetesJobManagerFactory.buildKubernetesJobManagerSpecification(
+podTemplate, kubernetesJobManagerParameters);
+
+client.createJobManagerComponent(kubernetesJobManagerSpec);
+
+return createClusterClientProvider(clusterId);
+}
+
+private ClusterClientProvider<String> safelyDeployCluster(
+SupplierWithException<ClusterClientProvider<String>, Exception> supplier)
+throws ClusterDeploymentException {
 try {
-final KubernetesJobManagerParameters 
kubernetesJobManagerParameters =
-new KubernetesJobManagerParameters(flinkConfig, 
clusterSpecification);
-
-final FlinkPod podTemplate =
-kubernetesJobManagerParameters
-.getPodTemplateFilePath()
-.map(
-file ->
-
KubernetesUtils.loadPodFromTemplateFile(
-client, file, 
Constants.MAIN_CONTAINER_NAME))
-.orElse(new FlinkPod.Builder().build());
-final KubernetesJobManagerSpecification kubernetesJobManagerSpec =
-
KubernetesJobManagerFactory.buildKubernetesJobManagerSpecification(
-podTemplate, kubernetesJobManagerParameters);
-
-client.createJobManagerComponent(kubernetesJobManagerSpec);
-
-return createClusterClientProvider(clusterId);
+
+ClusterClientProvider clusterClientProvider = 
supplier.get();
+
+try (ClusterClient clusterClient = 
clusterClientProvider.getClusterClient()) {

Review comment:
   @cc13ny Thanks for your valuable comments.
   
   @Aitozi This discussion makes me reconsider whether we really need to 
clean up the K8s resources when creating the Flink client fails, because the 
Flink cluster might be running normally.
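
To make the concern concrete, here is a minimal Java sketch, not code from this PR, that cleans up only when the deployment step itself fails and merely reports a problem when the follow-up client check fails. `ThrowingSupplier`, `ThrowingConsumer`, `deployThenProbe`, and the `CLEANED_UP` list are hypothetical stand-ins (for `SupplierWithException`, the `getClusterClient()` check, and `client.stopAndCleanupCluster(clusterId)`), chosen so the example runs without Flink on the classpath.

```java
import java.util.ArrayList;
import java.util.List;

/** Sketch only: every name here is hypothetical, not part of Flink. */
final class DeployVsProbeSketch {

    /** Stand-in for Flink's SupplierWithException. */
    @FunctionalInterface
    interface ThrowingSupplier<T> {
        T get() throws Exception;
    }

    /** Stand-in for "create a ClusterClient and read the web interface URL". */
    @FunctionalInterface
    interface ThrowingConsumer<T> {
        void accept(T value) throws Exception;
    }

    /** Records cleanup requests; stands in for client.stopAndCleanupCluster(clusterId). */
    static final List<String> CLEANED_UP = new ArrayList<>();

    static <T> T deployThenProbe(
            String clusterId, ThrowingSupplier<T> deploy, ThrowingConsumer<T> probe)
            throws Exception {
        final T provider;
        try {
            provider = deploy.get();
        } catch (Exception e) {
            // Deployment itself failed, so removing residual resources is safe.
            CLEANED_UP.add(clusterId);
            throw new Exception("Could not create Kubernetes cluster \"" + clusterId + "\".", e);
        }
        try {
            probe.accept(provider);
        } catch (Exception e) {
            // The cluster may be running normally (e.g. the client lacks permissions),
            // so keep the resources and only report the problem.
            System.err.println(
                    "Cluster " + clusterId + " deployed, but client check failed: "
                            + e.getMessage());
        }
        return provider;
    }

    public static void main(String[] args) throws Exception {
        String provider =
                deployThenProbe(
                        "demo",
                        () -> "provider",
                        p -> {
                            throw new IllegalStateException("forbidden");
                        });
        System.out.println(provider + " kept, cleaned up: " + CLEANED_UP);
    }
}
```

Whether a failed client check should ever trigger cleanup is exactly the open question in this thread; the sketch only shows what the "keep the cluster" option would look like.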








[GitHub] [flink] wangyang0918 commented on a change in pull request #17554: [FLINK-24624][Kubernetes]Kill cluster when starting kubernetes session or application cluster failed

2021-11-21 Thread GitBox


wangyang0918 commented on a change in pull request #17554:
URL: https://github.com/apache/flink/pull/17554#discussion_r753895825



##
File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesClusterDescriptor.java
##
@@ -256,39 +247,50 @@ private String getWebMonitorAddress(Configuration 
configuration) throws Exceptio
 flinkConfig.get(JobManagerOptions.PORT));
 }
 
+final KubernetesJobManagerParameters kubernetesJobManagerParameters =
+new KubernetesJobManagerParameters(flinkConfig, 
clusterSpecification);
+
+final FlinkPod podTemplate =
+kubernetesJobManagerParameters
+.getPodTemplateFilePath()
+.map(
+file ->
+
KubernetesUtils.loadPodFromTemplateFile(
+client, file, 
Constants.MAIN_CONTAINER_NAME))
+.orElse(new FlinkPod.Builder().build());
+final KubernetesJobManagerSpecification kubernetesJobManagerSpec =
+
KubernetesJobManagerFactory.buildKubernetesJobManagerSpecification(
+podTemplate, kubernetesJobManagerParameters);
+
+client.createJobManagerComponent(kubernetesJobManagerSpec);
+
+return createClusterClientProvider(clusterId);
+}
+
+private ClusterClientProvider<String> safelyDeployCluster(
+SupplierWithException<ClusterClientProvider<String>, Exception> supplier)
+throws ClusterDeploymentException {
 try {
-final KubernetesJobManagerParameters 
kubernetesJobManagerParameters =
-new KubernetesJobManagerParameters(flinkConfig, 
clusterSpecification);
-
-final FlinkPod podTemplate =
-kubernetesJobManagerParameters
-.getPodTemplateFilePath()
-.map(
-file ->
-
KubernetesUtils.loadPodFromTemplateFile(
-client, file, 
Constants.MAIN_CONTAINER_NAME))
-.orElse(new FlinkPod.Builder().build());
-final KubernetesJobManagerSpecification kubernetesJobManagerSpec =
-
KubernetesJobManagerFactory.buildKubernetesJobManagerSpecification(
-podTemplate, kubernetesJobManagerParameters);
-
-client.createJobManagerComponent(kubernetesJobManagerSpec);
-
-return createClusterClientProvider(clusterId);
+
+ClusterClientProvider clusterClientProvider = 
supplier.get();
+
+try (ClusterClient clusterClient = 
clusterClientProvider.getClusterClient()) {
+LOG.info(
+"Create flink cluster {} successfully, JobManager Web 
Interface: {}",
+clusterId,
+clusterClient.getWebInterfaceURL());
+}
+return clusterClientProvider;
 } catch (Exception e) {
 try {
-LOG.warn(
-"Failed to create the Kubernetes cluster \"{}\", try 
to clean up the residual resources.",
-clusterId);
 client.stopAndCleanupCluster(clusterId);
-} catch (Exception e1) {
-LOG.info(
+} catch (Exception ex) {
+LOG.warn(
 "Failed to stop and clean up the Kubernetes cluster 
\"{}\".",
 clusterId,
-e1);
+ex);
 }
-throw new ClusterDeploymentException(

Review comment:
   Also here, why did you remove the exception message?

##
File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesClusterDescriptor.java
##
@@ -155,19 +156,14 @@ private String getWebMonitorAddress(Configuration 
configuration) throws Exceptio
 @Override
 public ClusterClientProvider deploySessionCluster(
 ClusterSpecification clusterSpecification) throws 
ClusterDeploymentException {
-final ClusterClientProvider clusterClientProvider =
-deployClusterInternal(
-KubernetesSessionClusterEntrypoint.class.getName(),
-clusterSpecification,
-false);
-
-try (ClusterClient clusterClient = 
clusterClientProvider.getClusterClient()) {
-LOG.info(
-"Create flink session cluster {} successfully, JobManager 
Web Interface: {}",
-clusterId,
-clusterClient.getWebInterfaceURL());
-}
-return clusterClientProvider;
+final SupplierWithException<ClusterClientProvider<String>, Exception> supplier =

Review comment:
   Do we really need to have such local 

[GitHub] [flink] wangyang0918 commented on a change in pull request #17554: [FLINK-24624][Kubernetes]Kill cluster when starting kubernetes session or application cluster failed

2021-11-18 Thread GitBox


wangyang0918 commented on a change in pull request #17554:
URL: https://github.com/apache/flink/pull/17554#discussion_r752813610



##
File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesClusterDescriptor.java
##
@@ -205,17 +211,23 @@ private String getWebMonitorAddress(Configuration 
configuration) throws Exceptio
 Preconditions.checkArgument(pipelineJars.size() == 1, "Should only 
have one jar");
 }
 
-final ClusterClientProvider clusterClientProvider =
-deployClusterInternal(
-KubernetesApplicationClusterEntrypoint.class.getName(),
-clusterSpecification,
-false);
+ClusterClientProvider clusterClientProvider;
+try {
+clusterClientProvider =
+deployClusterInternal(
+
KubernetesApplicationClusterEntrypoint.class.getName(),
+clusterSpecification,
+false);
 
-try (ClusterClient clusterClient = 
clusterClientProvider.getClusterClient()) {
-LOG.info(
-"Create flink application cluster {} successfully, 
JobManager Web Interface: {}",
-clusterId,
-clusterClient.getWebInterfaceURL());
+try (ClusterClient clusterClient = 
clusterClientProvider.getClusterClient()) {
+LOG.info(
+"Create flink application cluster {} successfully, 
JobManager Web Interface: {}",
+clusterId,
+clusterClient.getWebInterfaceURL());
+}
+} catch (Exception e) {

Review comment:
   I am curious whether we could wrap the `try...catch {// clean up 
resources}` in a separate method, like the following. WDYT?
   
   ```
   private ClusterClientProvider<String> safelyDeployCluster(
           SupplierWithException<ClusterClientProvider<String>, Exception> supplier)
           throws ClusterDeploymentException {
       try {
           return supplier.get();
       } catch (Exception e) {
           try {
               LOG.warn(
                       "Failed to create the Kubernetes cluster \"{}\", try to clean up the residual resources.",
                       clusterId);
               client.stopAndCleanupCluster(clusterId);
           } catch (Exception ex) {
               // Log the cleanup failure (ex), not the original deployment failure (e).
               LOG.warn(
                       "Failed to stop and clean up the Kubernetes cluster \"{}\".", clusterId, ex);
           }
           throw new ClusterDeploymentException(e);
       }
   }
   ```
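
For anyone who wants to try the pattern outside Flink, the following is a self-contained sketch of the same wrapper. It makes several assumptions: `ThrowingSupplier`, `DeploymentException`, `stopAndCleanup`, and `CLEANED_UP` are hypothetical stand-ins for `SupplierWithException`, `ClusterDeploymentException`, and `client.stopAndCleanupCluster(clusterId)`. As a deliberate variation on the snippet above, a cleanup failure is attached to the original exception with `addSuppressed` rather than only logged.

```java
import java.util.ArrayList;
import java.util.List;

/** Sketch only: every name here is hypothetical, not part of Flink. */
final class SafeDeploySketch {

    /** Stand-in for Flink's SupplierWithException. */
    @FunctionalInterface
    interface ThrowingSupplier<T> {
        T get() throws Exception;
    }

    /** Stand-in for ClusterDeploymentException. */
    static final class DeploymentException extends Exception {
        DeploymentException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    /** Records cleanup requests so the behavior can be observed. */
    static final List<String> CLEANED_UP = new ArrayList<>();

    /** Stand-in for client.stopAndCleanupCluster(clusterId). */
    static void stopAndCleanup(String clusterId) {
        CLEANED_UP.add(clusterId);
    }

    /** Runs the supplier; on any failure, cleans up and rethrows with the cause attached. */
    static <T> T safelyDeploy(String clusterId, ThrowingSupplier<T> supplier)
            throws DeploymentException {
        try {
            return supplier.get();
        } catch (Exception e) {
            try {
                stopAndCleanup(clusterId);
            } catch (Exception ex) {
                // Keep the cleanup failure visible without hiding the original error.
                e.addSuppressed(ex);
            }
            throw new DeploymentException(
                    "Could not create Kubernetes cluster \"" + clusterId + "\".", e);
        }
    }

    public static void main(String[] args) {
        try {
            safelyDeploy(
                    "demo",
                    () -> {
                        throw new IllegalStateException("boom");
                    });
        } catch (DeploymentException e) {
            System.out.println(e.getMessage() + " Cleaned up: " + CLEANED_UP);
        }
    }
}
```

Passing the deployment logic as a supplier keeps the cleanup in one place, so the session and application paths could share it without duplicating the `try...catch`.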

##
File path: 
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesClusterDescriptorTest.java
##
@@ -131,16 +131,14 @@ public void testKillCluster() throws Exception {
 }
 
 @Test
-public void testDeployApplicationCluster() {
+public void testDeployApplicationCluster() throws 
ClusterDeploymentException {
 flinkConfig.set(
 PipelineOptions.JARS, 
Collections.singletonList("local:///path/of/user.jar"));
 flinkConfig.set(DeploymentOptions.TARGET, 
KubernetesDeploymentTarget.APPLICATION.getName());
-try {
-descriptor.deployApplicationCluster(clusterSpecification, 
appConfig);
-} catch (Exception ignored) {
-}
 
-mockExpectedServiceFromServerSide(loadBalancerSvc);
+mockFirstEmptyFollowByExpectedServiceFromServerSide(new Service(), 
loadBalancerSvc);

Review comment:
   I like this change. Great.

##
File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesClusterDescriptor.java
##
@@ -256,39 +268,35 @@ private String getWebMonitorAddress(Configuration 
configuration) throws Exceptio
 flinkConfig.get(JobManagerOptions.PORT));
 }
 
+final KubernetesJobManagerParameters kubernetesJobManagerParameters =
+new KubernetesJobManagerParameters(flinkConfig, 
clusterSpecification);
+
+final FlinkPod podTemplate =
+kubernetesJobManagerParameters
+.getPodTemplateFilePath()
+.map(
+file ->
+
KubernetesUtils.loadPodFromTemplateFile(
+client, file, 
Constants.MAIN_CONTAINER_NAME))
+.orElse(new FlinkPod.Builder().build());
+final KubernetesJobManagerSpecification kubernetesJobManagerSpec =
+
KubernetesJobManagerFactory.buildKubernetesJobManagerSpecification(
+podTemplate, kubernetesJobManagerParameters);
+
+client.createJobManagerComponent(kubernetesJobManagerSpec);
+
+return createClusterClientProvider(clusterId);
+}
+
+private void killClusterSilently(Throwable