mr-runner: support PCollections materialization with multiple MR jobs.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/40396d75 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/40396d75 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/40396d75 Branch: refs/heads/mr-runner Commit: 40396d758ad21e4938d395007583bf7c61ebdd97 Parents: 5905efd Author: Pei He <p...@apache.org> Authored: Tue Aug 8 11:30:29 2017 +0800 Committer: Pei He <p...@apache.org> Committed: Thu Aug 31 14:13:49 2017 +0800 ---------------------------------------------------------------------- .../mapreduce/MapReducePipelineOptions.java | 4 + .../beam/runners/mapreduce/MapReduceRunner.java | 15 +- .../mapreduce/translation/BeamInputFormat.java | 85 +++++++--- .../mapreduce/translation/BeamMapper.java | 8 +- .../mapreduce/translation/BeamReducer.java | 20 +-- .../mapreduce/translation/DotfileWriter.java | 53 +++++- .../translation/FileReadOperation.java | 165 +++++++++++++++++++ .../translation/FileWriteOperation.java | 77 +++++++++ .../translation/FlattenTranslator.java | 7 +- .../runners/mapreduce/translation/Graph.java | 79 +++++---- .../mapreduce/translation/GraphPlanner.java | 67 +++++--- .../runners/mapreduce/translation/Graphs.java | 106 +++++++++--- .../GroupAlsoByWindowsParDoOperation.java | 5 +- .../translation/GroupByKeyTranslator.java | 7 +- .../mapreduce/translation/JobPrototype.java | 93 +++++++---- .../mapreduce/translation/Operation.java | 11 +- .../mapreduce/translation/OutputReceiver.java | 9 +- .../mapreduce/translation/ParDoOperation.java | 43 +++-- .../mapreduce/translation/ParDoTranslator.java | 8 +- .../translation/ReadBoundedTranslator.java | 11 +- .../mapreduce/translation/ReadOperation.java | 45 ----- .../ReifyTimestampAndWindowsParDoOperation.java | 5 +- .../translation/ShuffleWriteOperation.java | 62 +++++++ .../mapreduce/translation/SourceOperation.java | 45 +++++ .../translation/TranslationContext.java | 4 +- .../translation/TranslatorRegistry.java | 11 +- .../mapreduce/translation/ViewTranslator.java | 8 +- .../translation/WindowAssignTranslator.java | 7 +- .../mapreduce/translation/WriteOperation.java | 66 -------- .../mapreduce/translation/GraphPlannerTest.java | 3 +- 30 files changed, 801 insertions(+), 328 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/40396d75/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReducePipelineOptions.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReducePipelineOptions.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReducePipelineOptions.java index c37da58..9224eb6 100644 --- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReducePipelineOptions.java +++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReducePipelineOptions.java @@ -43,6 +43,10 @@ public interface MapReducePipelineOptions extends PipelineOptions { Class<?> getJarClass(); void setJarClass(Class<?> jarClass); + @Description("The jar class of the user Beam program.") + String getTmpDir(); + void setTmpDir(String tmpDir); + class JarClassInstanceFactory implements DefaultValueFactory<Class<?>> { @Override public Class<?> create(PipelineOptions options) { http://git-wip-us.apache.org/repos/asf/beam/blob/40396d75/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceRunner.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceRunner.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceRunner.java index c5626a4..a7e75bb 100644 --- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceRunner.java +++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceRunner.java @@ -66,19 +66,30 @@ public class MapReduceRunner extends PipelineRunner<PipelineResult> { LOG.info(graphConverter.getDotfile()); + Graphs.FusedGraph fusedGraph = new Graphs.FusedGraph(context.getInitGraph()); + LOG.info(DotfileWriter.toDotfile(fusedGraph)); + GraphPlanner planner = new GraphPlanner(); - Graphs.FusedGraph fusedGraph = planner.plan(context.getInitGraph()); + fusedGraph = planner.plan(fusedGraph); LOG.info(DotfileWriter.toDotfile(fusedGraph)); + Configuration config = new Configuration(); + config.set("keep.failed.task.files", "true"); + + fusedGraph.getFusedSteps(); + int stageId = 0; for (Graphs.FusedStep fusedStep : fusedGraph.getFusedSteps()) { JobPrototype jobPrototype = JobPrototype.create(stageId++, fusedStep, options); + LOG.info("Running job-{}.", stageId); + LOG.info(DotfileWriter.toDotfile(fusedStep)); try { - Job job = jobPrototype.build(options.getJarClass(), new Configuration()); + Job job = jobPrototype.build(options.getJarClass(), config); job.waitForCompletion(true); } catch (Exception e) { Throwables.throwIfUnchecked(e); + throw new RuntimeException(e); } } return null; http://git-wip-us.apache.org/repos/asf/beam/blob/40396d75/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamInputFormat.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamInputFormat.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamInputFormat.java index 8a27a85..03a88aa 100644 --- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamInputFormat.java +++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamInputFormat.java @@ -21,14 +21,17 @@ import static com.google.common.base.Preconditions.checkNotNull; import com.google.common.base.Function; import com.google.common.base.Strings; +import com.google.common.base.Throwables; import com.google.common.collect.FluentIterable; import com.google.common.collect.ImmutableList; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import java.util.List; +import org.apache.beam.sdk.coders.SerializableCoder; import org.apache.beam.sdk.io.BoundedSource; -import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.util.SerializableUtils; import org.apache.beam.sdk.util.WindowedValue; import org.apache.commons.codec.binary.Base64; @@ -45,10 +48,12 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext; public class BeamInputFormat<T> extends InputFormat { public static final String BEAM_SERIALIZED_BOUNDED_SOURCE = "beam-serialized-bounded-source"; + public static final String BEAM_SERIALIZED_PIPELINE_OPTIONS = "beam-serialized-pipeline-options"; + private static final long DEFAULT_DESIRED_BUNDLE_SIZE_SIZE_BYTES = 5 * 1000 * 1000; - private BoundedSource<T> source; - private PipelineOptions options; + private List<BoundedSource<T>> sources; + private SerializedPipelineOptions options; public BeamInputFormat() { } @@ -56,21 +61,36 @@ public class BeamInputFormat<T> extends InputFormat { @Override public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException { String serializedBoundedSource = context.getConfiguration().get(BEAM_SERIALIZED_BOUNDED_SOURCE); - if (Strings.isNullOrEmpty(serializedBoundedSource)) { + String serializedPipelineOptions = + context.getConfiguration().get(BEAM_SERIALIZED_PIPELINE_OPTIONS); + if (Strings.isNullOrEmpty(serializedBoundedSource) + || Strings.isNullOrEmpty(serializedPipelineOptions)) { return ImmutableList.of(); } - source = (BoundedSource<T>) SerializableUtils.deserializeFromByteArray( + sources = (List<BoundedSource<T>>) SerializableUtils.deserializeFromByteArray( Base64.decodeBase64(serializedBoundedSource), "BoundedSource"); + options = ((SerializedPipelineOptions) SerializableUtils.deserializeFromByteArray( + Base64.decodeBase64(serializedPipelineOptions), "SerializedPipelineOptions")); + try { - return FluentIterable.from(source.split(DEFAULT_DESIRED_BUNDLE_SIZE_SIZE_BYTES, options)) - .transform(new Function<BoundedSource<T>, InputSplit>() { + + return FluentIterable.from(sources) + .transformAndConcat(new Function<BoundedSource<T>, Iterable<BoundedSource<T>>>() { @Override - public InputSplit apply(BoundedSource<T> source) { + public Iterable<BoundedSource<T>> apply(BoundedSource<T> input) { try { - return new BeamInputSplit(source.getEstimatedSizeBytes(options)); + return (Iterable<BoundedSource<T>>) input.split( + DEFAULT_DESIRED_BUNDLE_SIZE_SIZE_BYTES, options.getPipelineOptions()); } catch (Exception e) { + Throwables.throwIfUnchecked(e); throw new RuntimeException(e); } + } + }) + .transform(new Function<BoundedSource<T>, InputSplit>() { + @Override + public InputSplit apply(BoundedSource<T> source) { + return new BeamInputSplit(source, options); }}) .toList(); } catch (Exception e) { @@ -81,26 +101,35 @@ public class BeamInputFormat<T> extends InputFormat { @Override public RecordReader createRecordReader( InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException { - // TODO: it should initiates from InputSplit. - source = (BoundedSource<T>) SerializableUtils.deserializeFromByteArray( - Base64.decodeBase64(context.getConfiguration().get(BEAM_SERIALIZED_BOUNDED_SOURCE)), - ""); - return new BeamRecordReader<>(source.createReader(options)); + return ((BeamInputSplit) split).createReader(); } - public static class BeamInputSplit extends InputSplit implements Writable { - private long estimatedSizeBytes; + public static class BeamInputSplit<T> extends InputSplit implements Writable { + private BoundedSource<T> boundedSource; + private SerializedPipelineOptions options; public BeamInputSplit() { } - BeamInputSplit(long estimatedSizeBytes) { - this.estimatedSizeBytes = estimatedSizeBytes; + public BeamInputSplit(BoundedSource<T> boundedSource, SerializedPipelineOptions options) { + this.boundedSource = checkNotNull(boundedSource, "boundedSources"); + this.options = checkNotNull(options, "options"); + } + + public BeamRecordReader<T> createReader() throws IOException { + return new BeamRecordReader<>(boundedSource.createReader(options.getPipelineOptions())); } @Override public long getLength() throws IOException, InterruptedException { - return estimatedSizeBytes; + try { + return boundedSource.getEstimatedSizeBytes(options.getPipelineOptions()); + } catch (Exception e) { + Throwables.throwIfUnchecked(e); + Throwables.throwIfInstanceOf(e, IOException.class); + Throwables.throwIfInstanceOf(e, InterruptedException.class); + throw new RuntimeException(e); + } } @Override @@ -110,16 +139,28 @@ public class BeamInputFormat<T> extends InputFormat { @Override public void write(DataOutput out) throws IOException { - out.writeLong(estimatedSizeBytes); + ByteArrayOutputStream stream = new ByteArrayOutputStream(); + SerializableCoder.of(BoundedSource.class).encode(boundedSource, stream); + SerializableCoder.of(SerializedPipelineOptions.class).encode(options, stream); + + byte[] bytes = stream.toByteArray(); + out.writeInt(bytes.length); + out.write(bytes); } @Override public void readFields(DataInput in) throws IOException { - estimatedSizeBytes = in.readLong(); + int length = in.readInt(); + byte[] bytes = new byte[length]; + in.readFully(bytes); + + ByteArrayInputStream inStream = new ByteArrayInputStream(bytes); + boundedSource = SerializableCoder.of(BoundedSource.class).decode(inStream); + options = SerializableCoder.of(SerializedPipelineOptions.class).decode(inStream); } } - private class BeamRecordReader<T> extends RecordReader { + private static class BeamRecordReader<T> extends RecordReader { private final BoundedSource.BoundedReader<T> reader; private boolean started; http://git-wip-us.apache.org/repos/asf/beam/blob/40396d75/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamMapper.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamMapper.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamMapper.java index bc52967..d3ebb5c 100644 --- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamMapper.java +++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamMapper.java @@ -19,17 +19,21 @@ package org.apache.beam.runners.mapreduce.translation; import static com.google.common.base.Preconditions.checkNotNull; +import java.io.IOException; import org.apache.beam.sdk.util.SerializableUtils; import org.apache.beam.sdk.util.WindowedValue; import org.apache.commons.codec.binary.Base64; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.TaskInputOutputContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Adapter for executing Beam transforms in {@link Mapper}. */ public class BeamMapper<ValueInT, ValueOutT> extends Mapper<Object, WindowedValue<ValueInT>, Object, WindowedValue<ValueOutT>> { + private static final Logger LOG = LoggerFactory.getLogger(Mapper.class); public static final String BEAM_PAR_DO_OPERATION_MAPPER = "beam-par-do-op-mapper"; @@ -50,7 +54,9 @@ public class BeamMapper<ValueInT, ValueOutT> protected void map( Object key, WindowedValue<ValueInT> value, - Mapper<Object, WindowedValue<ValueInT>, Object, WindowedValue<ValueOutT>>.Context context) { + Mapper<Object, WindowedValue<ValueInT>, Object, WindowedValue<ValueOutT>>.Context context) + throws IOException, InterruptedException { + LOG.info("key: {} value: {}.", key, value); operation.process(value); } http://git-wip-us.apache.org/repos/asf/beam/blob/40396d75/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamReducer.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamReducer.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamReducer.java index 3490b3b..a382904 100644 --- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamReducer.java +++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamReducer.java @@ -33,8 +33,11 @@ import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.KV; import org.apache.commons.codec.binary.Base64; import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.TaskInputOutputContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** @@ -42,6 +45,7 @@ import org.apache.hadoop.mapreduce.TaskInputOutputContext; */ public class BeamReducer<ValueInT, ValueOutT> extends Reducer<BytesWritable, byte[], Object, WindowedValue<ValueOutT>> { + private static final Logger LOG = LoggerFactory.getLogger(Reducer.class); public static final String BEAM_REDUCER_KV_CODER = "beam-reducer-kv-coder"; public static final String BEAM_PAR_DO_OPERATION_REDUCER = "beam-par-do-op-reducer"; @@ -72,7 +76,8 @@ public class BeamReducer<ValueInT, ValueOutT> protected void reduce( BytesWritable key, Iterable<byte[]> values, - Reducer<BytesWritable, byte[], Object, WindowedValue<ValueOutT>>.Context context) { + Reducer<BytesWritable, byte[], Object, WindowedValue<ValueOutT>>.Context context) + throws InterruptedException, IOException { List<Object> decodedValues = Lists.newArrayList(FluentIterable.from(values) .transform(new Function<byte[], Object>() { @Override @@ -85,15 +90,10 @@ public class BeamReducer<ValueInT, ValueOutT> throw new RuntimeException(e); } }})); - - try { - operation.process( - WindowedValue.valueInGlobalWindow( - KV.of(keyCoder.decode(new ByteArrayInputStream(key.getBytes())), decodedValues))); - } catch (IOException e) { - Throwables.throwIfUnchecked(e); - throw new RuntimeException(e); - } + Object decodedKey = keyCoder.decode(new ByteArrayInputStream(key.getBytes())); + LOG.info("key: {} value: {}.", decodedKey, decodedValues); + operation.process( + WindowedValue.valueInGlobalWindow(KV.of(decodedKey, decodedValues))); } @Override http://git-wip-us.apache.org/repos/asf/beam/blob/40396d75/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/DotfileWriter.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/DotfileWriter.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/DotfileWriter.java index 5b0fcd8..863c4c9 100644 --- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/DotfileWriter.java +++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/DotfileWriter.java @@ -17,36 +17,75 @@ */ package org.apache.beam.runners.mapreduce.translation; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; +import java.util.Map; +import java.util.Set; + /** * Class that outputs {@link Graph} to dot file. */ public class DotfileWriter { - public static <StepT extends Graph.AbstractStep<TagT>, TagT extends Graph.AbstractTag> + public static <StepT extends Graph.AbstractStep, TagT extends Graph.AbstractTag> String toDotfile(Graphs.FusedGraph fusedGraph) { StringBuilder sb = new StringBuilder(); sb.append("\ndigraph G {\n"); + Map<Graphs.FusedStep, String> fusedStepToId = Maps.newHashMap(); int i = 0; for (Graphs.FusedStep fusedStep : fusedGraph.getFusedSteps()) { - sb.append(String.format(" subgraph \"cluster_%d\" {\n", i++)); + String clusterId = String.format("cluster_%d", i++); + sb.append(String.format(" subgraph \"%s\" {\n", clusterId)); + sb.append(String.format(" \"%s\" [shape=point style=invis];\n", clusterId)); + fusedStepToId.put(fusedStep, clusterId); + + Set<String> nodeDefines = Sets.newHashSet(); for (Graphs.Step step : fusedStep.getSteps()) { - sb.append(String.format(" \"%s\" [shape=box];\n", step.getFullName())); - for (Graph.AbstractTag outTag : step.getOutputTags()) { - sb.append(String.format(" \"%s\" [shape=ellipse];\n", outTag)); + nodeDefines.add(String.format(" \"%s\" [shape=box];\n", step.getFullName())); + for (Graph.AbstractTag inTag : fusedStep.getInputTags(step)) { + nodeDefines.add(String.format(" \"%s\" [shape=ellipse];\n", inTag)); } + for (Graph.AbstractTag outTag : fusedStep.getOutputTags(step)) { + nodeDefines.add(String.format(" \"%s\" [shape=ellipse];\n", outTag)); + } + } + for (String str : nodeDefines) { + sb.append(str); } sb.append(String.format(" }")); } for (Graphs.FusedStep fusedStep : fusedGraph.getFusedSteps()) { + // Edges within fused steps. for (Graphs.Step step : fusedStep.getSteps()) { - for (Graph.AbstractTag inTag : step.getInputTags()) { + for (Graph.AbstractTag inTag : fusedStep.getInputTags(step)) { sb.append(String.format(" \"%s\" -> \"%s\";\n", inTag, step)); } - for (Graph.AbstractTag outTag : step.getOutputTags()) { + for (Graph.AbstractTag outTag : fusedStep.getOutputTags(step)) { sb.append(String.format(" \"%s\" -> \"%s\";\n", step, outTag)); } } + + // Edges between sub-graphs. + for (Graphs.Tag inTag : fusedGraph.getInputTags(fusedStep)) { + sb.append(String.format(" \"%s\" -> \"%s\";\n", inTag, fusedStepToId.get(fusedStep))); + } + } + sb.append("}\n"); + return sb.toString(); + } + + public static String toDotfile(Graphs.FusedStep fusedStep) { + StringBuilder sb = new StringBuilder(); + sb.append("\ndigraph G {\n"); + for (Graphs.Step step : fusedStep.getSteps()) { + sb.append(String.format(" \"%s\" [shape=box];\n", step.getFullName())); + for (Graph.AbstractTag inTag : fusedStep.getInputTags(step)) { + sb.append(String.format(" \"%s\" -> \"%s\";\n", inTag, step)); + } + for (Graph.AbstractTag outTag : fusedStep.getOutputTags(step)) { + sb.append(String.format(" \"%s\" -> \"%s\";\n", step, outTag)); + } } sb.append("}\n"); return sb.toString(); http://git-wip-us.apache.org/repos/asf/beam/blob/40396d75/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FileReadOperation.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FileReadOperation.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FileReadOperation.java new file mode 100644 index 0000000..674e30a --- /dev/null +++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FileReadOperation.java @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.mapreduce.translation; + +import static com.google.common.base.Preconditions.checkNotNull; + +import com.google.common.base.Throwables; +import com.google.common.collect.ImmutableList; +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.util.LinkedList; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.Queue; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.io.BoundedSource; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.values.WindowingStrategy; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.SequenceFile; + +/** + * Operation that reads from files. + */ +public class FileReadOperation<T> extends SourceOperation<WindowedValue<T>> { + + public FileReadOperation(int producerStageId, String fileName, Coder<T> coder) { + super(new FileBoundedSource<>(producerStageId, fileName, coder)); + } + + private static class FileBoundedSource<T> extends BoundedSource<WindowedValue<T>> { + + private final int producerStageId; + private final String fileName; + private final Coder<WindowedValue<T>> coder; + + FileBoundedSource(int producerStageId, String fileName, Coder<T> coder) { + this.producerStageId = producerStageId; + this.fileName = checkNotNull(fileName, "fileName"); + checkNotNull(coder, "coder"); + this.coder = WindowedValue.getFullCoder( + coder, WindowingStrategy.globalDefault().getWindowFn().windowCoder()); + + } + + @Override + public List<? extends BoundedSource<WindowedValue<T>>> split( + long desiredBundleSizeBytes, PipelineOptions options) throws Exception { + // TODO: support split. + return ImmutableList.of(this); + } + + @Override + public long getEstimatedSizeBytes(PipelineOptions options) throws Exception { + return 0; + } + + @Override + public BoundedReader<WindowedValue<T>> createReader(PipelineOptions options) + throws IOException { + Path pattern = new Path(String.format("/tmp/mapreduce/stage-2/%s*", fileName)); + // TODO: use config from the job. + Configuration conf = new Configuration(); + conf.set( + "io.serializations", + "org.apache.hadoop.io.serializer.WritableSerialization," + + "org.apache.hadoop.io.serializer.JavaSerialization"); + FileSystem fs = pattern.getFileSystem(conf); + FileStatus[] files = fs.globStatus(pattern); + Queue<SequenceFile.Reader> readers = new LinkedList<>(); + for (FileStatus f : files) { + readers.add(new SequenceFile.Reader(fs, files[0].getPath(), conf)); + } + return new Reader<>(this, readers, coder); + } + + @Override + public void validate() { + } + + @Override + public Coder<WindowedValue<T>> getDefaultOutputCoder() { + return coder; + } + + private static class Reader<T> extends BoundedReader<WindowedValue<T>> { + + private final BoundedSource<WindowedValue<T>> boundedSource; + private final Queue<SequenceFile.Reader> readers; + private final Coder<WindowedValue<T>> coder; + private final BytesWritable value = new BytesWritable(); + + Reader( + BoundedSource<WindowedValue<T>> boundedSource, + Queue<SequenceFile.Reader> readers, + Coder<WindowedValue<T>> coder) { + this.boundedSource = checkNotNull(boundedSource, "boundedSource"); + this.readers = checkNotNull(readers, "readers"); + this.coder = checkNotNull(coder, "coder"); + } + + @Override + public boolean start() throws IOException { + return advance(); + } + + @Override + public boolean advance() throws IOException { + SequenceFile.Reader reader = readers.peek(); + if (reader == null) { + return false; + } + boolean hasNext = reader.next(NullWritable.get(), value); + if (hasNext) { + return true; + } else { + reader.close(); + readers.remove(reader); + return advance(); + } + } + + @Override + public WindowedValue<T> getCurrent() throws NoSuchElementException { + ByteArrayInputStream inStream = new ByteArrayInputStream(value.getBytes()); + try { + return coder.decode(inStream); + } catch (IOException e) { + Throwables.throwIfUnchecked(e); + throw new RuntimeException(e); + } + } + + @Override + public void close() throws IOException { + } + + @Override + public BoundedSource<WindowedValue<T>> getCurrentSource() { + return boundedSource; + } + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/40396d75/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FileWriteOperation.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FileWriteOperation.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FileWriteOperation.java new file mode 100644 index 0000000..468856a --- /dev/null +++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FileWriteOperation.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.mapreduce.translation; + +import static com.google.common.base.Preconditions.checkNotNull; + +import com.google.common.base.Throwables; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.values.WindowingStrategy; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapreduce.TaskInputOutputContext; +import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs; + +/** + * Operation that writes to files. + */ +public class FileWriteOperation<T> extends Operation<T> { + + private final String fileName; + private final Coder<WindowedValue<T>> coder; + private transient MultipleOutputs mos; + + public FileWriteOperation(String fileName, Coder<T> coder) { + super(0); + this.fileName = checkNotNull(fileName, "fileName"); + checkNotNull(coder, "coder"); + // TODO: should not hard-code windows coder. + this.coder = WindowedValue.getFullCoder( + coder, WindowingStrategy.globalDefault().getWindowFn().windowCoder()); + } + + @Override + public void start(TaskInputOutputContext<Object, Object, Object, Object> taskContext) { + this.mos = new MultipleOutputs(taskContext); + } + + @Override + public void process(WindowedValue<T> elem) throws IOException, InterruptedException { + ByteArrayOutputStream stream = new ByteArrayOutputStream(); + coder.encode(elem, stream); + + mos.write(fileName, NullWritable.get(), new BytesWritable(stream.toByteArray())); + } + + @Override + public void finish() { + try { + mos.close(); + } catch (Exception e) { + Throwables.throwIfUnchecked(e); + throw new RuntimeException(e); + } + } + + public String getFileName() { + return fileName; + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/40396d75/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FlattenTranslator.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FlattenTranslator.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FlattenTranslator.java index 8860caf..b966f2a 100644 --- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FlattenTranslator.java +++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FlattenTranslator.java @@ -28,10 +28,9 @@ public class FlattenTranslator<T> extends TransformTranslator.Default<Flatten.PC TranslationContext.UserGraphContext userGraphContext = context.getUserGraphContext(); Operation<?> operation = new FlattenOperation(); - context.addInitStep(Graphs.Step.of( - userGraphContext.getStepName(), - operation, + context.addInitStep( + Graphs.Step.of(userGraphContext.getStepName(), operation), userGraphContext.getInputTags(), - userGraphContext.getOutputTags())); + userGraphContext.getOutputTags()); } } http://git-wip-us.apache.org/repos/asf/beam/blob/40396d75/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Graph.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Graph.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Graph.java index b6900cc..66e573f 100644 --- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Graph.java +++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Graph.java @@ -21,19 +21,23 @@ import com.google.common.base.Function; import com.google.common.base.Predicate; import com.google.common.collect.FluentIterable; import com.google.common.collect.Iterables; +import com.google.common.collect.Sets; import com.google.common.graph.ElementOrder; import com.google.common.graph.GraphBuilder; import com.google.common.graph.MutableGraph; +import java.util.Collections; +import java.util.Comparator; import java.util.List; +import java.util.ListIterator; import java.util.Objects; import java.util.Set; /** * Graph that represents a Beam DAG. */ -public class Graph<StepT extends Graph.AbstractStep<TagT>, TagT extends Graph.AbstractTag> { +public class Graph<StepT extends Graph.AbstractStep, TagT extends Graph.AbstractTag> { - private final MutableGraph<Vertex> graph; + public final MutableGraph<Vertex> graph; public Graph() { this.graph = GraphBuilder.directed() @@ -45,16 +49,16 @@ public class Graph<StepT extends Graph.AbstractStep<TagT>, TagT extends Graph.Ab /** * Adds {@link StepT} to this {@link Graph}. */ - public void addStep(StepT step) { + public void addStep(StepT step, List<TagT> inTags, List<TagT> outTags) { graph.addNode(step); Set<Vertex> nodes = graph.nodes(); - for (TagT tag : step.getInputTags()) { + for (TagT tag : inTags) { if (!nodes.contains(tag)) { graph.addNode(tag); } graph.putEdge(tag, step); } - for (TagT tag : step.getOutputTags()) { + for (TagT tag : outTags) { if (!nodes.contains(tag)) { graph.addNode(tag); } @@ -93,7 +97,18 @@ public class Graph<StepT extends Graph.AbstractStep<TagT>, TagT extends Graph.Ab public boolean apply(Vertex input) { return input instanceof AbstractStep; }})) - .toList(); + .toSortedList(new Comparator<StepT>() { + @Override + public int compare(StepT left, StepT right) { + if (left.equals(right)) { + return 0; + } else if (com.google.common.graph.Graphs.reachableNodes(graph, left).contains(right)) { + return -1; + } else { + return 1; + } + } + }); } public List<StepT> getStartSteps() { @@ -106,32 +121,40 @@ public class Graph<StepT extends Graph.AbstractStep<TagT>, TagT extends Graph.Ab .toList(); } - public List<TagT> getInputTags() { - return castToTagList(FluentIterable.from(graph.nodes()) - .filter(new Predicate<Vertex>() { - @Override - public boolean apply(Vertex input) { - return input instanceof AbstractTag && graph.inDegree(input) == 0; - }})) - .toList(); + public StepT getProducer(TagT tag) { + if (contains(tag)) { + return (StepT) Iterables.getOnlyElement(graph.predecessors(tag)); + } else { + return null; + } } - public List<TagT> getOutputTags() { - return castToTagList(FluentIterable.from(graph.nodes()) - .filter(new Predicate<Vertex>() { - @Override - public boolean apply(Vertex input) { - return input instanceof AbstractTag && graph.outDegree(input) == 0; - }})) - .toList(); + public List<StepT> getConsumers(TagT tag) { + if (contains(tag)) { + return castToStepList(graph.successors(tag)).toList(); + } else { + return Collections.emptyList(); + } } - public StepT getProducer(TagT tag) { - return (StepT) Iterables.getOnlyElement(graph.predecessors(tag)); + public List<TagT> getInputTags(StepT step) { + if (contains(step)) { + return castToTagList(graph.predecessors(step)).toList(); + } else { + return Collections.emptyList(); + } } - public List<StepT> getConsumers(TagT tag) { - return castToStepList(graph.successors(tag)).toList(); + public List<TagT> getOutputTags(StepT step) { + if (contains(step)) { + return castToTagList(graph.successors(step)).toList(); + } else { + return Collections.emptyList(); + } + } + + private boolean contains(Vertex node) { + return graph.nodes().contains(node); } private FluentIterable<StepT> castToStepList(Iterable<Vertex> vertices) { @@ -175,9 +198,7 @@ public class Graph<StepT extends Graph.AbstractStep<TagT>, TagT extends Graph.Ab interface Vertex { } - public abstract static class AbstractStep<TagT extends AbstractTag> implements Vertex { - public abstract List<TagT> getInputTags(); - public abstract List<TagT> getOutputTags(); + public abstract static class AbstractStep implements Vertex { } public abstract static class AbstractTag implements Vertex { http://git-wip-us.apache.org/repos/asf/beam/blob/40396d75/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphPlanner.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphPlanner.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphPlanner.java index be694e4..13d215f 100644 --- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphPlanner.java +++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphPlanner.java @@ -17,8 +17,9 @@ */ package org.apache.beam.runners.mapreduce.translation; -import com.google.common.collect.Iterables; -import com.google.common.collect.Lists; +import com.google.common.collect.ImmutableList; +import java.util.List; +import org.apache.beam.sdk.values.TupleTag; /** * Class that optimizes the initial graph to a fused graph. @@ -29,31 +30,47 @@ public class GraphPlanner { public GraphPlanner() { } - public Graphs.FusedGraph plan(Graph<Graphs.Step, Graphs.Tag> initGraph) { - Graphs.FusedGraph fusedGraph = new Graphs.FusedGraph(); - // Convert from the list of steps to Graphs. - for (Graphs.Step step : Lists.reverse(initGraph.getSteps())) { - Graphs.FusedStep fusedStep = new Graphs.FusedStep(); - fusedStep.addStep(step); - fusedGraph.addFusedStep(fusedStep); + public Graphs.FusedGraph plan(Graphs.FusedGraph fusedGraph) { + // Attach writes/reads on fusion boundaries. + for (Graphs.FusedStep fusedStep : fusedGraph.getFusedSteps()) { + for (Graphs.Tag tag : fusedGraph.getOutputTags(fusedStep)) { + List<Graphs.FusedStep> consumers = fusedGraph.getConsumers(tag); + if (consumers.isEmpty()) { + continue; + } + Graphs.Step producer = fusedStep.getProducer(tag); + if (producer.getOperation() instanceof ViewOperation) { + continue; + } + String tagName = tag.getName(); + String fileName = tagName.replaceAll("[^A-Za-z0-9]", "0"); + fusedStep.addStep( + Graphs.Step.of( + tagName + "/Write", + new FileWriteOperation(fileName, tag.getCoder())), + ImmutableList.of(tag), + ImmutableList.<Graphs.Tag>of()); - tryFuse(fusedGraph, fusedStep); + String readStepName = tagName + "/Read"; + Graphs.Tag readOutput = Graphs.Tag.of( + readStepName + ".out", new TupleTag<>(), tag.getCoder()); + for (Graphs.FusedStep consumer : consumers) { + // Re-direct tag to readOutput. + List<Graphs.Step> receivers = consumer.getConsumers(tag); + for (Graphs.Step step : receivers) { + consumer.addEdge(readOutput, step); + } + consumer.removeTag(tag); + consumer.addStep( + Graphs.Step.of( + readStepName, + new FileReadOperation(fusedStep.getStageId(), fileName, tag.getCoder())), + ImmutableList.<Graphs.Tag>of(), + ImmutableList.of(readOutput)); + } + } } - return fusedGraph; - } - private void tryFuse(Graphs.FusedGraph fusedGraph, Graphs.FusedStep fusedStep) { - if (fusedStep.getOutputTags().size() != 1) { - return; - } - Graphs.Tag outTag = Iterables.getOnlyElement(fusedStep.getOutputTags()); - if (fusedGraph.getConsumers(outTag).size() != 1) { - return; - } - Graphs.FusedStep consumer = Iterables.getOnlyElement(fusedGraph.getConsumers(outTag)); - if (fusedStep.containsGroupByKey() && consumer.containsGroupByKey()) { - return; - } - fusedGraph.merge(fusedStep, consumer); + return fusedGraph; } } http://git-wip-us.apache.org/repos/asf/beam/blob/40396d75/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Graphs.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Graphs.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Graphs.java index cef5afc..97b5441 100644 --- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Graphs.java +++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Graphs.java @@ -18,6 +18,8 @@ package org.apache.beam.runners.mapreduce.translation; import com.google.auto.value.AutoValue; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; import java.util.List; import javax.annotation.Nullable; import org.apache.beam.sdk.coders.Coder; @@ -32,26 +34,68 @@ public class Graphs { public static class FusedGraph { private final Graph<FusedStep, Tag> graph; + private int stageId = 0; public FusedGraph() { this.graph = new Graph<>(); } - public void addFusedStep(FusedStep fusedStep) { - graph.addStep(fusedStep); + public FusedGraph(Graph<Graphs.Step, Tag> initGraph) { + this.graph = new Graph<>(); + + // Convert from the list of steps to Graphs. + for (Graphs.Step step : Lists.reverse(initGraph.getSteps())) { + tryFuse(step, initGraph.getInputTags(step), initGraph.getOutputTags(step)); + } + // Remove unused external tags. + for (FusedStep fusedStep : graph.getSteps()) { + for (Tag outTag : graph.getOutputTags(fusedStep)) { + if (graph.getConsumers(outTag).isEmpty()) { + graph.removeTag(outTag); + } + } + } } - public void merge(FusedStep src, FusedStep dest) { - for (Step step : src.steps.getSteps()) { - dest.addStep(step); + public void tryFuse( + Graphs.Step step, + List<Graphs.Tag> inTags, + List<Graphs.Tag> outTags) { + if (canFuse(step, inTags, outTags)) { + Graphs.Tag outTag = Iterables.getOnlyElement(outTags); + Graphs.FusedStep consumer = Iterables.getOnlyElement(graph.getConsumers(outTag)); + consumer.addStep(step, inTags, outTags); + for (Graphs.Tag in : inTags) { + graph.addEdge(in, consumer); + } + graph.removeTag(outTag); + graph.addEdge(consumer, outTag); + } else { + Graphs.FusedStep newFusedStep = new Graphs.FusedStep(stageId++); + newFusedStep.addStep(step, inTags, outTags); + graph.addStep(newFusedStep, inTags, outTags); + } + } + + private boolean canFuse( + Graphs.Step step, + List<Graphs.Tag> inTags, + List<Graphs.Tag> outTags) { + if (step.getOperation() instanceof ViewOperation) { + return false; + } + if (outTags.size() != 1) { + return false; } - for (Tag inTag : src.getInputTags()) { - graph.addEdge(inTag, dest); + Graphs.Tag outTag = Iterables.getOnlyElement(outTags); + if (graph.getConsumers(outTag).size() != 1) { + return false; } - for (Tag outTag : src.getOutputTags()) { - graph.addEdge(dest, outTag); + Graphs.FusedStep consumer = Iterables.getOnlyElement(graph.getConsumers(outTag)); + if (consumer.containsGroupByKey() && step.getOperation() instanceof GroupByKeyOperation) { + return false; } - graph.removeStep(src); + return true; } public FusedStep getProducer(Tag tag) { @@ -65,29 +109,41 @@ public class Graphs { public List<FusedStep> getFusedSteps() { return graph.getSteps(); } + + public List<Tag> getInputTags(FusedStep fusedStep) { + return graph.getInputTags(fusedStep); + } + + public List<Tag> getOutputTags(FusedStep fusedStep) { + return graph.getOutputTags(fusedStep); + } } - public static class FusedStep extends Graph.AbstractStep<Tag> { + public static class FusedStep extends Graph.AbstractStep { + private final int stageId; private final Graph<Step, Tag> steps; private Step groupByKeyStep; - public FusedStep() { + public FusedStep(int stageid) { + this.stageId = stageid; this.steps = new Graph<>(); this.groupByKeyStep = null; } - @Override - public List<Tag> getInputTags() { - return steps.getInputTags(); + public int getStageId() { + return stageId; } - @Override - public List<Tag> getOutputTags() { - return steps.getOutputTags(); + public List<Tag> getInputTags(Step step) { + return steps.getInputTags(step); + } + + public List<Tag> getOutputTags(Step step) { + return steps.getOutputTags(step); } - public void addStep(Step step) { - steps.addStep(step); + public void addStep(Step step, List<Tag> inTags, List<Tag> outTags) { + steps.addStep(step, inTags, outTags); if (step.getOperation() instanceof GroupByKeyOperation) { groupByKeyStep = step; } @@ -156,18 +212,14 @@ public class Graphs { } @AutoValue - public abstract static class Step extends Graph.AbstractStep<Tag> { + public abstract static class Step extends Graph.AbstractStep { abstract String getFullName(); // TODO: remove public public abstract Operation getOperation(); - public static Step of( - String fullName, - Operation operation, - List<Tag> inputTags, - List<Tag> outputTags) { + public static Step of(String fullName, Operation operation) { return new org.apache.beam.runners.mapreduce.translation.AutoValue_Graphs_Step( - inputTags, outputTags, fullName, operation); + fullName, operation); } @Override http://git-wip-us.apache.org/repos/asf/beam/blob/40396d75/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GroupAlsoByWindowsParDoOperation.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GroupAlsoByWindowsParDoOperation.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GroupAlsoByWindowsParDoOperation.java index 66cf3b6..1ae38da 100644 --- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GroupAlsoByWindowsParDoOperation.java +++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GroupAlsoByWindowsParDoOperation.java @@ -38,8 +38,9 @@ public class GroupAlsoByWindowsParDoOperation extends ParDoOperation { public GroupAlsoByWindowsParDoOperation( PipelineOptions options, WindowingStrategy<?, ?> windowingStrategy, - Coder<?> inputCoder) { - super(options, new TupleTag<>(), ImmutableList.<TupleTag<?>>of(), windowingStrategy); + Coder<?> inputCoder, + Graphs.Tag outTag) { + super(options, outTag.getTupleTag(), ImmutableList.<TupleTag<?>>of(), windowingStrategy); this.inputCoder = checkNotNull(inputCoder, "inputCoder"); } http://git-wip-us.apache.org/repos/asf/beam/blob/40396d75/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GroupByKeyTranslator.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GroupByKeyTranslator.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GroupByKeyTranslator.java index e87ed09..4c627d7 100644 --- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GroupByKeyTranslator.java +++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GroupByKeyTranslator.java @@ -37,10 +37,9 @@ class GroupByKeyTranslator<K, V> extends TransformTranslator.Default<GroupByKey< GroupByKeyOperation<K, V> groupByKeyOperation = new GroupByKeyOperation<>(windowingStrategy, (KvCoder<K, V>) inCoder); - context.addInitStep(Graphs.Step.of( - userGraphContext.getStepName(), - groupByKeyOperation, + context.addInitStep( + Graphs.Step.of(userGraphContext.getStepName(), groupByKeyOperation), userGraphContext.getInputTags(), - userGraphContext.getOutputTags())); + userGraphContext.getOutputTags()); } } http://git-wip-us.apache.org/repos/asf/beam/blob/40396d75/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/JobPrototype.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/JobPrototype.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/JobPrototype.java index 24feebd..1016e22 100644 --- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/JobPrototype.java +++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/JobPrototype.java @@ -20,13 +20,14 @@ package org.apache.beam.runners.mapreduce.translation; import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkState; +import com.google.common.base.Function; +import com.google.common.collect.FluentIterable; import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; -import com.google.common.collect.Sets; import java.io.IOException; +import java.util.ArrayList; import java.util.Collections; import java.util.List; -import java.util.Set; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.io.BoundedSource; @@ -38,8 +39,12 @@ import org.apache.beam.sdk.values.WindowingStrategy; import org.apache.commons.codec.binary.Base64; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs; +import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; +import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; /** * Class that translates a {@link Graphs.FusedStep} to a MapReduce job. @@ -53,13 +58,11 @@ public class JobPrototype { private final int stageId; private final Graphs.FusedStep fusedStep; - private final Set<JobPrototype> dependencies; private final PipelineOptions options; private JobPrototype(int stageId, Graphs.FusedStep fusedStep, PipelineOptions options) { this.stageId = stageId; this.fusedStep = checkNotNull(fusedStep, "fusedStep"); - this.dependencies = Sets.newHashSet(); this.options = checkNotNull(options, "options"); } @@ -72,19 +75,38 @@ public class JobPrototype { "org.apache.hadoop.io.serializer.WritableSerialization," + "org.apache.hadoop.io.serializer.JavaSerialization"); + //TODO: config out dir with PipelineOptions. + conf.set( + FileOutputFormat.OUTDIR, + String.format("/tmp/mapreduce/stage-%d", fusedStep.getStageId())); + // Setup BoundedSources in BeamInputFormat. // TODO: support more than one read steps by introducing a composed BeamInputFormat // and a partition operation. - Graphs.Step readStep = Iterables.getOnlyElement(fusedStep.getStartSteps()); - checkState(readStep.getOperation() instanceof ReadOperation); - BoundedSource source = ((ReadOperation) readStep.getOperation()).getSource(); + List<Graphs.Step> readSteps = fusedStep.getStartSteps(); + ArrayList<BoundedSource<?>> sources = new ArrayList<>(); + sources.addAll( + FluentIterable.from(readSteps) + .transform(new Function<Graphs.Step, BoundedSource<?>>() { + @Override + public BoundedSource<?> apply(Graphs.Step step) { + checkState(step.getOperation() instanceof SourceOperation); + return ((SourceOperation) step.getOperation()).getSource(); + }}) + .toList()); + conf.set( BeamInputFormat.BEAM_SERIALIZED_BOUNDED_SOURCE, - Base64.encodeBase64String(SerializableUtils.serializeToByteArray(source))); + Base64.encodeBase64String(SerializableUtils.serializeToByteArray(sources))); + conf.set( + BeamInputFormat.BEAM_SERIALIZED_PIPELINE_OPTIONS, + Base64.encodeBase64String(SerializableUtils.serializeToByteArray( + new SerializedPipelineOptions(options)))); job.setInputFormatClass(BeamInputFormat.class); if (fusedStep.containsGroupByKey()) { Graphs.Step groupByKey = fusedStep.getGroupByKeyStep(); + Graphs.Tag gbkOutTag = Iterables.getOnlyElement(fusedStep.getOutputTags(groupByKey)); GroupByKeyOperation operation = (GroupByKeyOperation) groupByKey.getOperation(); WindowingStrategy<?, ?> windowingStrategy = operation.getWindowingStrategy(); KvCoder<?, ?> kvCoder = operation.getKvCoder(); @@ -92,28 +114,26 @@ public class JobPrototype { String reifyStepName = groupByKey.getFullName() + "-Reify"; Coder<?> reifyValueCoder = getReifyValueCoder(kvCoder.getValueCoder(), windowingStrategy); Graphs.Tag reifyOutputTag = Graphs.Tag.of( - reifyStepName + ".out", new TupleTag<Object>(), reifyValueCoder); + reifyStepName + ".out", new TupleTag<>(), reifyValueCoder); Graphs.Step reifyStep = Graphs.Step.of( reifyStepName, - new ReifyTimestampAndWindowsParDoOperation(options, operation.getWindowingStrategy()), - groupByKey.getInputTags(), - ImmutableList.of(reifyOutputTag)); + new ReifyTimestampAndWindowsParDoOperation( + options, operation.getWindowingStrategy(), reifyOutputTag)); Graphs.Step writeStep = Graphs.Step.of( groupByKey.getFullName() + "-Write", - new WriteOperation(kvCoder.getKeyCoder(), reifyValueCoder), - ImmutableList.of(reifyOutputTag), - Collections.<Graphs.Tag>emptyList()); + new ShuffleWriteOperation(kvCoder.getKeyCoder(), reifyValueCoder)); Graphs.Step gabwStep = Graphs.Step.of( groupByKey.getFullName() + "-GroupAlsoByWindows", - new GroupAlsoByWindowsParDoOperation(options, windowingStrategy, kvCoder), - Collections.<Graphs.Tag>emptyList(), - groupByKey.getOutputTags()); - - fusedStep.addStep(reifyStep); - fusedStep.addStep(writeStep); - fusedStep.addStep(gabwStep); + new GroupAlsoByWindowsParDoOperation(options, windowingStrategy, kvCoder, gbkOutTag)); + + fusedStep.addStep( + reifyStep, fusedStep.getInputTags(groupByKey), ImmutableList.of(reifyOutputTag)); + fusedStep.addStep( + writeStep, ImmutableList.of(reifyOutputTag), Collections.<Graphs.Tag>emptyList()); + fusedStep.addStep( + gabwStep, Collections.<Graphs.Tag>emptyList(), ImmutableList.of(gbkOutTag)); fusedStep.removeStep(groupByKey); // Setup BeamReducer @@ -129,8 +149,9 @@ public class JobPrototype { SerializableUtils.serializeToByteArray(reducerStartStep.getOperation()))); job.setReducerClass(BeamReducer.class); } + // Setup DoFns in BeamMapper. - Graphs.Tag readOutputTag = Iterables.getOnlyElement(readStep.getOutputTags()); + Graphs.Tag readOutputTag = Iterables.getOnlyElement(fusedStep.getOutputTags(readSteps.get(0))); Graphs.Step mapperStartStep = Iterables.getOnlyElement(fusedStep.getConsumers(readOutputTag)); chainOperations(mapperStartStep, fusedStep); @@ -141,18 +162,28 @@ public class JobPrototype { Base64.encodeBase64String( SerializableUtils.serializeToByteArray(mapperStartStep.getOperation()))); job.setMapperClass(BeamMapper.class); - - job.setOutputFormatClass(NullOutputFormat.class); - + job.setOutputFormatClass(TextOutputFormat.class); + + for (Graphs.Step step : fusedStep.getSteps()) { + if (step.getOperation() instanceof FileWriteOperation) { + FileWriteOperation writeOperation = (FileWriteOperation) step.getOperation(); + //SequenceFileOutputFormat.setOutputPath(job, new Path("/tmp/mapreduce/")); + MultipleOutputs.addNamedOutput( + job, + writeOperation.getFileName(), + SequenceFileOutputFormat.class, + NullWritable.class, BytesWritable.class); + } + } return job; } private void chainOperations(Graphs.Step current, Graphs.FusedStep fusedStep) { Operation<?> operation = current.getOperation(); - List<Graphs.Tag> outputTags = current.getOutputTags(); - for (int index = 0; index < outputTags.size(); ++index) { - for (Graphs.Step consumer : fusedStep.getConsumers(outputTags.get(index))) { - operation.attachConsumer(index, consumer.getOperation()); + List<Graphs.Tag> outputTags = fusedStep.getOutputTags(current); + for (Graphs.Tag outTag : outputTags) { + for (Graphs.Step consumer : fusedStep.getConsumers(outTag)) { + operation.attachConsumer(outTag.getTupleTag(), consumer.getOperation()); } } for (Graphs.Tag outTag : outputTags) { http://git-wip-us.apache.org/repos/asf/beam/blob/40396d75/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Operation.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Operation.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Operation.java index 187ea79..574f152 100644 --- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Operation.java +++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Operation.java @@ -18,9 +18,11 @@ package org.apache.beam.runners.mapreduce.translation; import com.google.common.collect.ImmutableList; +import java.io.IOException; import java.io.Serializable; import java.util.List; import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.values.TupleTag; import org.apache.hadoop.mapreduce.TaskInputOutputContext; /** @@ -55,7 +57,7 @@ public abstract class Operation<T> implements Serializable { /** * Processes the element. */ - public abstract void process(WindowedValue<T> elem); + public abstract void process(WindowedValue<T> elem) throws IOException, InterruptedException; /** * Finishes this Operation's execution. @@ -80,8 +82,13 @@ public abstract class Operation<T> implements Serializable { /** * Adds an output to this Operation. */ - public void attachConsumer(int outputIndex, Operation consumer) { + public void attachConsumer(TupleTag<?> tupleTag, Operation consumer) { + int outputIndex = getOutputIndex(tupleTag); OutputReceiver fanOut = receivers[outputIndex]; fanOut.addOutput(consumer); } + + protected int getOutputIndex(TupleTag<?> tupleTag) { + return 0; + } } http://git-wip-us.apache.org/repos/asf/beam/blob/40396d75/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/OutputReceiver.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/OutputReceiver.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/OutputReceiver.java index 3dab890..b2f1b6d 100644 --- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/OutputReceiver.java +++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/OutputReceiver.java @@ -17,7 +17,9 @@ */ package org.apache.beam.runners.mapreduce.translation; +import com.google.common.base.Throwables; import com.google.common.collect.ImmutableList; +import java.io.IOException; import java.io.Serializable; import java.util.ArrayList; import java.util.List; @@ -46,7 +48,12 @@ public class OutputReceiver implements Serializable { public void process(WindowedValue<?> elem) { for (Operation out : receivingOperations) { if (out != null) { - out.process(elem); + try { + out.process(elem); + } catch (IOException | InterruptedException e) { + Throwables.throwIfUnchecked(e); + throw new RuntimeException(e); + } } } } http://git-wip-us.apache.org/repos/asf/beam/blob/40396d75/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ParDoOperation.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ParDoOperation.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ParDoOperation.java index a76773f..c6bf49c 100644 --- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ParDoOperation.java +++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ParDoOperation.java @@ -18,6 +18,7 @@ package org.apache.beam.runners.mapreduce.translation; import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.base.Preconditions.checkState; import java.util.List; import javax.annotation.Nullable; @@ -26,24 +27,23 @@ import org.apache.beam.runners.core.DoFnRunners; import org.apache.beam.runners.core.NullSideInputReader; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.reflect.DoFnInvoker; +import org.apache.beam.sdk.transforms.reflect.DoFnInvokers; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.WindowingStrategy; import org.apache.hadoop.mapreduce.TaskInputOutputContext; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * Operation for ParDo. */ public abstract class ParDoOperation<InputT, OutputT> extends Operation<InputT> { - private static final Logger LOG = LoggerFactory.getLogger(ParDoOperation.class); - protected final SerializedPipelineOptions options; protected final TupleTag<OutputT> mainOutputTag; private final List<TupleTag<?>> sideOutputTags; protected final WindowingStrategy<?, ?> windowingStrategy; + protected DoFnInvoker<InputT, OutputT> doFnInvoker; private DoFnRunner<InputT, OutputT> fnRunner; public ParDoOperation( @@ -65,6 +65,12 @@ public abstract class ParDoOperation<InputT, OutputT> extends Operation<InputT> @Override public void start(TaskInputOutputContext<Object, Object, Object, Object> taskContext) { + super.start(taskContext); + DoFn<InputT, OutputT> doFn = getDoFn(); + // Process user's setup + doFnInvoker = DoFnInvokers.invokerFor(doFn); + doFnInvoker.invokeSetup(); + fnRunner = DoFnRunners.simpleRunner( options.getPipelineOptions(), getDoFn(), @@ -75,7 +81,6 @@ public abstract class ParDoOperation<InputT, OutputT> extends Operation<InputT> null, windowingStrategy); fnRunner.startBundle(); - super.start(taskContext); } /** @@ -83,14 +88,27 @@ public abstract class ParDoOperation<InputT, OutputT> extends Operation<InputT> */ @Override public void process(WindowedValue<InputT> elem) { - LOG.info("elem: {}.", elem); fnRunner.processElement(elem); } @Override public void finish() { - super.finish(); fnRunner.finishBundle(); + doFnInvoker.invokeTeardown(); + super.finish(); + } + + @Override + protected int getOutputIndex(TupleTag<?> tupleTag) { + if (tupleTag == mainOutputTag) { + return 0; + } else { + int sideIndex = sideOutputTags.indexOf(tupleTag); + checkState( + sideIndex >= 0, + String.format("Cannot find index for tuple tag: %s.", tupleTag)); + return sideIndex + 1; + } } protected DoFnRunners.OutputManager createOutputManager() { @@ -100,15 +118,10 @@ public abstract class ParDoOperation<InputT, OutputT> extends Operation<InputT> private class ParDoOutputManager implements DoFnRunners.OutputManager { @Nullable - private OutputReceiver getReceiverOrNull(TupleTag<?> tag) { + private OutputReceiver getReceiverOrNull(TupleTag<?> tupleTag) { List<OutputReceiver> receivers = getOutputReceivers(); - if (tag.equals(mainOutputTag)) { - return receivers.get(0); - } else if (sideOutputTags.contains(tag)) { - return receivers.get(sideOutputTags.indexOf(tag) + 1); - } else { - return null; - } + int outputIndex = getOutputIndex(tupleTag); + return receivers.get(outputIndex); } @Override http://git-wip-us.apache.org/repos/asf/beam/blob/40396d75/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ParDoTranslator.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ParDoTranslator.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ParDoTranslator.java index 1a1373a..9bd89fd 100644 --- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ParDoTranslator.java +++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ParDoTranslator.java @@ -36,11 +36,9 @@ class ParDoTranslator<InputT, OutputT> transform.getMainOutputTag(), transform.getAdditionalOutputTags().getAll(), ((PCollection) userGraphContext.getInput()).getWindowingStrategy()); - - context.addInitStep(Graphs.Step.of( - userGraphContext.getStepName(), - operation, + context.addInitStep( + Graphs.Step.of(userGraphContext.getStepName(), operation), userGraphContext.getInputTags(), - userGraphContext.getOutputTags())); + userGraphContext.getOutputTags()); } } http://git-wip-us.apache.org/repos/asf/beam/blob/40396d75/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ReadBoundedTranslator.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ReadBoundedTranslator.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ReadBoundedTranslator.java index 0710827..86ee78a 100644 --- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ReadBoundedTranslator.java +++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ReadBoundedTranslator.java @@ -20,18 +20,17 @@ package org.apache.beam.runners.mapreduce.translation; import org.apache.beam.sdk.io.Read; /** - * Translates a {@link Read.Bounded} to a {@link ReadOperation}. + * Translates a {@link Read.Bounded} to a {@link SourceOperation}. */ class ReadBoundedTranslator<T> extends TransformTranslator.Default<Read.Bounded<T>> { @Override public void translateNode(Read.Bounded transform, TranslationContext context) { TranslationContext.UserGraphContext userGraphContext = context.getUserGraphContext(); - ReadOperation operation = new ReadOperation(transform.getSource()); - context.addInitStep(Graphs.Step.of( - userGraphContext.getStepName(), - operation, + SourceOperation operation = new SourceOperation(transform.getSource()); + context.addInitStep( + Graphs.Step.of(userGraphContext.getStepName(), operation), userGraphContext.getInputTags(), - userGraphContext.getOutputTags())); + userGraphContext.getOutputTags()); } } http://git-wip-us.apache.org/repos/asf/beam/blob/40396d75/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ReadOperation.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ReadOperation.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ReadOperation.java deleted file mode 100644 index c199dc6..0000000 --- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ReadOperation.java +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.mapreduce.translation; - -import static com.google.common.base.Preconditions.checkNotNull; - -import org.apache.beam.sdk.io.BoundedSource; -import org.apache.beam.sdk.util.WindowedValue; - -/** - * A Read.Bounded place holder {@link Operation} during pipeline translation. - */ -class ReadOperation<T> extends Operation<T> { - private final BoundedSource<T> source; - - ReadOperation(BoundedSource<T> source) { - super(1); - this.source = checkNotNull(source, "source"); - } - - @Override - public void process(WindowedValue elem) { - throw new IllegalStateException( - String.format("%s should not in execution graph.", this.getClass().getSimpleName())); - } - - BoundedSource<?> getSource() { - return source; - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/40396d75/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ReifyTimestampAndWindowsParDoOperation.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ReifyTimestampAndWindowsParDoOperation.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ReifyTimestampAndWindowsParDoOperation.java index 83d1af5..251828e 100644 --- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ReifyTimestampAndWindowsParDoOperation.java +++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ReifyTimestampAndWindowsParDoOperation.java @@ -34,8 +34,9 @@ public class ReifyTimestampAndWindowsParDoOperation extends ParDoOperation { public ReifyTimestampAndWindowsParDoOperation( PipelineOptions options, - WindowingStrategy<?, ?> windowingStrategy) { - super(options, new TupleTag<>(), ImmutableList.<TupleTag<?>>of(), windowingStrategy); + WindowingStrategy<?, ?> windowingStrategy, + Graphs.Tag outTag) { + super(options, outTag.getTupleTag(), ImmutableList.<TupleTag<?>>of(), windowingStrategy); } @Override http://git-wip-us.apache.org/repos/asf/beam/blob/40396d75/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ShuffleWriteOperation.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ShuffleWriteOperation.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ShuffleWriteOperation.java new file mode 100644 index 0000000..782cfef --- /dev/null +++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ShuffleWriteOperation.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.mapreduce.translation; + +import static com.google.common.base.Preconditions.checkNotNull; + +import com.google.common.base.Throwables; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.values.KV; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.mapreduce.TaskInputOutputContext; + +/** + * {@link Operation} that materializes input for group by key. + */ +public class ShuffleWriteOperation<T> extends Operation<T> { + + private final Coder<Object> keyCoder; + private final Coder<Object> valueCoder; + + private transient TaskInputOutputContext<Object, Object, Object, Object> taskContext; + + public ShuffleWriteOperation(Coder<Object> keyCoder, Coder<Object> valueCoder) { + super(0); + this.keyCoder = checkNotNull(keyCoder, "keyCoder"); + this.valueCoder = checkNotNull(valueCoder, "valueCoder"); + } + + @Override + public void start(TaskInputOutputContext<Object, Object, Object, Object> taskContext) { + this.taskContext = checkNotNull(taskContext, "taskContext"); + } + + @Override + public void process(WindowedValue<T> elem) throws IOException, InterruptedException { + KV<?, ?> kv = (KV<?, ?>) elem.getValue(); + ByteArrayOutputStream keyStream = new ByteArrayOutputStream(); + keyCoder.encode(kv.getKey(), keyStream); + + ByteArrayOutputStream valueStream = new ByteArrayOutputStream(); + valueCoder.encode(kv.getValue(), valueStream); + taskContext.write(new BytesWritable(keyStream.toByteArray()), valueStream.toByteArray()); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/40396d75/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/SourceOperation.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/SourceOperation.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/SourceOperation.java new file mode 100644 index 0000000..2163f34 --- /dev/null +++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/SourceOperation.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.mapreduce.translation; + +import static com.google.common.base.Preconditions.checkNotNull; + +import org.apache.beam.sdk.io.BoundedSource; +import org.apache.beam.sdk.util.WindowedValue; + +/** + * A Read.Bounded place holder {@link Operation} during pipeline translation. + */ +class SourceOperation<T> extends Operation<T> { + private final BoundedSource<T> source; + + SourceOperation(BoundedSource<T> source) { + super(1); + this.source = checkNotNull(source, "source"); + } + + @Override + public void process(WindowedValue elem) { + throw new IllegalStateException( + String.format("%s should not in execution graph.", this.getClass().getSimpleName())); + } + + BoundedSource<?> getSource() { + return source; + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/40396d75/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/TranslationContext.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/TranslationContext.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/TranslationContext.java index 365bdc0..da8ebff 100644 --- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/TranslationContext.java +++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/TranslationContext.java @@ -53,8 +53,8 @@ public class TranslationContext { return userGraphContext; } - public void addInitStep(Graphs.Step step) { - initGraph.addStep(step); + public void addInitStep(Graphs.Step step, List<Graphs.Tag> inTags, List<Graphs.Tag> outTags) { + initGraph.addStep(step, inTags, outTags); } /** http://git-wip-us.apache.org/repos/asf/beam/blob/40396d75/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/TranslatorRegistry.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/TranslatorRegistry.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/TranslatorRegistry.java index f79260a..e51d392 100644 --- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/TranslatorRegistry.java +++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/TranslatorRegistry.java @@ -26,16 +26,11 @@ import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.View; import org.apache.beam.sdk.transforms.windowing.Window; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * Lookup table mapping PTransform types to associated TransformTranslator implementations. */ public class TranslatorRegistry { - - private static final Logger LOG = LoggerFactory.getLogger(TranslatorRegistry.class); - private static final Map<Class<? extends PTransform>, TransformTranslator> TRANSLATORS = new HashMap<>(); @@ -49,10 +44,6 @@ public class TranslatorRegistry { } public static TransformTranslator<?> getTranslator(PTransform<?, ?> transform) { - TransformTranslator<?> translator = TRANSLATORS.get(transform.getClass()); - if (translator == null) { - LOG.warn("Unsupported operator={}", transform.getClass().getName()); - } - return translator; + return TRANSLATORS.get(transform.getClass()); } } http://git-wip-us.apache.org/repos/asf/beam/blob/40396d75/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ViewTranslator.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ViewTranslator.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ViewTranslator.java index 815ce77..d5eac73 100644 --- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ViewTranslator.java +++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ViewTranslator.java @@ -32,11 +32,9 @@ public class ViewTranslator extends TransformTranslator.Default<View.CreatePColl ViewOperation<?> operation = new ViewOperation<>((Coder) transform.getView().getPCollection().getCoder()); - - context.addInitStep(Graphs.Step.of( - userGraphContext.getStepName(), - operation, + context.addInitStep( + Graphs.Step.of(userGraphContext.getStepName(), operation), userGraphContext.getInputTags(), - userGraphContext.getOutputTags())); + userGraphContext.getOutputTags()); } } http://git-wip-us.apache.org/repos/asf/beam/blob/40396d75/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/WindowAssignTranslator.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/WindowAssignTranslator.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/WindowAssignTranslator.java index 367c375..3908870 100644 --- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/WindowAssignTranslator.java +++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/WindowAssignTranslator.java @@ -29,10 +29,9 @@ public class WindowAssignTranslator<T> extends TransformTranslator.Default<Windo TranslationContext.UserGraphContext userGraphContext = context.getUserGraphContext(); WindowAssignOperation<T, ?> operation = new WindowAssignOperation<>(transform.getWindowFn()); - context.addInitStep(Graphs.Step.of( - userGraphContext.getStepName(), - operation, + context.addInitStep( + Graphs.Step.of(userGraphContext.getStepName(), operation), userGraphContext.getInputTags(), - userGraphContext.getOutputTags())); + userGraphContext.getOutputTags()); } } http://git-wip-us.apache.org/repos/asf/beam/blob/40396d75/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/WriteOperation.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/WriteOperation.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/WriteOperation.java deleted file mode 100644 index 2eb4684..0000000 --- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/WriteOperation.java +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.mapreduce.translation; - -import static com.google.common.base.Preconditions.checkNotNull; - -import com.google.common.base.Throwables; -import java.io.ByteArrayOutputStream; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.values.KV; -import org.apache.hadoop.io.BytesWritable; -import org.apache.hadoop.mapreduce.TaskInputOutputContext; - -/** - * {@link Operation} that materializes input for group by key. - */ -public class WriteOperation<T> extends Operation<T> { - - private final Coder<Object> keyCoder; - private final Coder<Object> valueCoder; - - private transient TaskInputOutputContext<Object, Object, Object, Object> taskContext; - - public WriteOperation(Coder<Object> keyCoder, Coder<Object> valueCoder) { - super(0); - this.keyCoder = checkNotNull(keyCoder, "keyCoder"); - this.valueCoder = checkNotNull(valueCoder, "valueCoder"); - } - - @Override - public void start(TaskInputOutputContext<Object, Object, Object, Object> taskContext) { - this.taskContext = checkNotNull(taskContext, "taskContext"); - } - - @Override - public void process(WindowedValue<T> elem) { - KV<?, ?> kv = (KV<?, ?>) elem.getValue(); - try { - ByteArrayOutputStream keyStream = new ByteArrayOutputStream(); - keyCoder.encode(kv.getKey(), keyStream); - - ByteArrayOutputStream valueStream = new ByteArrayOutputStream(); - valueCoder.encode(kv.getValue(), valueStream); - taskContext.write(new BytesWritable(keyStream.toByteArray()), valueStream.toByteArray()); - } catch (Exception e) { - Throwables.throwIfUnchecked(e); - throw new RuntimeException(e); - } - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/40396d75/runners/map-reduce/src/test/java/org/apache/beam/runners/mapreduce/translation/GraphPlannerTest.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/test/java/org/apache/beam/runners/mapreduce/translation/GraphPlannerTest.java b/runners/map-reduce/src/test/java/org/apache/beam/runners/mapreduce/translation/GraphPlannerTest.java index cf5262f..ac965cb 100644 --- a/runners/map-reduce/src/test/java/org/apache/beam/runners/mapreduce/translation/GraphPlannerTest.java +++ b/runners/map-reduce/src/test/java/org/apache/beam/runners/mapreduce/translation/GraphPlannerTest.java @@ -55,7 +55,8 @@ public class GraphPlannerTest { p.traverseTopologically(graphConverter); GraphPlanner planner = new GraphPlanner(); - Graphs.FusedGraph fusedGraph = planner.plan(context.getInitGraph()); + Graphs.FusedGraph fusedGraph = new Graphs.FusedGraph(context.getInitGraph()); + fusedGraph = planner.plan(fusedGraph); assertEquals(1, Iterables.size(fusedGraph.getFusedSteps())); assertEquals(3, Iterables.getOnlyElement(fusedGraph.getFusedSteps()).getSteps().size());