This is an automated email from the ASF dual-hosted git repository. ibzib pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new 36cc7b4 [BEAM-12875] Register file systems in SparkExecutableStageFunction new 38305d9 Merge pull request #15502 from meowcakes/register_filesystems_for_artifact_retrieval_service 36cc7b4 is described below commit 36cc7b42bd77eb1a87b3a469e5aeb58e534524db Author: Rogan Morrow <rogan.o.mor...@gmail.com> AuthorDate: Wed Oct 6 16:18:04 2021 +0800 [BEAM-12875] Register file systems in SparkExecutableStageFunction --- .../translation/SparkBatchPortablePipelineTranslator.java | 2 ++ .../spark/translation/SparkExecutableStageFunction.java | 11 +++++++++++ .../translation/SparkStreamingPortablePipelineTranslator.java | 1 + .../spark/translation/SparkExecutableStageFunctionTest.java | 5 +++++ 4 files changed, 19 insertions(+) diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkBatchPortablePipelineTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkBatchPortablePipelineTranslator.java index f0d666c..62b39d7 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkBatchPortablePipelineTranslator.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkBatchPortablePipelineTranslator.java @@ -250,6 +250,7 @@ public class SparkBatchPortablePipelineTranslator groupByKeyPair(inputDataset, keyCoder, wvCoder); SparkExecutableStageFunction<KV, SideInputT> function = new SparkExecutableStageFunction<>( + context.getSerializableOptions(), stagePayload, context.jobInfo, outputExtractionMap, @@ -262,6 +263,7 @@ public class SparkBatchPortablePipelineTranslator JavaRDD<WindowedValue<InputT>> inputRdd2 = ((BoundedDataset<InputT>) inputDataset).getRDD(); SparkExecutableStageFunction<InputT, SideInputT> function2 = new SparkExecutableStageFunction<>( + context.getSerializableOptions(), stagePayload, context.jobInfo, outputExtractionMap, diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkExecutableStageFunction.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkExecutableStageFunction.java index 24296bf..a78b082 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkExecutableStageFunction.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkExecutableStageFunction.java @@ -34,6 +34,7 @@ import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateKey.TypeCase; import org.apache.beam.model.pipeline.v1.RunnerApi; import org.apache.beam.runners.core.InMemoryTimerInternals; import org.apache.beam.runners.core.TimerInternals; +import org.apache.beam.runners.core.construction.SerializablePipelineOptions; import org.apache.beam.runners.core.construction.Timer; import org.apache.beam.runners.core.construction.graph.ExecutableStage; import org.apache.beam.runners.core.metrics.MetricsContainerImpl; @@ -51,11 +52,13 @@ import org.apache.beam.runners.fnexecution.state.StateRequestHandler; import org.apache.beam.runners.fnexecution.state.StateRequestHandlers; import org.apache.beam.runners.fnexecution.translation.BatchSideInputHandlerFactory; import org.apache.beam.runners.fnexecution.translation.PipelineTranslatorUtils; +import org.apache.beam.runners.spark.SparkPipelineOptions; import org.apache.beam.runners.spark.coders.CoderHelpers; import org.apache.beam.runners.spark.metrics.MetricsContainerStepMapAccumulator; import org.apache.beam.runners.spark.util.ByteArray; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.fn.data.FnDataReceiver; +import org.apache.beam.sdk.io.FileSystems; import org.apache.beam.sdk.transforms.join.RawUnionValue; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.WindowedValue; @@ -86,6 +89,8 @@ class SparkExecutableStageFunction<InputT, SideInputT> private static final Logger LOG = LoggerFactory.getLogger(SparkExecutableStageFunction.class); + // Pipeline options for initializing the FileSystems + private final SerializablePipelineOptions pipelineOptions; private final RunnerApi.ExecutableStagePayload stagePayload; private final Map<String, Integer> outputMap; private final SparkExecutableStageContextFactory contextFactory; @@ -100,6 +105,7 @@ class SparkExecutableStageFunction<InputT, SideInputT> private transient Object currentTimerKey; SparkExecutableStageFunction( + SerializablePipelineOptions pipelineOptions, RunnerApi.ExecutableStagePayload stagePayload, JobInfo jobInfo, Map<String, Integer> outputMap, @@ -107,6 +113,7 @@ class SparkExecutableStageFunction<InputT, SideInputT> Map<String, Tuple2<Broadcast<List<byte[]>>, WindowedValueCoder<SideInputT>>> sideInputs, MetricsContainerStepMapAccumulator metricsAccumulator, Coder windowCoder) { + this.pipelineOptions = pipelineOptions; this.stagePayload = stagePayload; this.jobInfo = jobInfo; this.outputMap = outputMap; @@ -123,6 +130,10 @@ class SparkExecutableStageFunction<InputT, SideInputT> @Override public Iterator<RawUnionValue> call(Iterator<WindowedValue<InputT>> inputs) throws Exception { + SparkPipelineOptions options = pipelineOptions.get().as(SparkPipelineOptions.class); + // Register standard file systems. + FileSystems.setDefaultPipelineOptions(options); + // Do not call processElements if there are no inputs // Otherwise, this may cause validation errors (e.g. ParDoTest) if (!inputs.hasNext()) { diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkStreamingPortablePipelineTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkStreamingPortablePipelineTranslator.java index d993459..7652c89 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkStreamingPortablePipelineTranslator.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkStreamingPortablePipelineTranslator.java @@ -252,6 +252,7 @@ public class SparkStreamingPortablePipelineTranslator SparkExecutableStageFunction<InputT, SideInputT> function = new SparkExecutableStageFunction<>( + context.getSerializableOptions(), stagePayload, context.jobInfo, outputMap, diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SparkExecutableStageFunctionTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SparkExecutableStageFunctionTest.java index 24d69a5..2e74eb0 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SparkExecutableStageFunctionTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SparkExecutableStageFunctionTest.java @@ -37,6 +37,7 @@ import org.apache.beam.model.pipeline.v1.RunnerApi; import org.apache.beam.model.pipeline.v1.RunnerApi.Components; import org.apache.beam.model.pipeline.v1.RunnerApi.ExecutableStagePayload; import org.apache.beam.model.pipeline.v1.RunnerApi.PCollection; +import org.apache.beam.runners.core.construction.SerializablePipelineOptions; import org.apache.beam.runners.core.construction.Timer; import org.apache.beam.runners.core.metrics.MetricsContainerImpl; import org.apache.beam.runners.core.metrics.MetricsContainerStepMap; @@ -53,6 +54,7 @@ import org.apache.beam.runners.fnexecution.control.TimerReceiverFactory; import org.apache.beam.runners.fnexecution.state.StateRequestHandler; import org.apache.beam.runners.spark.metrics.MetricsContainerStepMapAccumulator; import org.apache.beam.sdk.fn.data.FnDataReceiver; +import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.transforms.join.RawUnionValue; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.KV; @@ -76,6 +78,8 @@ public class SparkExecutableStageFunctionTest { @Mock private MetricsContainerStepMap stepMap; @Mock private MetricsContainerImpl container; + private final SerializablePipelineOptions pipelineOptions = + new SerializablePipelineOptions(PipelineOptionsFactory.create()); private final String inputId = "input-id"; private final ExecutableStagePayload stagePayload = ExecutableStagePayload.newBuilder() @@ -258,6 +262,7 @@ public class SparkExecutableStageFunctionTest { private <InputT, SideInputT> SparkExecutableStageFunction<InputT, SideInputT> getFunction( Map<String, Integer> outputMap) { return new SparkExecutableStageFunction<>( + pipelineOptions, stagePayload, null, outputMap,