[GitHub] [flink] wangyang0918 commented on a change in pull request #17554: [FLINK-24624][Kubernetes]Kill cluster when starting kubernetes session or application cluster failed
wangyang0918 commented on a change in pull request #17554:
URL: https://github.com/apache/flink/pull/17554#discussion_r756606603

## File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesClusterDescriptor.java

```diff
@@ -256,36 +244,51 @@ private String getWebMonitorAddress(Configuration configuration) throws Exceptio
                         flinkConfig.get(JobManagerOptions.PORT));
     }
 
+        final KubernetesJobManagerParameters kubernetesJobManagerParameters =
+                new KubernetesJobManagerParameters(flinkConfig, clusterSpecification);
+
+        final FlinkPod podTemplate =
+                kubernetesJobManagerParameters
+                        .getPodTemplateFilePath()
+                        .map(
+                                file ->
+                                        KubernetesUtils.loadPodFromTemplateFile(
+                                                client, file, Constants.MAIN_CONTAINER_NAME))
+                        .orElse(new FlinkPod.Builder().build());
+        final KubernetesJobManagerSpecification kubernetesJobManagerSpec =
+                KubernetesJobManagerFactory.buildKubernetesJobManagerSpecification(
+                        podTemplate, kubernetesJobManagerParameters);
+
+        client.createJobManagerComponent(kubernetesJobManagerSpec);
+
+        return createClusterClientProvider(clusterId);
+    }
+
+    private ClusterClientProvider<String> safelyDeployCluster(
+            SupplierWithException<ClusterClientProvider<String>, Exception> supplier)
+            throws ClusterDeploymentException {
         try {
-            final KubernetesJobManagerParameters kubernetesJobManagerParameters =
-                    new KubernetesJobManagerParameters(flinkConfig, clusterSpecification);
-
-            final FlinkPod podTemplate =
-                    kubernetesJobManagerParameters
-                            .getPodTemplateFilePath()
-                            .map(
-                                    file ->
-                                            KubernetesUtils.loadPodFromTemplateFile(
-                                                    client, file, Constants.MAIN_CONTAINER_NAME))
-                            .orElse(new FlinkPod.Builder().build());
-            final KubernetesJobManagerSpecification kubernetesJobManagerSpec =
-                    KubernetesJobManagerFactory.buildKubernetesJobManagerSpecification(
-                            podTemplate, kubernetesJobManagerParameters);
-
-            client.createJobManagerComponent(kubernetesJobManagerSpec);
-
-            return createClusterClientProvider(clusterId);
+
+            ClusterClientProvider<String> clusterClientProvider = supplier.get();
+
+            try (ClusterClient<String> clusterClient =
+                    clusterClientProvider.getClusterClient()) {
```

Review comment:

Re-creating the client probably does not make sense for FLINK-24624, since it will always fail due to permission issues. After careful consideration, I lean toward having more discussion and keeping the current behavior. I still appreciate @Aitozi's work on this PR.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
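The cleanup-on-failure shape of `safelyDeployCluster` in the diff above can be illustrated with a dependency-free sketch. All names below (`SafeDeploySketch`, `stopAndCleanupCluster`, the `SupplierWithException` interface) are simplified stand-ins for the Flink types, not the actual Flink API:

```java
import java.util.ArrayList;
import java.util.List;

// Dependency-free sketch of the cleanup-on-failure pattern under review:
// run the deployment supplier; on any failure, best-effort clean up the
// partially created cluster, then rethrow a single wrapping exception.
class SafeDeploySketch {

    // Stand-in for Flink's SupplierWithException functional interface.
    @FunctionalInterface
    interface SupplierWithException<R, E extends Throwable> {
        R get() throws E;
    }

    // Records cleanup calls so the behavior is observable.
    static final List<String> cleanedUp = new ArrayList<>();

    // Stand-in for client.stopAndCleanupCluster(clusterId).
    static void stopAndCleanupCluster(String clusterId) {
        cleanedUp.add(clusterId);
    }

    static String safelyDeployCluster(
            String clusterId, SupplierWithException<String, Exception> supplier)
            throws Exception {
        try {
            return supplier.get();
        } catch (Exception e) {
            try {
                stopAndCleanupCluster(clusterId);
            } catch (Exception ex) {
                // Swallow (in Flink: log) cleanup failures; the original
                // deployment failure is the one worth surfacing.
            }
            throw new Exception("Could not deploy cluster " + clusterId, e);
        }
    }
}
```

A successful supplier returns its provider untouched; a throwing supplier triggers cleanup for that cluster id before the wrapped exception propagates.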
[GitHub] [flink] wangyang0918 commented on a change in pull request #17554: [FLINK-24624][Kubernetes]Kill cluster when starting kubernetes session or application cluster failed
wangyang0918 commented on a change in pull request #17554:
URL: https://github.com/apache/flink/pull/17554#discussion_r755797584

## File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesClusterDescriptor.java

```diff
@@ -256,36 +244,51 @@ private String getWebMonitorAddress(Configuration configuration) throws Exceptio
                         flinkConfig.get(JobManagerOptions.PORT));
     }
 
+        final KubernetesJobManagerParameters kubernetesJobManagerParameters =
+                new KubernetesJobManagerParameters(flinkConfig, clusterSpecification);
+
+        final FlinkPod podTemplate =
+                kubernetesJobManagerParameters
+                        .getPodTemplateFilePath()
+                        .map(
+                                file ->
+                                        KubernetesUtils.loadPodFromTemplateFile(
+                                                client, file, Constants.MAIN_CONTAINER_NAME))
+                        .orElse(new FlinkPod.Builder().build());
+        final KubernetesJobManagerSpecification kubernetesJobManagerSpec =
+                KubernetesJobManagerFactory.buildKubernetesJobManagerSpecification(
+                        podTemplate, kubernetesJobManagerParameters);
+
+        client.createJobManagerComponent(kubernetesJobManagerSpec);
+
+        return createClusterClientProvider(clusterId);
+    }
+
+    private ClusterClientProvider<String> safelyDeployCluster(
+            SupplierWithException<ClusterClientProvider<String>, Exception> supplier)
+            throws ClusterDeploymentException {
         try {
-            final KubernetesJobManagerParameters kubernetesJobManagerParameters =
-                    new KubernetesJobManagerParameters(flinkConfig, clusterSpecification);
-
-            final FlinkPod podTemplate =
-                    kubernetesJobManagerParameters
-                            .getPodTemplateFilePath()
-                            .map(
-                                    file ->
-                                            KubernetesUtils.loadPodFromTemplateFile(
-                                                    client, file, Constants.MAIN_CONTAINER_NAME))
-                            .orElse(new FlinkPod.Builder().build());
-            final KubernetesJobManagerSpecification kubernetesJobManagerSpec =
-                    KubernetesJobManagerFactory.buildKubernetesJobManagerSpecification(
-                            podTemplate, kubernetesJobManagerParameters);
-
-            client.createJobManagerComponent(kubernetesJobManagerSpec);
-
-            return createClusterClientProvider(clusterId);
+
+            ClusterClientProvider<String> clusterClientProvider = supplier.get();
+
+            try (ClusterClient<String> clusterClient =
+                    clusterClientProvider.getClusterClient()) {
```

Review comment:

@cc13ny Thanks for your valuable comments.

@Aitozi This discussion makes me rethink whether we really need to clean up the K8s resources when creating the Flink client fails, because the Flink cluster might be running normally.
[GitHub] [flink] wangyang0918 commented on a change in pull request #17554: [FLINK-24624][Kubernetes]Kill cluster when starting kubernetes session or application cluster failed
wangyang0918 commented on a change in pull request #17554:
URL: https://github.com/apache/flink/pull/17554#discussion_r753895825

## File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesClusterDescriptor.java

```diff
@@ -256,39 +247,50 @@ private String getWebMonitorAddress(Configuration configuration) throws Exceptio
                         flinkConfig.get(JobManagerOptions.PORT));
     }
 
+        final KubernetesJobManagerParameters kubernetesJobManagerParameters =
+                new KubernetesJobManagerParameters(flinkConfig, clusterSpecification);
+
+        final FlinkPod podTemplate =
+                kubernetesJobManagerParameters
+                        .getPodTemplateFilePath()
+                        .map(
+                                file ->
+                                        KubernetesUtils.loadPodFromTemplateFile(
+                                                client, file, Constants.MAIN_CONTAINER_NAME))
+                        .orElse(new FlinkPod.Builder().build());
+        final KubernetesJobManagerSpecification kubernetesJobManagerSpec =
+                KubernetesJobManagerFactory.buildKubernetesJobManagerSpecification(
+                        podTemplate, kubernetesJobManagerParameters);
+
+        client.createJobManagerComponent(kubernetesJobManagerSpec);
+
+        return createClusterClientProvider(clusterId);
+    }
+
+    private ClusterClientProvider<String> safelyDeployCluster(
+            SupplierWithException<ClusterClientProvider<String>, Exception> supplier)
+            throws ClusterDeploymentException {
         try {
-            final KubernetesJobManagerParameters kubernetesJobManagerParameters =
-                    new KubernetesJobManagerParameters(flinkConfig, clusterSpecification);
-
-            final FlinkPod podTemplate =
-                    kubernetesJobManagerParameters
-                            .getPodTemplateFilePath()
-                            .map(
-                                    file ->
-                                            KubernetesUtils.loadPodFromTemplateFile(
-                                                    client, file, Constants.MAIN_CONTAINER_NAME))
-                            .orElse(new FlinkPod.Builder().build());
-            final KubernetesJobManagerSpecification kubernetesJobManagerSpec =
-                    KubernetesJobManagerFactory.buildKubernetesJobManagerSpecification(
-                            podTemplate, kubernetesJobManagerParameters);
-
-            client.createJobManagerComponent(kubernetesJobManagerSpec);
-
-            return createClusterClientProvider(clusterId);
+
+            ClusterClientProvider<String> clusterClientProvider = supplier.get();
+
+            try (ClusterClient<String> clusterClient =
+                    clusterClientProvider.getClusterClient()) {
+                LOG.info(
+                        "Create flink cluster {} successfully, JobManager Web Interface: {}",
+                        clusterId,
+                        clusterClient.getWebInterfaceURL());
+            }
+            return clusterClientProvider;
         } catch (Exception e) {
             try {
-                LOG.warn(
-                        "Failed to create the Kubernetes cluster \"{}\", try to clean up the residual resources.",
-                        clusterId);
                 client.stopAndCleanupCluster(clusterId);
-            } catch (Exception e1) {
-                LOG.info(
+            } catch (Exception ex) {
+                LOG.warn(
                         "Failed to stop and clean up the Kubernetes cluster \"{}\".",
                         clusterId,
-                        e1);
+                        ex);
             }
-            throw new ClusterDeploymentException(
```

Review comment:

Also here, why did you remove the exception message?

## File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesClusterDescriptor.java

```diff
@@ -155,19 +156,14 @@ private String getWebMonitorAddress(Configuration configuration) throws Exceptio
     @Override
     public ClusterClientProvider<String> deploySessionCluster(
             ClusterSpecification clusterSpecification) throws ClusterDeploymentException {
-        final ClusterClientProvider<String> clusterClientProvider =
-                deployClusterInternal(
-                        KubernetesSessionClusterEntrypoint.class.getName(),
-                        clusterSpecification,
-                        false);
-
-        try (ClusterClient<String> clusterClient = clusterClientProvider.getClusterClient()) {
-            LOG.info(
-                    "Create flink session cluster {} successfully, JobManager Web Interface: {}",
-                    clusterId,
-                    clusterClient.getWebInterfaceURL());
-        }
-        return clusterClientProvider;
+        final SupplierWithException<ClusterClientProvider<String>, Exception> supplier =
```

Review comment:

Do we really need to have such local
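The refactor above routes the deployment through a supplier that is first bound to a named local variable. The two call shapes, a named `supplier` local versus an inline lambda, can be contrasted with a dependency-free sketch; the names below are illustrative stand-ins, not the Flink API:

```java
// Contrast of two ways to hand a deployment to a safelyDeployCluster-style
// wrapper. Both compile to the same behavior; the difference is readability.
class SupplierShapeSketch {

    // Stand-in for Flink's SupplierWithException functional interface.
    @FunctionalInterface
    interface SupplierWithException<R, E extends Throwable> {
        R get() throws E;
    }

    // Stand-in wrapper: just runs the supplier.
    static String safelyDeployCluster(SupplierWithException<String, Exception> supplier)
            throws Exception {
        return supplier.get();
    }

    // Shape 1: the deployment is first bound to a named local variable.
    static String deployWithLocal() throws Exception {
        final SupplierWithException<String, Exception> supplier = () -> "session-cluster";
        return safelyDeployCluster(supplier);
    }

    // Shape 2: the lambda is passed directly at the call site.
    static String deployInlined() throws Exception {
        return safelyDeployCluster(() -> "session-cluster");
    }
}
```

Both shapes produce the same result; the local variable only adds a name (and a place for a type annotation) at the cost of an extra line.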
[GitHub] [flink] wangyang0918 commented on a change in pull request #17554: [FLINK-24624][Kubernetes]Kill cluster when starting kubernetes session or application cluster failed
wangyang0918 commented on a change in pull request #17554:
URL: https://github.com/apache/flink/pull/17554#discussion_r752813610

## File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesClusterDescriptor.java

```diff
@@ -205,17 +211,23 @@ private String getWebMonitorAddress(Configuration configuration) throws Exceptio
             Preconditions.checkArgument(pipelineJars.size() == 1, "Should only have one jar");
         }
 
-        final ClusterClientProvider<String> clusterClientProvider =
-                deployClusterInternal(
-                        KubernetesApplicationClusterEntrypoint.class.getName(),
-                        clusterSpecification,
-                        false);
+        ClusterClientProvider<String> clusterClientProvider;
+        try {
+            clusterClientProvider =
+                    deployClusterInternal(
+                            KubernetesApplicationClusterEntrypoint.class.getName(),
+                            clusterSpecification,
+                            false);
 
-        try (ClusterClient<String> clusterClient = clusterClientProvider.getClusterClient()) {
-            LOG.info(
-                    "Create flink application cluster {} successfully, JobManager Web Interface: {}",
-                    clusterId,
-                    clusterClient.getWebInterfaceURL());
+            try (ClusterClient<String> clusterClient = clusterClientProvider.getClusterClient()) {
+                LOG.info(
+                        "Create flink application cluster {} successfully, JobManager Web Interface: {}",
+                        clusterId,
+                        clusterClient.getWebInterfaceURL());
+            }
+        } catch (Exception e) {
```

Review comment:

I am curious whether we could wrap the `try...catch {// clean up resources}` in a separate method, just like the following. WDYT?

```java
private ClusterClientProvider<String> safelyDeployCluster(
        SupplierWithException<ClusterClientProvider<String>, Exception> supplier)
        throws ClusterDeploymentException {
    try {
        return supplier.get();
    } catch (Exception e) {
        try {
            LOG.warn(
                    "Failed to create the Kubernetes cluster \"{}\", try to clean up the residual resources.",
                    clusterId);
            client.stopAndCleanupCluster(clusterId);
        } catch (Exception ex) {
            LOG.warn(
                    "Failed to stop and clean up the Kubernetes cluster \"{}\".",
                    clusterId,
                    ex);
        }
        throw new ClusterDeploymentException(e);
    }
}
```

## File path: flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesClusterDescriptorTest.java

```diff
@@ -131,16 +131,14 @@ public void testKillCluster() throws Exception {
     }
 
     @Test
-    public void testDeployApplicationCluster() {
+    public void testDeployApplicationCluster() throws ClusterDeploymentException {
         flinkConfig.set(
                 PipelineOptions.JARS, Collections.singletonList("local:///path/of/user.jar"));
         flinkConfig.set(DeploymentOptions.TARGET, KubernetesDeploymentTarget.APPLICATION.getName());
-        try {
-            descriptor.deployApplicationCluster(clusterSpecification, appConfig);
-        } catch (Exception ignored) {
-        }
-        mockExpectedServiceFromServerSide(loadBalancerSvc);
+        mockFirstEmptyFollowByExpectedServiceFromServerSide(new Service(), loadBalancerSvc);
```

Review comment:

I like this change. Great.

## File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesClusterDescriptor.java

```diff
@@ -256,39 +268,35 @@ private String getWebMonitorAddress(Configuration configuration) throws Exceptio
                         flinkConfig.get(JobManagerOptions.PORT));
     }
 
+        final KubernetesJobManagerParameters kubernetesJobManagerParameters =
+                new KubernetesJobManagerParameters(flinkConfig, clusterSpecification);
+
+        final FlinkPod podTemplate =
+                kubernetesJobManagerParameters
+                        .getPodTemplateFilePath()
+                        .map(
+                                file ->
+                                        KubernetesUtils.loadPodFromTemplateFile(
+                                                client, file, Constants.MAIN_CONTAINER_NAME))
+                        .orElse(new FlinkPod.Builder().build());
+        final KubernetesJobManagerSpecification kubernetesJobManagerSpec =
+                KubernetesJobManagerFactory.buildKubernetesJobManagerSpecification(
+                        podTemplate, kubernetesJobManagerParameters);
+
+        client.createJobManagerComponent(kubernetesJobManagerSpec);
+
+        return createClusterClientProvider(clusterId);
+    }
+
+    private void killClusterSilently(Throwable
```