Re: [PR] [FLINK-32315][k8s] Support uploading "local://" artifacts in Kubernetes Application Mode [flink]

2024-04-20 Thread via GitHub


mbalassi merged PR #24303:
URL: https://github.com/apache/flink/pull/24303


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-32315][k8s] Support uploading "local://" artifacts in Kubernetes Application Mode [flink]

2024-04-18 Thread via GitHub


ferenc-csaky commented on PR #24303:
URL: https://github.com/apache/flink/pull/24303#issuecomment-2065736703

   @flinkbot run azure





Re: [PR] [FLINK-32315][k8s] Support uploading "local://" artifacts in Kubernetes Application Mode [flink]

2024-02-13 Thread via GitHub


ferenc-csaky commented on code in PR #24303:
URL: https://github.com/apache/flink/pull/24303#discussion_r1487552348


##
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/artifact/DefaultKubernetesArtifactUploader.java:
##
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.artifact;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.client.cli.ArtifactFetchOptions;
+import org.apache.flink.client.program.PackagedProgramUtils;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.PipelineOptions;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
+import org.apache.flink.util.StringUtils;
+import org.apache.flink.util.function.FunctionUtils;
+
+import org.apache.commons.io.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/** Default {@link KubernetesArtifactUploader} implementation. */
+public class DefaultKubernetesArtifactUploader implements KubernetesArtifactUploader {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(DefaultKubernetesArtifactUploader.class);
+
+    @Override
+    public void uploadAll(Configuration config) throws Exception {
+        if (!config.get(KubernetesConfigOptions.LOCAL_UPLOAD_ENABLED)) {
+            LOG.info(
+                    "Local artifact uploading is disabled. Set '{}' to enable.",
+                    KubernetesConfigOptions.LOCAL_UPLOAD_ENABLED.key());
+            return;
+        }
+
+        final String jobUri = upload(config, getJobUri(config));
+        config.set(PipelineOptions.JARS, Collections.singletonList(jobUri));
+
+        final List<String> additionalUris =
+                config.getOptional(ArtifactFetchOptions.ARTIFACT_LIST)
+                        .orElse(Collections.emptyList());
+
+        final List<String> uploadedAdditionalUris =
+                additionalUris.stream()
+                        .map(
+                                FunctionUtils.uncheckedFunction(
+                                        artifactUri -> upload(config, artifactUri)))
+                        .collect(Collectors.toList());
+
+        config.set(ArtifactFetchOptions.ARTIFACT_LIST, uploadedAdditionalUris);
+    }
+
+    @VisibleForTesting
+    String upload(Configuration config, String artifactUriStr)
+            throws IOException, URISyntaxException {
+        URI artifactUri = PackagedProgramUtils.resolveURI(artifactUriStr);
+        if (!"local".equals(artifactUri.getScheme())) {
+            return artifactUriStr;
+        }
+
+        final String targetDir = config.get(KubernetesConfigOptions.LOCAL_UPLOAD_TARGET);
+        checkArgument(
+                !StringUtils.isNullOrWhitespaceOnly(targetDir),
+                String.format(
+                        "Setting '%s' to a valid remote path is required.",
+                        KubernetesConfigOptions.LOCAL_UPLOAD_TARGET.key()));
+
+        final File src = new File(artifactUri.getPath());
+        final Path target = new Path(targetDir, src.getName());
+        if (target.getFileSystem().exists(target)) {
+            LOG.debug("Skipping artifact '{}', as it already exists.", target);

Review Comment:
   Yep, I agree, your summary seems correct to me. I'll proceed to add an 
override option for the local upload.  
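
A minimal sketch of what such an override could look like, assuming a boolean `overwrite` parameter as a stand-in for the proposed option (the option name and the method signature here are hypothetical, and `java.nio.file` is used in place of Flink's `FileSystem` so the example stays self-contained):

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

/** Illustrative only: this is not the code from the PR. */
public class OverwriteUploadSketch {

    /**
     * Copies src into targetDir under the same file name.
     * Returns true if the file was written, false if the copy was skipped.
     * The 'overwrite' parameter is a hypothetical stand-in for the proposed option.
     */
    public static boolean upload(Path src, Path targetDir, boolean overwrite)
            throws IOException {
        Path target = targetDir.resolve(src.getFileName());
        if (Files.exists(target) && !overwrite) {
            // Same situation as the "already exists" branch in the diff: keep the old file.
            return false;
        }
        Files.createDirectories(targetDir);
        Files.copy(src, target, StandardCopyOption.REPLACE_EXISTING);
        return true;
    }

    public static void main(String[] args) throws IOException {
        Path work = Files.createTempDirectory("upload-sketch");
        Path src = Files.write(work.resolve("udf.jar"), "v1".getBytes());
        Path remote = work.resolve("remote");

        System.out.println(upload(src, remote, false)); // true: first upload
        Files.write(src, "v2".getBytes());
        System.out.println(upload(src, remote, false)); // false: stale copy is kept
        System.out.println(upload(src, remote, true));  // true: forced re-upload
    }
}
```

With the flag left at `false` the behaviour matches the skip branch under review; only an explicit opt-in replaces the remote copy.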






Re: [PR] [FLINK-32315][k8s] Support uploading "local://" artifacts in Kubernetes Application Mode [flink]

2024-02-13 Thread via GitHub


schevalley2 commented on code in PR #24303:
URL: https://github.com/apache/flink/pull/24303#discussion_r1487540063


##
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesClusterDescriptorTest.java:
##
@@ -86,7 +86,8 @@ public FlinkKubeClient fromConfiguration(
                                 server.createClient().inNamespace(NAMESPACE),
                                 Executors.newDirectExecutorService());
                     }
-                });
+                },
+                config -> {});

Review Comment:
   Makes sense, thank you for the insight.






Re: [PR] [FLINK-32315][k8s] Support uploading "local://" artifacts in Kubernetes Application Mode [flink]

2024-02-13 Thread via GitHub


schevalley2 commented on code in PR #24303:
URL: https://github.com/apache/flink/pull/24303#discussion_r1487538828


##
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/artifact/DefaultKubernetesArtifactUploader.java:
##
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.artifact;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.client.cli.ArtifactFetchOptions;
+import org.apache.flink.client.program.PackagedProgramUtils;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.PipelineOptions;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
+import org.apache.flink.util.StringUtils;
+import org.apache.flink.util.function.FunctionUtils;
+
+import org.apache.commons.io.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/** Default {@link KubernetesArtifactUploader} implementation. */
+public class DefaultKubernetesArtifactUploader implements KubernetesArtifactUploader {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(DefaultKubernetesArtifactUploader.class);
+
+    @Override
+    public void uploadAll(Configuration config) throws Exception {
+        if (!config.get(KubernetesConfigOptions.LOCAL_UPLOAD_ENABLED)) {
+            LOG.info(
+                    "Local artifact uploading is disabled. Set '{}' to enable.",
+                    KubernetesConfigOptions.LOCAL_UPLOAD_ENABLED.key());
+            return;
+        }
+
+        final String jobUri = upload(config, getJobUri(config));
+        config.set(PipelineOptions.JARS, Collections.singletonList(jobUri));
+
+        final List<String> additionalUris =
+                config.getOptional(ArtifactFetchOptions.ARTIFACT_LIST)
+                        .orElse(Collections.emptyList());
+
+        final List<String> uploadedAdditionalUris =
+                additionalUris.stream()
+                        .map(
+                                FunctionUtils.uncheckedFunction(
+                                        artifactUri -> upload(config, artifactUri)))
+                        .collect(Collectors.toList());
+
+        config.set(ArtifactFetchOptions.ARTIFACT_LIST, uploadedAdditionalUris);
+    }
+
+    @VisibleForTesting
+    String upload(Configuration config, String artifactUriStr)
+            throws IOException, URISyntaxException {
+        URI artifactUri = PackagedProgramUtils.resolveURI(artifactUriStr);
+        if (!"local".equals(artifactUri.getScheme())) {
+            return artifactUriStr;
+        }
+
+        final String targetDir = config.get(KubernetesConfigOptions.LOCAL_UPLOAD_TARGET);
+        checkArgument(
+                !StringUtils.isNullOrWhitespaceOnly(targetDir),
+                String.format(
+                        "Setting '%s' to a valid remote path is required.",
+                        KubernetesConfigOptions.LOCAL_UPLOAD_TARGET.key()));
+
+        final File src = new File(artifactUri.getPath());
+        final Path target = new Path(targetDir, src.getName());
+        if (target.getFileSystem().exists(target)) {
+            LOG.debug("Skipping artifact '{}', as it already exists.", target);

Review Comment:
   For uploading, actually, it's executed as part of the CLI, no? Maybe it 
could be logged back to the user right on the spot, i.e. "we did not send 
udf.jar because it was already there, use --overwrite to reupload existing 
artifacts".
   
   > but I think the 2 things are different
   
   I agree, here is how I understand it: for fetching, since it's for 
Kubernetes, my assumption is that the artifacts are going to be stored on 
temporary storage. So most of the time artifacts will need to be downloaded 
again and that would 

Re: [PR] [FLINK-32315][k8s] Support uploading "local://" artifacts in Kubernetes Application Mode [flink]

2024-02-13 Thread via GitHub


ferenc-csaky commented on code in PR #24303:
URL: https://github.com/apache/flink/pull/24303#discussion_r1487526406


##
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesClusterDescriptorTest.java:
##
@@ -86,7 +86,8 @@ public FlinkKubeClient fromConfiguration(
                                 server.createClient().inNamespace(NAMESPACE),
                                 Executors.newDirectExecutorService());
                     }
-                });
+                },
+                config -> {});

Review Comment:
   It is a valid point IMO. During the implementation I was thinking about 
wiring a `KubernetesArtifactUploader.NO_OP` into `KubernetesClusterDescriptor` 
according to the `local-upload-enabled` config option, but I thought it would 
be better to encapsulate all this logic in the uploader, so that 
`KubernetesClusterDescriptor` does not actively manage any upload-related 
logic. Hence I did not add that `NO_OP` impl, because the tests already use a 
lot of other in-place no-ops.
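
The design described above can be sketched roughly as follows. All types here are hypothetical stand-ins (the real `KubernetesArtifactUploader` takes a Flink `Configuration`, not a list of URIs); the point is only that the enabled/disabled decision lives in the uploader while the descriptor delegates unconditionally:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Illustrative only: simplified stand-ins, not the Flink classes. */
public class NoOpUploaderSketch {

    /** Hypothetical stand-in for KubernetesArtifactUploader. */
    public interface ArtifactUploader {
        void uploadAll(List<String> artifactUris) throws Exception;
    }

    /** Uploader that owns the enabled/disabled decision itself. */
    public static final class RecordingUploader implements ArtifactUploader {
        private final boolean enabled; // stands in for the `local-upload-enabled` option
        public final List<String> uploaded = new ArrayList<>();

        public RecordingUploader(boolean enabled) {
            this.enabled = enabled;
        }

        @Override
        public void uploadAll(List<String> artifactUris) {
            if (!enabled) {
                return; // disabled: behaves like a no-op
            }
            uploaded.addAll(artifactUris);
        }
    }

    /** Hypothetical descriptor: holds no upload logic, it only delegates. */
    public static final class ClusterDescriptor {
        private final ArtifactUploader uploader;

        public ClusterDescriptor(ArtifactUploader uploader) {
            this.uploader = uploader;
        }

        public void deploy(List<String> artifactUris) throws Exception {
            uploader.uploadAll(artifactUris);
        }
    }

    public static void main(String[] args) throws Exception {
        List<String> uris = Collections.singletonList("local:///tmp/my-flink-job.jar");

        // Tests can pass an in-place no-op lambda, like `config -> {}` in the diff.
        new ClusterDescriptor(artifactUris -> {}).deploy(uris);

        RecordingUploader disabled = new RecordingUploader(false);
        new ClusterDescriptor(disabled).deploy(uris);
        System.out.println(disabled.uploaded.size()); // 0

        RecordingUploader enabled = new RecordingUploader(true);
        new ClusterDescriptor(enabled).deploy(uris);
        System.out.println(enabled.uploaded.size()); // 1
    }
}
```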






Re: [PR] [FLINK-32315][k8s] Support uploading "local://" artifacts in Kubernetes Application Mode [flink]

2024-02-13 Thread via GitHub


ferenc-csaky commented on code in PR #24303:
URL: https://github.com/apache/flink/pull/24303#discussion_r1487357451


##
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/artifact/DefaultKubernetesArtifactUploader.java:
##
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.artifact;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.client.cli.ArtifactFetchOptions;
+import org.apache.flink.client.program.PackagedProgramUtils;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.PipelineOptions;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
+import org.apache.flink.util.StringUtils;
+import org.apache.flink.util.function.FunctionUtils;
+
+import org.apache.commons.io.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/** Default {@link KubernetesArtifactUploader} implementation. */
+public class DefaultKubernetesArtifactUploader implements KubernetesArtifactUploader {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(DefaultKubernetesArtifactUploader.class);
+
+    @Override
+    public void uploadAll(Configuration config) throws Exception {
+        if (!config.get(KubernetesConfigOptions.LOCAL_UPLOAD_ENABLED)) {
+            LOG.info(
+                    "Local artifact uploading is disabled. Set '{}' to enable.",
+                    KubernetesConfigOptions.LOCAL_UPLOAD_ENABLED.key());
+            return;
+        }
+
+        final String jobUri = upload(config, getJobUri(config));
+        config.set(PipelineOptions.JARS, Collections.singletonList(jobUri));
+
+        final List<String> additionalUris =
+                config.getOptional(ArtifactFetchOptions.ARTIFACT_LIST)
+                        .orElse(Collections.emptyList());
+
+        final List<String> uploadedAdditionalUris =
+                additionalUris.stream()
+                        .map(
+                                FunctionUtils.uncheckedFunction(
+                                        artifactUri -> upload(config, artifactUri)))
+                        .collect(Collectors.toList());
+
+        config.set(ArtifactFetchOptions.ARTIFACT_LIST, uploadedAdditionalUris);
+    }
+
+    @VisibleForTesting
+    String upload(Configuration config, String artifactUriStr)
+            throws IOException, URISyntaxException {
+        URI artifactUri = PackagedProgramUtils.resolveURI(artifactUriStr);
+        if (!"local".equals(artifactUri.getScheme())) {
+            return artifactUriStr;
+        }
+
+        final String targetDir = config.get(KubernetesConfigOptions.LOCAL_UPLOAD_TARGET);
+        checkArgument(
+                !StringUtils.isNullOrWhitespaceOnly(targetDir),
+                String.format(
+                        "Setting '%s' to a valid remote path is required.",
+                        KubernetesConfigOptions.LOCAL_UPLOAD_TARGET.key()));
+
+        final File src = new File(artifactUri.getPath());
+        final Path target = new Path(targetDir, src.getName());
+        if (target.getFileSystem().exists(target)) {
+            LOG.debug("Skipping artifact '{}', as it already exists.", target);

Review Comment:
   To get the updated JAR, the existing file has to be removed by hand, or the 
new file has to be renamed. Regarding this, I was thinking about adding a config 
option flag to force overwriting existing artifacts on the remote.
   
   The same thing might make sense for fetching the artifacts as well, but I 
think the 2 things are different, so they would probably require 2 different 
flags.
   
   WDYT?




Re: [PR] [FLINK-32315][k8s] Support uploading "local://" artifacts in Kubernetes Application Mode [flink]

2024-02-13 Thread via GitHub


ferenc-csaky commented on code in PR #24303:
URL: https://github.com/apache/flink/pull/24303#discussion_r1487369239


##
flink-clients/src/main/java/org/apache/flink/client/cli/ArtifactFetchOptionsInternal.java:
##
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.cli;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+
+import java.util.List;
+
+/** Artifact Fetch options. */
+@Internal
+public class ArtifactFetchOptionsInternal {
+
+    public static final ConfigOption<List<String>> COMPLETE_LIST =
+            ConfigOptions.key("$internal.user.artifacts.complete-list")

Review Comment:
   That is a good point, following the same practice makes perfect sense!






Re: [PR] [FLINK-32315][k8s] Support uploading "local://" artifacts in Kubernetes Application Mode [flink]

2024-02-12 Thread via GitHub


schevalley2 commented on code in PR #24303:
URL: https://github.com/apache/flink/pull/24303#discussion_r1486621705


##
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/artifact/DefaultKubernetesArtifactUploader.java:
##
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.artifact;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.client.cli.ArtifactFetchOptions;
+import org.apache.flink.client.program.PackagedProgramUtils;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.PipelineOptions;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
+import org.apache.flink.util.StringUtils;
+import org.apache.flink.util.function.FunctionUtils;
+
+import org.apache.commons.io.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/** Default {@link KubernetesArtifactUploader} implementation. */
+public class DefaultKubernetesArtifactUploader implements KubernetesArtifactUploader {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(DefaultKubernetesArtifactUploader.class);
+
+    @Override
+    public void uploadAll(Configuration config) throws Exception {
+        if (!config.get(KubernetesConfigOptions.LOCAL_UPLOAD_ENABLED)) {
+            LOG.info(
+                    "Local artifact uploading is disabled. Set '{}' to enable.",
+                    KubernetesConfigOptions.LOCAL_UPLOAD_ENABLED.key());
+            return;
+        }
+
+        final String jobUri = upload(config, getJobUri(config));
+        config.set(PipelineOptions.JARS, Collections.singletonList(jobUri));
+
+        final List<String> additionalUris =
+                config.getOptional(ArtifactFetchOptions.ARTIFACT_LIST)
+                        .orElse(Collections.emptyList());
+
+        final List<String> uploadedAdditionalUris =
+                additionalUris.stream()
+                        .map(
+                                FunctionUtils.uncheckedFunction(
+                                        artifactUri -> upload(config, artifactUri)))
+                        .collect(Collectors.toList());
+
+        config.set(ArtifactFetchOptions.ARTIFACT_LIST, uploadedAdditionalUris);
+    }
+
+    @VisibleForTesting
+    String upload(Configuration config, String artifactUriStr)
+            throws IOException, URISyntaxException {
+        URI artifactUri = PackagedProgramUtils.resolveURI(artifactUriStr);
+        if (!"local".equals(artifactUri.getScheme())) {
+            return artifactUriStr;
+        }
+
+        final String targetDir = config.get(KubernetesConfigOptions.LOCAL_UPLOAD_TARGET);
+        checkArgument(
+                !StringUtils.isNullOrWhitespaceOnly(targetDir),
+                String.format(
+                        "Setting '%s' to a valid remote path is required.",
+                        KubernetesConfigOptions.LOCAL_UPLOAD_TARGET.key()));
+
+        final File src = new File(artifactUri.getPath());
+        final Path target = new Path(targetDir, src.getName());
+        if (target.getFileSystem().exists(target)) {
+            LOG.debug("Skipping artifact '{}', as it already exists.", target);

Review Comment:
   Actually, what happens if I am not adding a version to the file name, I had 
a `udf.jar` in the target folder, and now I have a new version of `udf.jar` 
that I want to upload?
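
The collision behind this question can be shown with a small sketch. It assumes, as in the quoted diff, that the remote target is built only from the target directory and the local file name; `targetFor` is a hypothetical helper that uses plain strings instead of Flink's `Path`:

```java
import java.nio.file.Paths;

/** Illustrative only: mimics `new Path(targetDir, src.getName())` with plain strings. */
public class TargetNameSketch {

    /** Hypothetical helper: joins the target directory with the local file name. */
    public static String targetFor(String targetDir, String localPath) {
        String fileName = Paths.get(localPath).getFileName().toString();
        return targetDir.endsWith("/") ? targetDir + fileName : targetDir + "/" + fileName;
    }

    public static void main(String[] args) {
        // Two different local builds of the same unversioned JAR...
        String first = targetFor("s3://my-bucket/", "/tmp/build-1/udf.jar");
        String second = targetFor("s3://my-bucket/", "/tmp/build-2/udf.jar");

        // ...map to the same remote path, so the exists() check skips the second one.
        System.out.println(first);                // s3://my-bucket/udf.jar
        System.out.println(first.equals(second)); // true
    }
}
```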






Re: [PR] [FLINK-32315][k8s] Support uploading "local://" artifacts in Kubernetes Application Mode [flink]

2024-02-12 Thread via GitHub


schevalley2 commented on code in PR #24303:
URL: https://github.com/apache/flink/pull/24303#discussion_r1486546701


##
docs/content.zh/docs/deployment/resource-providers/native_kubernetes.md:
##
@@ -97,13 +100,44 @@ COPY /path/of/my-flink-job.jar $FLINK_HOME/usrlib/my-flink-job.jar
 After creating and publishing the Docker image under `custom-image-name`, you can start an Application cluster with the following command:
 
 ```bash
-# Local Schema
 $ ./bin/flink run-application \
     --target kubernetes-application \
     -Dkubernetes.cluster-id=my-first-application-cluster \
     -Dkubernetes.container.image.ref=custom-image-name \
     local:///opt/flink/usrlib/my-flink-job.jar
+```
+
+ Configure User Artifact Management
+
+In case you have a locally available Flink job JAR, artifact upload can be utilized so Flink will upload the local artifact to DFS during deployment and fetch it on the deployed JobManager pod:

Review Comment:
   nit: utilized -> used



##
docs/content.zh/docs/deployment/resource-providers/native_kubernetes.md:
##
@@ -97,13 +100,44 @@ COPY /path/of/my-flink-job.jar 
$FLINK_HOME/usrlib/my-flink-job.jar
 After creating and publishing the Docker image under `custom-image-name`, you 
can start an Application cluster with the following command:
 
 ```bash
-# Local Schema
 $ ./bin/flink run-application \
 --target kubernetes-application \
 -Dkubernetes.cluster-id=my-first-application-cluster \
 -Dkubernetes.container.image.ref=custom-image-name \
 local:///opt/flink/usrlib/my-flink-job.jar
+```
+
+ Configure User Artifact Management
+
+In case you have a locally available Flink job JAR, artifact upload can be utilized so Flink will upload the local artifact to DFS during deployment and fetch it on the deployed JobManager pod:
+
+```bash
+$ ./bin/flink run-application \
+--target kubernetes-application \
+-Dkubernetes.cluster-id=my-first-application-cluster \
+-Dkubernetes.container.image=custom-image-name \
+-Dkubernetes.artifacts.local-upload-enabled=true \
+-Dkubernetes.artifacts.local-upload-target=s3://my-bucket/ \
+local:///tmp/my-flink-job.jar
+```
+
+The `kubernetes.artifacts.local-upload-enabled` enables this feature, and `kubernetes.artifacts.local-upload-target` has to point to a valid remote target that exists and the permissions configured properly.

Review Comment:
   nit: that exists and *has* the permissions configured properly.



##
flink-clients/src/main/java/org/apache/flink/client/cli/ArtifactFetchOptionsInternal.java:
##
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.cli;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+
+import java.util.List;
+
+/** Artifact Fetch options. */
+@Internal
+public class ArtifactFetchOptionsInternal {
+
+public static final ConfigOption<List<String>> COMPLETE_LIST =
+ConfigOptions.key("$internal.user.artifacts.complete-list")

Review Comment:
   I agree with you. I wonder if it would be worth either writing it somewhere else, as you suggest, or simply logging something like:
   
   > DefaultKubernetesArtifactUploader completed uploading artifacts, replacing user.artifacts.artifact-list with "…"
   
   That way it can easily be found in the logs, as in `checkAndUpdatePortConfigOption`.
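
A minimal sketch of what such a log line could look like. The class `UploadLogSketch` and its methods are hypothetical and not part of this PR, and `java.util.logging` is used only to keep the example self-contained; the project's own logger would take its place.

```java
import java.util.Arrays;
import java.util.List;
import java.util.logging.Logger;

/** Hypothetical helper; illustrates the suggested log message only. */
final class UploadLogSketch {

    private static final Logger LOG = Logger.getLogger(UploadLogSketch.class.getName());

    /** Builds the message announcing that a config option was replaced after upload. */
    static String replacementMessage(String optionKey, List<String> uploaded) {
        return String.format(
                "DefaultKubernetesArtifactUploader completed uploading artifacts, "
                        + "replacing %s with \"%s\"",
                optionKey, String.join(";", uploaded));
    }

    /** Logs the replacement at INFO level so it is easy to find later. */
    static void logReplacement(String optionKey, List<String> uploaded) {
        LOG.info(replacementMessage(optionKey, uploaded));
    }

    public static void main(String[] args) {
        logReplacement(
                "user.artifacts.artifact-list",
                Arrays.asList("s3://my-bucket/my-flink-udf1.jar"));
    }
}
```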



##
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesClusterDescriptorTest.java:
##
@@ -86,7 +86,8 @@ public FlinkKubeClient fromConfiguration(
 
server.createClient().inNamespace(NAMESPACE),
 Executors.newDirectExecutorService());
 }
-});
+},
+config -> {});

Review Comment:
   nit: I think it's fine since it's a test. Seeing it in the diff, I thought it might be possible to declare a static `NO_OP = config -> {}` and use it here as `KubernetesArtifactUploader.NO_OP`, but I don't have a strong opinion on that.
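
The idea could be sketched roughly as below. `ArtifactUploaderSketch` is a hypothetical stand-in for the uploader interface, and a plain `Map` replaces Flink's `Configuration` so the example compiles on its own; the real method signature may differ.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for the uploader interface discussed in this thread. */
@FunctionalInterface
interface ArtifactUploaderSketch {

    /** Shared do-nothing uploader for tests that do not exercise uploading. */
    ArtifactUploaderSketch NO_OP = config -> {};

    /** Assumed shape: receives the deployment config and may modify it. */
    void uploadAll(Map<String, String> config);
}

final class NoOpUploaderDemo {
    public static void main(String[] args) {
        Map<String, String> config = new HashMap<>();
        config.put("kubernetes.artifacts.local-upload-enabled", "false");
        // The no-op uploader leaves the config untouched.
        ArtifactUploaderSketch.NO_OP.uploadAll(config);
        System.out.println(config);
    }
}
```

A named constant makes the intent visible at the call site, at the cost of one more public member on the interface.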



##

Re: [PR] [FLINK-32315][k8s] Support uploading "local://" artifacts in Kubernetes Application Mode [flink]

2024-02-12 Thread via GitHub


ferenc-csaky commented on code in PR #24303:
URL: https://github.com/apache/flink/pull/24303#discussion_r1486182159


##
flink-clients/src/main/java/org/apache/flink/client/cli/ArtifactFetchOptionsInternal.java:
##
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.cli;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+
+import java.util.List;
+
+/** Artifact Fetch options. */
+@Internal
+public class ArtifactFetchOptionsInternal {
+
+public static final ConfigOption<List<String>> COMPLETE_LIST =
+ConfigOptions.key("$internal.user.artifacts.complete-list")

Review Comment:
   This is currently unused. The current implementation modifies the passed `user.artifacts.artifact-list` config option, which was more straightforward to implement but may be confusing/magical for users, so it can be changed if the current behavior is a no-go. Otherwise, this can be removed.
   
   Personally, I do not think modifying the given config option is risky; IMO it can be justified as showing the final deployed state in the config.






Re: [PR] [FLINK-32315][k8s] Support uploading "local://" artifacts in Kubernetes Application Mode [flink]

2024-02-12 Thread via GitHub


flinkbot commented on PR #24303:
URL: https://github.com/apache/flink/pull/24303#issuecomment-1938670866

   
   ## CI report:
   
   * 3b142b6ba387fd7f6bfaee5c66d9d9d8b0966976 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   





[PR] [FLINK-32315][k8s] Support uploading "local://" artifacts in Kubernetes Application Mode [flink]

2024-02-12 Thread via GitHub


ferenc-csaky opened a new pull request, #24303:
URL: https://github.com/apache/flink/pull/24303

   ## What is the purpose of the change
   
   Makes it possible to upload `local://` artifacts in K8s Application Mode to a remote DFS, so that, combined with `Artifact Fetching`, it is no longer necessary to bundle all Flink job dependencies into the Docker image.
   
   The upload handles the job JAR and any additional local artifacts present in the `user.artifacts.artifact-list` config. The list can mix local and remote artifacts: the former are uploaded, the latter remain unchanged, and everything is fetched on the JM pod.
   
   ### Example
   
   ```bash
   # Without additional artifacts.
   $ ./bin/flink run-application \
   --target kubernetes-application \
   -Dkubernetes.cluster-id=my-first-application-cluster \
   -Dkubernetes.container.image=custom-image-name \
   -Dkubernetes.artifacts.local-upload-enabled=true \
   -Dkubernetes.artifacts.local-upload-target=s3://my-bucket/ \
   local:///tmp/my-flink-job.jar
   
   # With additional artifacts.
   $ ./bin/flink run-application \
   --target kubernetes-application \
   -Dkubernetes.cluster-id=my-first-application-cluster \
   -Dkubernetes.container.image=custom-image-name \
   -Dkubernetes.artifacts.local-upload-enabled=true \
   -Dkubernetes.artifacts.local-upload-target=s3://my-bucket/ \
   -Duser.artifacts.artifact-list=local:///tmp/my-flink-udf1.jar\;s3://my-bucket/my-flink-udf2.jar \
   local:///tmp/my-flink-job.jar
   ```
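
The handling of a mixed artifact list described above can be sketched as follows. This is an illustration under assumptions, not the code in this PR: `ArtifactListSketch` and `rewrite` are hypothetical names, URIs are treated as plain strings, and the actual copy to the DFS is left out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical sketch; not the DefaultKubernetesArtifactUploader from this PR. */
final class ArtifactListSketch {

    private static final String LOCAL_SCHEME = "local://";

    /**
     * Returns the artifact list as it would look after upload: each local:// entry is
     * replaced by its location under uploadTarget, remote entries are kept unchanged.
     * The file copy itself is omitted in this sketch.
     */
    static List<String> rewrite(List<String> artifacts, String uploadTarget) {
        String base = uploadTarget.endsWith("/") ? uploadTarget : uploadTarget + "/";
        List<String> result = new ArrayList<>();
        for (String uri : artifacts) {
            if (uri.startsWith(LOCAL_SCHEME)) {
                // Keep only the file name and place it under the upload target.
                String fileName = uri.substring(uri.lastIndexOf('/') + 1);
                result.add(base + fileName);
            } else {
                result.add(uri);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> artifacts =
                Arrays.asList(
                        "local:///tmp/my-flink-udf1.jar",
                        "s3://my-bucket/my-flink-udf2.jar");
        System.out.println(rewrite(artifacts, "s3://my-bucket/"));
    }
}
```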
   
   ## Brief change log
   
   - Added new `KubernetesConfigOptions` for uploading.
   - Introduced `KubernetesArtifactUploader` and a default impl, which handles 
the uploading.
   - Wired in the uploading to `KubernetesClusterDescriptor`.
   
   ## Verifying this change
   
   - Added unit tests in `DefaultKubernetesArtifactUploaderTest`.
   - Verified the change manually with Minikube and Minio.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): no
 - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: yes
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? yes
 - If yes, how is the feature documented? docs
   

