mr-runner: Removes WordCountTest, fixes checkstyle and findbugs issues, and addresses review comments.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/32aeb7ac Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/32aeb7ac Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/32aeb7ac Branch: refs/heads/mr-runner Commit: 32aeb7ac3d49ade0dc3ad79e711e7b624091d485 Parents: 9d1db98 Author: Pei He <p...@apache.org> Authored: Thu Aug 31 16:21:17 2017 +0800 Committer: Pei He <p...@apache.org> Committed: Mon Sep 4 11:06:41 2017 +0800 ---------------------------------------------------------------------- runners/map-reduce/pom.xml | 2 +- .../mapreduce/MapReducePipelineOptions.java | 3 + .../mapreduce/MapReducePipelineResult.java | 6 + .../runners/mapreduce/MapReduceRegistrar.java | 6 + .../beam/runners/mapreduce/MapReduceRunner.java | 5 +- .../beam/runners/mapreduce/package-info.java | 2 +- .../mapreduce/translation/BeamInputFormat.java | 2 +- .../mapreduce/translation/BeamMapper.java | 4 +- .../mapreduce/translation/BeamReducer.java | 6 +- .../translation/ConfigurationUtils.java | 1 - .../mapreduce/translation/DotfileWriter.java | 22 ++-- .../translation/FileReadOperation.java | 1 - .../runners/mapreduce/translation/Graph.java | 8 +- .../mapreduce/translation/GraphConverter.java | 2 +- .../mapreduce/translation/GraphPlanner.java | 1 - .../runners/mapreduce/translation/Graphs.java | 13 +++ .../translation/PartitionOperation.java | 2 - .../ReifyTimestampAndWindowsParDoOperation.java | 2 +- .../translation/ShuffleWriteOperation.java | 1 - .../translation/TranslationContext.java | 7 +- .../beam/runners/mapreduce/WordCountTest.java | 117 ------------------- 21 files changed, 65 insertions(+), 148 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/32aeb7ac/runners/map-reduce/pom.xml ---------------------------------------------------------------------- diff --git a/runners/map-reduce/pom.xml b/runners/map-reduce/pom.xml index 
3b253a7..90d876b 100644 --- a/runners/map-reduce/pom.xml +++ b/runners/map-reduce/pom.xml @@ -24,7 +24,7 @@ <relativePath>../pom.xml</relativePath> </parent> - <artifactId>beam-runners-map-reduce</artifactId> + <artifactId>beam-runners-mapreduce</artifactId> <name>Apache Beam :: Runners :: MapReduce</name> http://git-wip-us.apache.org/repos/asf/beam/blob/32aeb7ac/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReducePipelineOptions.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReducePipelineOptions.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReducePipelineOptions.java index cfbc006..7cff40d 100644 --- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReducePipelineOptions.java +++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReducePipelineOptions.java @@ -48,6 +48,9 @@ public interface MapReducePipelineOptions extends PipelineOptions { String getFileOutputDir(); void setFileOutputDir(String fileOutputDir); + /** + * Returns the {@link Class} that constructs MapReduce job through Beam. 
+ */ class JarClassInstanceFactory implements DefaultValueFactory<Class<?>> { @Override public Class<?> create(PipelineOptions options) { http://git-wip-us.apache.org/repos/asf/beam/blob/32aeb7ac/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReducePipelineResult.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReducePipelineResult.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReducePipelineResult.java index 90c521a..933d8f6 100644 --- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReducePipelineResult.java +++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReducePipelineResult.java @@ -27,6 +27,12 @@ import org.apache.beam.sdk.metrics.MetricResults; import org.apache.hadoop.mapreduce.Job; import org.joda.time.Duration; +/** + * A {@link PipelineResult} of executing {@link org.apache.beam.sdk.Pipeline Pipelines} using + * {@link MapReduceRunner}. + * + * <p>It is synchronous (returned after the pipeline is finished), and is used for querying metrics. 
+ */ public class MapReducePipelineResult implements PipelineResult { private final List<Job> jobs; http://git-wip-us.apache.org/repos/asf/beam/blob/32aeb7ac/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceRegistrar.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceRegistrar.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceRegistrar.java index c8b0eea..1029218 100644 --- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceRegistrar.java +++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceRegistrar.java @@ -31,6 +31,9 @@ public class MapReduceRegistrar { private MapReduceRegistrar() { } + /** + * Registers the {@link MapReduceRunner}. + */ @AutoService(PipelineRunnerRegistrar.class) public static class Runner implements PipelineRunnerRegistrar { @Override @@ -39,6 +42,9 @@ public class MapReduceRegistrar { } } + /** + * Registers the {@link MapReducePipelineOptions}. + */ @AutoService(PipelineOptionsRegistrar.class) public static class Options implements PipelineOptionsRegistrar { @Override http://git-wip-us.apache.org/repos/asf/beam/blob/32aeb7ac/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceRunner.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceRunner.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceRunner.java index 8198848..85b7d1b 100644 --- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceRunner.java +++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceRunner.java @@ -41,7 +41,10 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * {@link PipelineRunner} for MapReduce. 
+ * {@link PipelineRunner} for Hadoop MapReduce. + * + * <p>It translates a Beam {@link Pipeline} to a series of MapReduce {@link Job jobs}, and executes + * them locally or on a Hadoop cluster. */ public class MapReduceRunner extends PipelineRunner<PipelineResult> { http://git-wip-us.apache.org/repos/asf/beam/blob/32aeb7ac/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/package-info.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/package-info.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/package-info.java index d511405..e452d92 100644 --- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/package-info.java +++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/package-info.java @@ -16,6 +16,6 @@ * limitations under the License. */ /** - * MapReduce runner implementation. + * Implementation of the Beam runner for Apache Hadoop MapReduce. 
*/ package org.apache.beam.runners.mapreduce; http://git-wip-us.apache.org/repos/asf/beam/blob/32aeb7ac/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamInputFormat.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamInputFormat.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamInputFormat.java index 9dc3396..3d0b8ea 100644 --- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamInputFormat.java +++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamInputFormat.java @@ -117,7 +117,7 @@ public class BeamInputFormat<T> extends InputFormat { return ((BeamInputSplit) split).createReader(); } - public static class BeamInputSplit<T> extends InputSplit implements Writable { + private static class BeamInputSplit<T> extends InputSplit implements Writable { private String stepName; private BoundedSource<T> boundedSource; private SerializedPipelineOptions options; http://git-wip-us.apache.org/repos/asf/beam/blob/32aeb7ac/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamMapper.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamMapper.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamMapper.java index b03236f..46c74c0 100644 --- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamMapper.java +++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamMapper.java @@ -30,7 +30,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * Adapter for executing Beam transforms in {@link Mapper}. 
+ * Adapter for executing {@link Operation operations} in {@link Mapper}. */ public class BeamMapper<ValueInT, ValueOutT> extends Mapper<Object, WindowedValue<ValueInT>, Object, WindowedValue<ValueOutT>> { @@ -58,6 +58,8 @@ public class BeamMapper<ValueInT, ValueOutT> Mapper<Object, WindowedValue<ValueInT>, Object, WindowedValue<ValueOutT>>.Context context) throws IOException, InterruptedException { LOG.info("key: {} value: {}.", key, value); + // Only needs to pass KV to the following PartitionOperation. However, we have to wrap it in a + // global window because of the method signature. operation.process(WindowedValue.valueInGlobalWindow(KV.of(key, value))); } http://git-wip-us.apache.org/repos/asf/beam/blob/32aeb7ac/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamReducer.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamReducer.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamReducer.java index a382904..b69be32 100644 --- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamReducer.java +++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamReducer.java @@ -33,15 +33,13 @@ import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.KV; import org.apache.commons.codec.binary.Base64; import org.apache.hadoop.io.BytesWritable; -import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.TaskInputOutputContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - /** - * Adapter for executing Beam transforms in {@link Reducer}. + * Adapter for executing {@link Operation operations} in {@link Reducer}. 
*/ public class BeamReducer<ValueInT, ValueOutT> extends Reducer<BytesWritable, byte[], Object, WindowedValue<ValueOutT>> { @@ -92,6 +90,8 @@ public class BeamReducer<ValueInT, ValueOutT> }})); Object decodedKey = keyCoder.decode(new ByteArrayInputStream(key.getBytes())); LOG.info("key: {} value: {}.", decodedKey, decodedValues); + // Only needs to pass KV to the following GABW operation. However, we have to wrap it in a + // global window because of the method signature. operation.process( WindowedValue.valueInGlobalWindow(KV.of(decodedKey, decodedValues))); } http://git-wip-us.apache.org/repos/asf/beam/blob/32aeb7ac/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ConfigurationUtils.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ConfigurationUtils.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ConfigurationUtils.java index 4ec50bd..a905d29 100644 --- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ConfigurationUtils.java +++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ConfigurationUtils.java @@ -23,7 +23,6 @@ import org.apache.beam.runners.mapreduce.MapReducePipelineOptions; import org.apache.beam.sdk.io.FileSystems; import org.apache.beam.sdk.io.fs.ResolveOptions; import org.apache.beam.sdk.io.fs.ResourceId; -import org.apache.beam.sdk.options.PipelineOptions; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; http://git-wip-us.apache.org/repos/asf/beam/blob/32aeb7ac/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/DotfileWriter.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/DotfileWriter.java 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/DotfileWriter.java index 863c4c9..12cc03c 100644 --- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/DotfileWriter.java +++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/DotfileWriter.java @@ -36,18 +36,18 @@ public class DotfileWriter { int i = 0; for (Graphs.FusedStep fusedStep : fusedGraph.getFusedSteps()) { String clusterId = String.format("cluster_%d", i++); - sb.append(String.format(" subgraph \"%s\" {\n", clusterId)); - sb.append(String.format(" \"%s\" [shape=point style=invis];\n", clusterId)); + sb.append(String.format(" subgraph \"%s\" {%n", clusterId)); + sb.append(String.format(" \"%s\" [shape=point style=invis];%n", clusterId)); fusedStepToId.put(fusedStep, clusterId); Set<String> nodeDefines = Sets.newHashSet(); for (Graphs.Step step : fusedStep.getSteps()) { - nodeDefines.add(String.format(" \"%s\" [shape=box];\n", step.getFullName())); + nodeDefines.add(String.format(" \"%s\" [shape=box];%n", step.getFullName())); for (Graph.AbstractTag inTag : fusedStep.getInputTags(step)) { - nodeDefines.add(String.format(" \"%s\" [shape=ellipse];\n", inTag)); + nodeDefines.add(String.format(" \"%s\" [shape=ellipse];%n", inTag)); } for (Graph.AbstractTag outTag : fusedStep.getOutputTags(step)) { - nodeDefines.add(String.format(" \"%s\" [shape=ellipse];\n", outTag)); + nodeDefines.add(String.format(" \"%s\" [shape=ellipse];%n", outTag)); } } for (String str : nodeDefines) { @@ -59,16 +59,16 @@ public class DotfileWriter { // Edges within fused steps. 
for (Graphs.Step step : fusedStep.getSteps()) { for (Graph.AbstractTag inTag : fusedStep.getInputTags(step)) { - sb.append(String.format(" \"%s\" -> \"%s\";\n", inTag, step)); + sb.append(String.format(" \"%s\" -> \"%s\";%n", inTag, step)); } for (Graph.AbstractTag outTag : fusedStep.getOutputTags(step)) { - sb.append(String.format(" \"%s\" -> \"%s\";\n", step, outTag)); + sb.append(String.format(" \"%s\" -> \"%s\";%n", step, outTag)); } } // Edges between sub-graphs. for (Graphs.Tag inTag : fusedGraph.getInputTags(fusedStep)) { - sb.append(String.format(" \"%s\" -> \"%s\";\n", inTag, fusedStepToId.get(fusedStep))); + sb.append(String.format(" \"%s\" -> \"%s\";%n", inTag, fusedStepToId.get(fusedStep))); } } sb.append("}\n"); @@ -79,12 +79,12 @@ public class DotfileWriter { StringBuilder sb = new StringBuilder(); sb.append("\ndigraph G {\n"); for (Graphs.Step step : fusedStep.getSteps()) { - sb.append(String.format(" \"%s\" [shape=box];\n", step.getFullName())); + sb.append(String.format(" \"%s\" [shape=box];%n", step.getFullName())); for (Graph.AbstractTag inTag : fusedStep.getInputTags(step)) { - sb.append(String.format(" \"%s\" -> \"%s\";\n", inTag, step)); + sb.append(String.format(" \"%s\" -> \"%s\";%n", inTag, step)); } for (Graph.AbstractTag outTag : fusedStep.getOutputTags(step)) { - sb.append(String.format(" \"%s\" -> \"%s\";\n", step, outTag)); + sb.append(String.format(" \"%s\" -> \"%s\";%n", step, outTag)); } } sb.append("}\n"); http://git-wip-us.apache.org/repos/asf/beam/blob/32aeb7ac/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FileReadOperation.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FileReadOperation.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FileReadOperation.java index f212252..eb5bef4 100644 --- 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FileReadOperation.java +++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FileReadOperation.java @@ -32,7 +32,6 @@ import org.apache.beam.sdk.io.BoundedSource; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.TupleTag; -import org.apache.beam.sdk.values.WindowingStrategy; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; http://git-wip-us.apache.org/repos/asf/beam/blob/32aeb7ac/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Graph.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Graph.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Graph.java index 144f9a4..b4549d3 100644 --- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Graph.java +++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Graph.java @@ -29,9 +29,7 @@ import com.google.common.graph.GraphBuilder; import com.google.common.graph.MutableGraph; import java.util.ArrayList; import java.util.Collections; -import java.util.Comparator; import java.util.List; -import java.util.ListIterator; import java.util.Objects; import java.util.Set; @@ -206,9 +204,15 @@ public class Graph<StepT extends Graph.AbstractStep, TagT extends Graph.Abstract interface Vertex { } + /** + * Step {@link Vertex}. + */ public abstract static class AbstractStep implements Vertex { } + /** + * Tag {@link Vertex}. 
+ */ public abstract static class AbstractTag implements Vertex { } } http://git-wip-us.apache.org/repos/asf/beam/blob/32aeb7ac/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphConverter.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphConverter.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphConverter.java index 458961f..1a4988b 100644 --- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphConverter.java +++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphConverter.java @@ -123,7 +123,7 @@ public class GraphConverter extends Pipeline.PipelineVisitor.Defaults { public String getDotfile() { return String.format( - "\ndigraph G {\n%s%s}\n", + "%ndigraph G {%n%s%s}%n", dotfileNodesBuilders.peek().toString(), dotfileEdgesBuilder.toString()); } http://git-wip-us.apache.org/repos/asf/beam/blob/32aeb7ac/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphPlanner.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphPlanner.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphPlanner.java index 09998ea..bc360fb 100644 --- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphPlanner.java +++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphPlanner.java @@ -29,7 +29,6 @@ import java.util.Map; import org.apache.beam.runners.mapreduce.MapReducePipelineOptions; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.TupleTag; -import org.apache.beam.sdk.values.WindowingStrategy; /** * Class that optimizes the initial graph to a 
fused graph. http://git-wip-us.apache.org/repos/asf/beam/blob/32aeb7ac/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Graphs.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Graphs.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Graphs.java index 0b93c3a..f23e572 100644 --- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Graphs.java +++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Graphs.java @@ -34,6 +34,9 @@ public class Graphs { private Graphs() {} + /** + * Class that represents an optimized graph. + */ public static class FusedGraph { private final Graph<FusedStep, Tag> graph; private int stageId = 0; @@ -121,6 +124,10 @@ public class Graphs { } } + /** + * An {@link Graph.AbstractStep} that represents an optimized sub-graph that can be executed + * in one MapReduce job. + */ public static class FusedStep extends Graph.AbstractStep { private final int stageId; private final Graph<Step, Tag> steps; @@ -213,6 +220,9 @@ public class Graphs { } } + /** + * An {@link Graph.AbstractStep} that represents one {@link Operation}. + */ @AutoValue public abstract static class Step extends Graph.AbstractStep { abstract String getFullName(); @@ -230,6 +240,9 @@ public class Graphs { } } + /** + * An {@link Graph.AbstractTag} that contains information about input/output data. 
+ */ @AutoValue public abstract static class Tag extends Graph.AbstractTag implements Serializable { abstract String getName(); http://git-wip-us.apache.org/repos/asf/beam/blob/32aeb7ac/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/PartitionOperation.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/PartitionOperation.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/PartitionOperation.java index 687b5b9..dc0f81a 100644 --- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/PartitionOperation.java +++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/PartitionOperation.java @@ -20,8 +20,6 @@ package org.apache.beam.runners.mapreduce.translation; import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkState; -import com.google.common.base.Function; -import com.google.common.collect.FluentIterable; import java.io.IOException; import java.util.List; import org.apache.beam.sdk.util.WindowedValue; http://git-wip-us.apache.org/repos/asf/beam/blob/32aeb7ac/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ReifyTimestampAndWindowsParDoOperation.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ReifyTimestampAndWindowsParDoOperation.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ReifyTimestampAndWindowsParDoOperation.java index 9d6b895..0e02bbb 100644 --- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ReifyTimestampAndWindowsParDoOperation.java +++ 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ReifyTimestampAndWindowsParDoOperation.java @@ -45,7 +45,7 @@ public class ReifyTimestampAndWindowsParDoOperation extends ParDoOperation { return (DoFn) new ReifyTimestampAndWindowsDoFn<>(); } - public class ReifyTimestampAndWindowsDoFn<K, V> + private static class ReifyTimestampAndWindowsDoFn<K, V> extends DoFn<KV<K, V>, KV<K, WindowedValue<V>>> { @ProcessElement public void processElement(ProcessContext c, BoundedWindow window) throws Exception { http://git-wip-us.apache.org/repos/asf/beam/blob/32aeb7ac/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ShuffleWriteOperation.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ShuffleWriteOperation.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ShuffleWriteOperation.java index 782cfef..a8fae1b 100644 --- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ShuffleWriteOperation.java +++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ShuffleWriteOperation.java @@ -19,7 +19,6 @@ package org.apache.beam.runners.mapreduce.translation; import static com.google.common.base.Preconditions.checkNotNull; -import com.google.common.base.Throwables; import java.io.ByteArrayOutputStream; import java.io.IOException; import org.apache.beam.sdk.coders.Coder; http://git-wip-us.apache.org/repos/asf/beam/blob/32aeb7ac/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/TranslationContext.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/TranslationContext.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/TranslationContext.java index 
93856de..e908e93 100644 --- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/TranslationContext.java +++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/TranslationContext.java @@ -86,7 +86,7 @@ public class TranslationContext { this.currentNode = node; for (Map.Entry<TupleTag<?>, PValue> entry : currentNode.getOutputs().entrySet()) { pValueToTupleTag.put(entry.getValue(), entry.getKey()); - // TODO: this is a hack to get around that ViewAsXXX.expand() return wrong output PValue. + // TODO: this is a hack to get around that ViewAsXYZ.expand() return wrong output PValue. if (node.getTransform() instanceof View.CreatePCollectionView) { View.CreatePCollectionView view = (View.CreatePCollectionView) node.getTransform(); pValueToTupleTag.put(view.getView(), view.getView().getTagInternal()); @@ -125,7 +125,10 @@ public class TranslationContext { if (pValue instanceof PCollection) { PCollection<?> pc = (PCollection<?>) pValue; return Graphs.Tag.of( - pc.getName(), pValueToTupleTag.get(pValue), pc.getCoder(), pc.getWindowingStrategy()); + pc.getName(), + pValueToTupleTag.get(pValue), + pc.getCoder(), + pc.getWindowingStrategy()); } else if (pValue instanceof PCollectionView){ PCollectionView pView = (PCollectionView) pValue; return Graphs.Tag.of( http://git-wip-us.apache.org/repos/asf/beam/blob/32aeb7ac/runners/map-reduce/src/test/java/org/apache/beam/runners/mapreduce/WordCountTest.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/test/java/org/apache/beam/runners/mapreduce/WordCountTest.java b/runners/map-reduce/src/test/java/org/apache/beam/runners/mapreduce/WordCountTest.java deleted file mode 100644 index 263905c..0000000 --- a/runners/map-reduce/src/test/java/org/apache/beam/runners/mapreduce/WordCountTest.java +++ /dev/null @@ -1,117 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license 
agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.mapreduce; - -import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.PipelineResult; -import org.apache.beam.sdk.io.TextIO; -import org.apache.beam.sdk.metrics.Counter; -import org.apache.beam.sdk.metrics.MetricNameFilter; -import org.apache.beam.sdk.metrics.MetricResult; -import org.apache.beam.sdk.metrics.Metrics; -import org.apache.beam.sdk.metrics.MetricsFilter; -import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.apache.beam.sdk.transforms.Count; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.MapElements; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.transforms.SimpleFunction; -import org.apache.beam.sdk.transforms.windowing.FixedWindows; -import org.apache.beam.sdk.transforms.windowing.Window; -import org.apache.beam.sdk.values.KV; -import org.apache.log4j.BasicConfigurator; -import org.joda.time.Duration; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -/** - * Test that runs WordCount. 
- */ -@RunWith(JUnit4.class) -public class WordCountTest { - - public static final String TOKENIZER_PATTERN = "[^\\p{L}]+"; - - /** - * Concept #2: You can make your pipeline assembly code less verbose by defining your DoFns - * statically out-of-line. This DoFn tokenizes lines of text into individual words; we pass it - * to a ParDo in the pipeline. - */ - static class ExtractWordsFn extends DoFn<String, String> { - private final Counter emptyLines = Metrics.counter(ExtractWordsFn.class, "emptyLines"); - private final Counter nonEmptyLines = Metrics.counter(ExtractWordsFn.class, "nonEmptyLines"); - - @ProcessElement - public void processElement(ProcessContext c) { - if (c.element().trim().isEmpty()) { - emptyLines.inc(); - } else { - nonEmptyLines.inc(); - } - - // Split the line into words. - String[] words = c.element().split(TOKENIZER_PATTERN); - - // Output each word encountered into the output PCollection. - for (String word : words) { - if (!word.isEmpty()) { - c.output(word); - } - } - } - } - - /** A SimpleFunction that converts a Word and Count into a printable string. */ - public static class FormatAsTextFn extends SimpleFunction<KV<String, Long>, String> { - @Override - public String apply(KV<String, Long> input) { - return input.getKey() + ": " + input.getValue(); - } - } - - @Test - public void testWordCount() { - BasicConfigurator.configure(); - - String input = "/Users/peihe/github/beam/LICENSE"; - String output = "./output"; - MapReducePipelineOptions options = PipelineOptionsFactory.as(MapReducePipelineOptions.class); - //options.setJarClass(this.getClass()); - options.setRunner(MapReduceRunner.class); - Pipeline p = Pipeline.create(options); - - // Concepts #2 and #3: Our pipeline applies the composite CountWords transform, and passes the - // static FormatAsTextFn() to the ParDo transform. 
- p.apply("ReadLines", TextIO.read().from(input)) - .apply(Window.<String>into(FixedWindows.of(Duration.millis(1000)))) - .apply(ParDo.of(new ExtractWordsFn())) - .apply(Count.<String>perElement()) - .apply(MapElements.via(new FormatAsTextFn())) - .apply("WriteCounts", TextIO.write().to(output)); - - PipelineResult result = p.run(); - Iterable<MetricResult<Long>> counters = result.metrics() - .queryMetrics( - MetricsFilter.builder() - .addNameFilter(MetricNameFilter.named(ExtractWordsFn.class, "emptyLines")) - .build()) - .counters(); - System.out.println(counters.iterator().next()); - } -}