This is an automated email from the ASF dual-hosted git repository.

ibzib pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new 81c44b4 [BEAM-7967] Execute portable Flink application jar new c4674b6 Merge pull request #9408 from ibzib/flink-execute-jar 81c44b4 is described below commit 81c44b446d40eff6812f45ed7c4e78e845f2eee2 Author: Kyle Weaver <kcwea...@google.com> AuthorDate: Tue Aug 27 13:39:02 2019 -0700 [BEAM-7967] Execute portable Flink application jar --- .../job_PostCommit_PortableJar_Flink.groovy | 38 ++++++ runners/flink/job-server/flink_job_server.gradle | 17 +++ runners/flink/job-server/test_pipeline_jar.sh | 121 +++++++++++++++++++ .../beam/runners/flink/FlinkPipelineRunner.java | 74 ++++++++++++ .../jobsubmission/PortablePipelineJarUtils.java | 130 +++++++++++++++++++++ 5 files changed, 380 insertions(+) diff --git a/.test-infra/jenkins/job_PostCommit_PortableJar_Flink.groovy b/.test-infra/jenkins/job_PostCommit_PortableJar_Flink.groovy new file mode 100644 index 0000000..a2bc53e --- /dev/null +++ b/.test-infra/jenkins/job_PostCommit_PortableJar_Flink.groovy @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import CommonJobProperties as commonJobProperties +import PostcommitJobBuilder + +// Tests creation and execution of portable pipeline Jars on the Flink runner. +PostcommitJobBuilder.postCommitJob('beam_PostCommit_PortableJar_Flink', + 'Run PortableJar_Flink PostCommit', 'Flink Portable Jar Tests', this) { + description('Tests creation and execution of portable pipeline Jars on the Flink runner.') + + // Set common parameters. + commonJobProperties.setTopLevelMainJobProperties(delegate) + + // Gradle goals for this job. 
+ steps { + gradle { + rootBuildScriptDir(commonJobProperties.checkoutDir) + tasks(':runners:flink:1.8:job-server:testPipelineJar') + commonJobProperties.setGradleSwitches(delegate) + } + } +} diff --git a/runners/flink/job-server/flink_job_server.gradle b/runners/flink/job-server/flink_job_server.gradle index 0f555fb..3789007 100644 --- a/runners/flink/job-server/flink_job_server.gradle +++ b/runners/flink/job-server/flink_job_server.gradle @@ -177,3 +177,20 @@ project.ext.validatesCrossLanguageRunner = createCrossLanguageValidatesRunnerTas "--shutdownSourcesOnFinalWatermark", ] ) + +task testPipelineJar() { + dependsOn shadowJar + dependsOn ":sdks:python:container:py35:docker" + doLast{ + exec { + executable "sh" + def options = [ + "--flink_job_server_jar ${shadowJar.archivePath}", + "--env_dir ${project.rootProject.buildDir}/gradleenv/${project.path.hashCode()}", + "--python_root_dir ${project.rootDir}/sdks/python", + "--python_version 3.5" + ] + args "-c", "../../job-server/test_pipeline_jar.sh ${options.join(' ')}" + } + } +} diff --git a/runners/flink/job-server/test_pipeline_jar.sh b/runners/flink/job-server/test_pipeline_jar.sh new file mode 100755 index 0000000..c59facf --- /dev/null +++ b/runners/flink/job-server/test_pipeline_jar.sh @@ -0,0 +1,121 @@ +#!/bin/bash +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +set -e +set -v + +while [[ $# -gt 0 ]] +do +key="$1" +case $key in + --flink_job_server_jar) + FLINK_JOB_SERVER_JAR="$2" + shift # past argument + shift # past value + ;; + --env_dir) + ENV_DIR="$2" + shift # past argument + shift # past value + ;; + --python_root_dir) + PYTHON_ROOT_DIR="$2" + shift # past argument + shift # past value + ;; + --python_version) + PYTHON_VERSION="$2" + shift # past argument + shift # past value + ;; + *) # unknown option + echo "Unknown option: $1" + exit 1 + ;; +esac +done + +# Go to the root of the repository +cd $(git rev-parse --show-toplevel) + +# Verify docker command exists +command -v docker +docker -v + +CONTAINER=$USER-docker-apache.bintray.io/beam/python$PYTHON_VERSION +TAG=latest +# Verify container has already been built +docker images $CONTAINER:$TAG | grep $TAG + +# Set up Python environment +virtualenv -p python$PYTHON_VERSION $ENV_DIR +. $ENV_DIR/bin/activate +pip install --retries 10 -e $PYTHON_ROOT_DIR + +PIPELINE_PY=" +import apache_beam as beam +from apache_beam.options.pipeline_options import PipelineOptions +from apache_beam.options.pipeline_options import SetupOptions +from apache_beam.testing.util import assert_that +from apache_beam.testing.util import equal_to +from apache_beam.transforms import Create +from apache_beam.transforms import Map + +# To test that our main session is getting plumbed through artifact staging +# correctly, create a global variable. 
If the main session is not plumbed +# through properly, global_var will be undefined and the pipeline will fail. +global_var = 1 + +pipeline_options = PipelineOptions() +pipeline_options.view_as(SetupOptions).save_main_session = True +pipeline = beam.Pipeline(options=pipeline_options) +pcoll = (pipeline + | Create([0, 1, 2]) + | Map(lambda x: x + global_var)) +assert_that(pcoll, equal_to([1, 2, 3])) + +result = pipeline.run() +result.wait_until_finish() +" + +# Create the jar +OUTPUT_JAR=flink-test-$(date +%Y%m%d-%H%M%S).jar +(python -c "$PIPELINE_PY" \ + --runner FlinkRunner \ + --flink_job_server_jar $FLINK_JOB_SERVER_JAR \ + --output_executable_path $OUTPUT_JAR \ + --parallelism 1 \ + --sdk_worker_parallelism 1 \ + --environment_type DOCKER \ + --environment_config=$CONTAINER:$TAG \ +) || TEST_EXIT_CODE=$? # don't fail fast here; clean up before exiting + +if [[ "$TEST_EXIT_CODE" -eq 0 ]]; then + # Execute the jar + java -jar $OUTPUT_JAR || TEST_EXIT_CODE=$? +fi + +rm -rf $ENV_DIR +rm -f $OUTPUT_JAR + +if [[ "$TEST_EXIT_CODE" -eq 0 ]]; then + echo ">>> SUCCESS" +else + echo ">>> FAILURE" +fi +exit $TEST_EXIT_CODE diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineRunner.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineRunner.java index e61cb12..f33af5c0 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineRunner.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineRunner.java @@ -17,25 +17,37 @@ */ package org.apache.beam.runners.flink; +import static org.apache.beam.runners.core.construction.PipelineResources.detectClassPathResourcesToStage; import static org.apache.beam.runners.fnexecution.translation.PipelineTranslatorUtils.hasUnboundedPCollections; +import java.nio.file.Paths; import java.util.List; import java.util.Map; +import java.util.UUID; import javax.annotation.Nullable; +import org.apache.beam.model.jobmanagement.v1.ArtifactApi.ProxyManifest; import org.apache.beam.model.pipeline.v1.RunnerApi; import org.apache.beam.model.pipeline.v1.RunnerApi.Pipeline; +import org.apache.beam.runners.core.construction.PipelineOptionsTranslation; import org.apache.beam.runners.core.construction.graph.ExecutableStage; import org.apache.beam.runners.core.construction.graph.GreedyPipelineFuser; import org.apache.beam.runners.core.construction.graph.PipelineTrimmer; import org.apache.beam.runners.core.metrics.MetricsPusher; +import org.apache.beam.runners.fnexecution.jobsubmission.PortablePipelineJarUtils; import org.apache.beam.runners.fnexecution.jobsubmission.PortablePipelineResult; import org.apache.beam.runners.fnexecution.jobsubmission.PortablePipelineRunner; import org.apache.beam.runners.fnexecution.provisioning.JobInfo; +import org.apache.beam.sdk.io.FileSystems; import org.apache.beam.sdk.metrics.MetricsEnvironment; import org.apache.beam.sdk.metrics.MetricsOptions; import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.vendor.grpc.v1p21p0.com.google.protobuf.Struct; import org.apache.flink.api.common.JobExecutionResult; import org.apache.flink.client.program.DetachedEnvironment; +import org.kohsuke.args4j.CmdLineException; +import org.kohsuke.args4j.CmdLineParser; +import org.kohsuke.args4j.Option; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -120,4 +132,66 @@ public class FlinkPipelineRunner implements PortablePipelineRunner { return flinkRunnerResult; } } + + /** + * 
Main method to be called only as the entry point to an executable jar with structure as defined + * in {@link PortablePipelineJarUtils}. + */ + public static void main(String[] args) throws Exception { + // Register standard file systems. + FileSystems.setDefaultPipelineOptions(PipelineOptionsFactory.create()); + + FlinkPipelineRunnerConfiguration configuration = parseArgs(args); + Pipeline pipeline = PortablePipelineJarUtils.getPipelineFromClasspath(); + Struct options = PortablePipelineJarUtils.getPipelineOptionsFromClasspath(); + FlinkPipelineOptions flinkOptions = + PipelineOptionsTranslation.fromProto(options).as(FlinkPipelineOptions.class); + String invocationId = + String.format("%s_%s", flinkOptions.getJobName(), UUID.randomUUID().toString()); + ProxyManifest proxyManifest = PortablePipelineJarUtils.getArtifactManifestFromClassPath(); + String retrievalToken = + PortablePipelineJarUtils.stageArtifacts( + proxyManifest, flinkOptions, invocationId, configuration.artifactStagingPath); + + FlinkPipelineRunner runner = + new FlinkPipelineRunner( + flinkOptions, + configuration.flinkConfDir, + detectClassPathResourcesToStage(FlinkPipelineRunner.class.getClassLoader())); + JobInfo jobInfo = + JobInfo.create(invocationId, flinkOptions.getJobName(), retrievalToken, options); + try { + runner.run(pipeline, jobInfo); + } catch (Exception e) { + throw new RuntimeException(String.format("Job %s failed.", invocationId), e); + } + LOG.info("Job {} finished successfully.", invocationId); + } + + private static class FlinkPipelineRunnerConfiguration { + @Option(name = "--artifacts-dir", usage = "The location to store staged artifact files") + private String artifactStagingPath = + Paths.get(System.getProperty("java.io.tmpdir"), "beam-artifact-staging").toString(); + + @Option( + name = "--flink-conf-dir", + usage = + "Directory containing Flink YAML configuration files. 
" + + "These properties will be set to all jobs submitted to Flink and take precedence " + + "over configurations in FLINK_CONF_DIR.") + private String flinkConfDir = null; + } + + private static FlinkPipelineRunnerConfiguration parseArgs(String[] args) { + FlinkPipelineRunnerConfiguration configuration = new FlinkPipelineRunnerConfiguration(); + CmdLineParser parser = new CmdLineParser(configuration); + try { + parser.parseArgument(args); + } catch (CmdLineException e) { + LOG.error("Unable to parse command line arguments.", e); + parser.printUsage(System.err); + throw new IllegalArgumentException("Unable to parse command line arguments.", e); + } + return configuration; + } } diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/PortablePipelineJarUtils.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/PortablePipelineJarUtils.java index af712ad..5045f3b 100644 --- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/PortablePipelineJarUtils.java +++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/PortablePipelineJarUtils.java @@ -17,6 +17,38 @@ */ package org.apache.beam.runners.fnexecution.jobsubmission; +import java.io.File; +import java.io.FileNotFoundException; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.nio.charset.StandardCharsets; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.UUID; +import org.apache.beam.model.jobmanagement.v1.ArtifactApi.ProxyManifest; +import org.apache.beam.model.jobmanagement.v1.ArtifactApi.ProxyManifest.Location; +import org.apache.beam.model.pipeline.v1.RunnerApi.Pipeline; +import org.apache.beam.runners.core.construction.ArtifactServiceStager; +import org.apache.beam.runners.core.construction.ArtifactServiceStager.StagedFile; +import org.apache.beam.runners.fnexecution.GrpcFnServer; +import org.apache.beam.runners.fnexecution.InProcessServerFactory; +import org.apache.beam.runners.fnexecution.artifact.BeamFileSystemArtifactStagingService; +import org.apache.beam.sdk.fn.test.InProcessManagedChannelFactory; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.vendor.grpc.v1p21p0.com.google.protobuf.Message.Builder; +import org.apache.beam.vendor.grpc.v1p21p0.com.google.protobuf.Struct; +import org.apache.beam.vendor.grpc.v1p21p0.com.google.protobuf.util.JsonFormat; +import org.apache.beam.vendor.grpc.v1p21p0.io.grpc.ManagedChannel; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.ByteStreams; +import org.apache.commons.compress.utils.IOUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + /** * Contains common code for writing and reading portable pipeline jars. 
* @@ -51,4 +83,102 @@ public abstract class PortablePipelineJarUtils { ARTIFACT_STAGING_FOLDER_PATH + "/artifact-manifest.json"; static final String PIPELINE_PATH = PIPELINE_FOLDER_PATH + "/pipeline.json"; static final String PIPELINE_OPTIONS_PATH = PIPELINE_FOLDER_PATH + "/pipeline-options.json"; + + private static final Logger LOG = LoggerFactory.getLogger(PortablePipelineJarCreator.class); + + private static InputStream getResourceFromClassPath(String resourcePath) throws IOException { + InputStream inputStream = PortablePipelineJarUtils.class.getResourceAsStream(resourcePath); + if (inputStream == null) { + throw new FileNotFoundException( + String.format("Resource %s not found on classpath.", resourcePath)); + } + return inputStream; + } + + /** Populates {@code builder} using the JSON resource specified by {@code resourcePath}. */ + private static void parseJsonResource(String resourcePath, Builder builder) throws IOException { + try (InputStream inputStream = getResourceFromClassPath(resourcePath)) { + String contents = new String(ByteStreams.toByteArray(inputStream), StandardCharsets.UTF_8); + JsonFormat.parser().merge(contents, builder); + } + } + + public static Pipeline getPipelineFromClasspath() throws IOException { + Pipeline.Builder builder = Pipeline.newBuilder(); + parseJsonResource("/" + PIPELINE_PATH, builder); + return builder.build(); + } + + public static Struct getPipelineOptionsFromClasspath() throws IOException { + Struct.Builder builder = Struct.newBuilder(); + parseJsonResource("/" + PIPELINE_OPTIONS_PATH, builder); + return builder.build(); + } + + public static ProxyManifest getArtifactManifestFromClassPath() throws IOException { + ProxyManifest.Builder builder = ProxyManifest.newBuilder(); + parseJsonResource("/" + ARTIFACT_MANIFEST_PATH, builder); + return builder.build(); + } + + /** Writes artifacts listed in {@code proxyManifest}. */ + public static String stageArtifacts( + ProxyManifest proxyManifest, + PipelineOptions options, + String invocationId, + String artifactStagingPath) + throws Exception { + Collection<StagedFile> filesToStage = + prepareArtifactsForStaging(proxyManifest, options, invocationId); + try (GrpcFnServer artifactServer = + GrpcFnServer.allocatePortAndCreateFor( + new BeamFileSystemArtifactStagingService(), InProcessServerFactory.create())) { + ManagedChannel grpcChannel = + InProcessManagedChannelFactory.create() + .forDescriptor(artifactServer.getApiServiceDescriptor()); + ArtifactServiceStager stager = ArtifactServiceStager.overChannel(grpcChannel); + String stagingSessionToken = + BeamFileSystemArtifactStagingService.generateStagingSessionToken( + invocationId, artifactStagingPath); + String retrievalToken = stager.stage(stagingSessionToken, filesToStage); + // Clean up. + for (StagedFile file : filesToStage) { + if (!file.getFile().delete()) { + LOG.warn("Failed to delete file {}", file.getFile()); + } + } + grpcChannel.shutdown(); + return retrievalToken; + } + } + + /** + * Artifacts are expected to exist as resources on the classpath, located using {@code + * proxyManifest}. Write them to tmp files so they can be staged. 
+ */ + private static Collection<StagedFile> prepareArtifactsForStaging( + ProxyManifest proxyManifest, PipelineOptions options, String invocationId) + throws IOException { + List<StagedFile> filesToStage = new ArrayList<>(); + Path outputFolderPath = + Paths.get( + MoreObjects.firstNonNull( + options.getTempLocation(), System.getProperty("java.io.tmpdir")), + invocationId); + if (!outputFolderPath.toFile().mkdir()) { + throw new IOException("Failed to create folder " + outputFolderPath); + } + for (Location location : proxyManifest.getLocationList()) { + try (InputStream inputStream = getResourceFromClassPath(location.getUri())) { + Path outputPath = outputFolderPath.resolve(UUID.randomUUID().toString()); + LOG.trace("Writing artifact {} to file {}", location.getName(), outputPath); + File file = outputPath.toFile(); + try (FileOutputStream outputStream = new FileOutputStream(file)) { + IOUtils.copy(inputStream, outputStream); + filesToStage.add(StagedFile.of(file, location.getName())); + } + } + } + return filesToStage; + } }
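For context, a minimal end-to-end sketch of the workflow this commit enables. The pipeline file, jar name, and paths below are placeholders; the pipeline options and the jar's --artifacts-dir / --flink-conf-dir flags are the ones defined in the code above.

    # Build an executable portable pipeline jar from a Python pipeline
    # (my_pipeline.py and all paths here are placeholders).
    python my_pipeline.py \
      --runner FlinkRunner \
      --flink_job_server_jar /path/to/flink-job-server-shadow.jar \
      --output_executable_path /tmp/my-pipeline.jar \
      --environment_type DOCKER \
      --environment_config $USER-docker-apache.bintray.io/beam/python3.5:latest

    # Later, run the jar on its own; FlinkPipelineRunner.main reads the
    # pipeline, options, and artifact manifest bundled on the classpath.
    # Both flags below are optional (see FlinkPipelineRunnerConfiguration).
    java -jar /tmp/my-pipeline.jar \
      --artifacts-dir /tmp/beam-artifact-staging \
      --flink-conf-dir /path/to/flink/conf

Because the jar bundles the serialized pipeline, pipeline options, and artifacts, the second step does not require the Python environment that produced it.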