Repository: beam Updated Branches: refs/heads/master cc8e0b9df -> 45f63eb6c
[BEAM-1875] Remove Spark-runner-custom Hadoop and Avro IOs Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/6f7defdb Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/6f7defdb Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/6f7defdb Branch: refs/heads/master Commit: 6f7defdbb1aa14bf5577bf24ba1d3307cd2fc8b9 Parents: cc8e0b9 Author: Amit Sela <amitsel...@gmail.com> Authored: Tue Apr 4 10:31:31 2017 +0300 Committer: Ismaël Mejía <ieme...@apache.org> Committed: Tue Apr 4 13:58:40 2017 +0200 ---------------------------------------------------------------------- runners/spark/pom.xml | 19 -- .../beam/runners/spark/io/hadoop/HadoopIO.java | 216 ---------------- .../spark/io/hadoop/ShardNameBuilder.java | 111 -------- .../spark/io/hadoop/ShardNameTemplateAware.java | 31 --- .../io/hadoop/ShardNameTemplateHelper.java | 63 ----- .../io/hadoop/TemplatedAvroKeyOutputFormat.java | 45 ---- .../TemplatedSequenceFileOutputFormat.java | 45 ---- .../io/hadoop/TemplatedTextOutputFormat.java | 45 ---- .../runners/spark/io/hadoop/package-info.java | 22 -- .../spark/translation/TransformTranslator.java | 251 ------------------- .../io/hadoop/HadoopFileFormatPipelineTest.java | 121 --------- .../spark/io/hadoop/ShardNameBuilderTest.java | 88 ------- runners/spark/src/test/resources/test_text.txt | 2 - 13 files changed, 1059 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/6f7defdb/runners/spark/pom.xml ---------------------------------------------------------------------- diff --git a/runners/spark/pom.xml b/runners/spark/pom.xml index 96480cd..dd174bf 100644 --- a/runners/spark/pom.xml +++ b/runners/spark/pom.xml @@ -248,19 +248,6 @@ </exclusions> </dependency> <dependency> - <groupId>org.apache.avro</groupId> - <artifactId>avro-mapred</artifactId> - <version>${avro.version}</version> - <classifier>hadoop2</classifier> - 
<exclusions> - <!-- exclude old Jetty version of servlet API --> - <exclusion> - <groupId>org.mortbay.jetty</groupId> - <artifactId>servlet-api</artifactId> - </exclusion> - </exclusions> - </dependency> - <dependency> <groupId>io.dropwizard.metrics</groupId> <artifactId>metrics-core</artifactId> <version>${dropwizard.metrics.version}</version> @@ -318,12 +305,6 @@ </dependency> <dependency> - <groupId>org.apache.beam</groupId> - <artifactId>beam-sdks-java-io-hadoop-common</artifactId> - <scope>test</scope> - </dependency> - - <dependency> <groupId>org.mockito</groupId> <artifactId>mockito-all</artifactId> <scope>test</scope> http://git-wip-us.apache.org/repos/asf/beam/blob/6f7defdb/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/HadoopIO.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/HadoopIO.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/HadoopIO.java deleted file mode 100644 index f2457ce..0000000 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/HadoopIO.java +++ /dev/null @@ -1,216 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.spark.io.hadoop; - -import static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.base.Preconditions.checkNotNull; - -import java.util.HashMap; -import java.util.Map; -import org.apache.beam.sdk.io.ShardNameTemplate; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.util.WindowingStrategy; -import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.PBegin; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PDone; -import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; -import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; - -/** - * Spark native HadoopIO. - */ -public final class HadoopIO { - - private HadoopIO() { - } - - /** - * Read operation from HDFS. - */ - public static final class Read { - - private Read() { - } - - public static <K, V> Bound<K, V> from(String filepattern, - Class<? extends FileInputFormat<K, V>> format, Class<K> key, Class<V> value) { - return new Bound<>(filepattern, format, key, value); - } - - /** - * A {@link PTransform} reading bounded collection of data from HDFS. - * @param <K> the type of the keys - * @param <V> the type of the values - */ - public static class Bound<K, V> extends PTransform<PBegin, PCollection<KV<K, V>>> { - - private final String filepattern; - private final Class<? extends FileInputFormat<K, V>> formatClass; - private final Class<K> keyClass; - private final Class<V> valueClass; - - Bound(String filepattern, Class<? 
extends FileInputFormat<K, V>> format, Class<K> key, - Class<V> value) { - checkNotNull(filepattern, "need to set the filepattern of an HadoopIO.Read transform"); - checkNotNull(format, "need to set the format class of an HadoopIO.Read transform"); - checkNotNull(key, "need to set the key class of an HadoopIO.Read transform"); - checkNotNull(value, "need to set the value class of an HadoopIO.Read transform"); - this.filepattern = filepattern; - this.formatClass = format; - this.keyClass = key; - this.valueClass = value; - } - - public String getFilepattern() { - return filepattern; - } - - public Class<? extends FileInputFormat<K, V>> getFormatClass() { - return formatClass; - } - - public Class<V> getValueClass() { - return valueClass; - } - - public Class<K> getKeyClass() { - return keyClass; - } - - @Override - public PCollection<KV<K, V>> expand(PBegin input) { - return PCollection.createPrimitiveOutputInternal(input.getPipeline(), - WindowingStrategy.globalDefault(), PCollection.IsBounded.BOUNDED); - } - - } - - } - - /** - * Write operation on HDFS. - */ - public static final class Write { - - private Write() { - } - - public static <K, V> Bound<K, V> to(String filenamePrefix, - Class<? extends FileOutputFormat<K, V>> format, Class<K> key, Class<V> value) { - return new Bound<>(filenamePrefix, format, key, value); - } - - /** - * A {@link PTransform} writing {@link PCollection} on HDFS. - */ - public static class Bound<K, V> extends PTransform<PCollection<KV<K, V>>, PDone> { - - /** The filename to write to. */ - private final String filenamePrefix; - /** Suffix to use for each filename. */ - private final String filenameSuffix; - /** Requested number of shards. 0 for automatic. */ - private final int numShards; - /** Shard template string. */ - private final String shardTemplate; - private final Class<? 
extends FileOutputFormat<K, V>> formatClass; - private final Class<K> keyClass; - private final Class<V> valueClass; - private final Map<String, String> configurationProperties; - - Bound(String filenamePrefix, Class<? extends FileOutputFormat<K, V>> format, - Class<K> key, - Class<V> value) { - this(filenamePrefix, "", 0, ShardNameTemplate.INDEX_OF_MAX, format, key, value, - new HashMap<String, String>()); - } - - Bound(String filenamePrefix, String filenameSuffix, int numShards, - String shardTemplate, Class<? extends FileOutputFormat<K, V>> format, - Class<K> key, Class<V> value, Map<String, String> configurationProperties) { - this.filenamePrefix = filenamePrefix; - this.filenameSuffix = filenameSuffix; - this.numShards = numShards; - this.shardTemplate = shardTemplate; - this.formatClass = format; - this.keyClass = key; - this.valueClass = value; - this.configurationProperties = configurationProperties; - } - - public Bound<K, V> withoutSharding() { - return new Bound<>(filenamePrefix, filenameSuffix, 1, "", formatClass, - keyClass, valueClass, configurationProperties); - } - - public Bound<K, V> withConfigurationProperty(String key, String value) { - configurationProperties.put(key, value); - return this; - } - - public String getFilenamePrefix() { - return filenamePrefix; - } - - public String getShardTemplate() { - return shardTemplate; - } - - public int getNumShards() { - return numShards; - } - - public String getFilenameSuffix() { - return filenameSuffix; - } - - public Class<? 
extends FileOutputFormat<K, V>> getFormatClass() { - return formatClass; - } - - public Class<V> getValueClass() { - return valueClass; - } - - public Class<K> getKeyClass() { - return keyClass; - } - - public Map<String, String> getConfigurationProperties() { - return configurationProperties; - } - - @Override - public PDone expand(PCollection<KV<K, V>> input) { - checkNotNull( - filenamePrefix, "need to set the filename prefix of an HadoopIO.Write transform"); - checkNotNull(formatClass, "need to set the format class of an HadoopIO.Write transform"); - checkNotNull(keyClass, "need to set the key class of an HadoopIO.Write transform"); - checkNotNull(valueClass, "need to set the value class of an HadoopIO.Write transform"); - - checkArgument( - ShardNameTemplateAware.class.isAssignableFrom(formatClass), - "Format class must implement %s", - ShardNameTemplateAware.class.getName()); - - return PDone.in(input.getPipeline()); - } - } - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/6f7defdb/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/ShardNameBuilder.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/ShardNameBuilder.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/ShardNameBuilder.java deleted file mode 100644 index 11b4b53..0000000 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/ShardNameBuilder.java +++ /dev/null @@ -1,111 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.runners.spark.io.hadoop; - -import java.util.regex.Matcher; -import java.util.regex.Pattern; -import org.apache.hadoop.fs.Path; - -/** - * Shard name builder. - */ -public final class ShardNameBuilder { - - private ShardNameBuilder() { - } - - /** - * Replace occurrences of uppercase letters 'N' with the given {code}shardCount{code}, - * left-padded with zeros if necessary. - * @see org.apache.beam.sdk.io.ShardNameTemplate - * @param template the string template containing uppercase letters 'N' - * @param shardCount the total number of shards - * @return a string template with 'N' replaced by the shard count - */ - public static String replaceShardCount(String template, int shardCount) { - return replaceShardPattern(template, "N+", shardCount); - } - - /** - * Replace occurrences of uppercase letters 'S' with the given {code}shardNumber{code}, - * left-padded with zeros if necessary. 
- * @see org.apache.beam.sdk.io.ShardNameTemplate - * @param template the string template containing uppercase letters 'S' - * @param shardNumber the number of a particular shard - * @return a string template with 'S' replaced by the shard number - */ - public static String replaceShardNumber(String template, int shardNumber) { - return replaceShardPattern(template, "S+", shardNumber); - } - - private static String replaceShardPattern(String template, String pattern, int n) { - Pattern p = Pattern.compile(pattern); - Matcher m = p.matcher(template); - StringBuffer sb = new StringBuffer(); - while (m.find()) { - // replace pattern with a String format string: - // index 1, zero-padding flag (0), width length of matched pattern, decimal conversion - m.appendReplacement(sb, "%1\\$0" + m.group().length() + "d"); - } - m.appendTail(sb); - return String.format(sb.toString(), n); - } - - /** - * @param pathPrefix a relative or absolute path - * @param template a template string - * @return the output directory for the given prefix, template and suffix - */ - public static String getOutputDirectory(String pathPrefix, String template) { - String out = new Path(pathPrefix + template).getParent().toString(); - if (out.isEmpty()) { - return "./"; - } - return out; - } - - /** - * @param pathPrefix a relative or absolute path - * @param template a template string - * @return the prefix of the output filename for the given path prefix and template - */ - public static String getOutputFilePrefix(String pathPrefix, String template) { - String name = new Path(pathPrefix + template).getName(); - if (name.endsWith(template)) { - return name.substring(0, name.length() - template.length()); - } else { - return ""; - } - } - - /** - * @param pathPrefix a relative or absolute path - * @param template a template string - * @return the template for the output filename for the given path prefix and - * template - */ - public static String getOutputFileTemplate(String pathPrefix, String 
template) { - String name = new Path(pathPrefix + template).getName(); - if (name.endsWith(template)) { - return template; - } else { - return name; - } - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/6f7defdb/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/ShardNameTemplateAware.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/ShardNameTemplateAware.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/ShardNameTemplateAware.java deleted file mode 100644 index d78b437..0000000 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/ShardNameTemplateAware.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.runners.spark.io.hadoop; - -/** - * A marker interface that implementations of - * {@link org.apache.hadoop.mapreduce.lib.output.FileOutputFormat} implement to indicate - * that they produce shard names that adhere to the template in - * {@link HadoopIO.Write}. - * - * <p>Some common shard names are defined in - * {@link org.apache.beam.sdk.io.ShardNameTemplate}. 
- */ -public interface ShardNameTemplateAware { -} http://git-wip-us.apache.org/repos/asf/beam/blob/6f7defdb/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/ShardNameTemplateHelper.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/ShardNameTemplateHelper.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/ShardNameTemplateHelper.java deleted file mode 100644 index 4a7058b..0000000 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/ShardNameTemplateHelper.java +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.beam.runners.spark.io.hadoop; - -import static org.apache.beam.runners.spark.io.hadoop.ShardNameBuilder.replaceShardNumber; - -import java.io.IOException; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.apache.hadoop.mapreduce.TaskID; -import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter; -import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Shard name template helper. - */ -public final class ShardNameTemplateHelper { - - private static final Logger LOG = LoggerFactory.getLogger(ShardNameTemplateHelper.class); - - public static final String OUTPUT_FILE_PREFIX = "spark.beam.fileoutputformat.prefix"; - public static final String OUTPUT_FILE_TEMPLATE = "spark.beam.fileoutputformat.template"; - public static final String OUTPUT_FILE_SUFFIX = "spark.beam.fileoutputformat.suffix"; - - private ShardNameTemplateHelper() { - } - - public static <K, V> Path getDefaultWorkFile(FileOutputFormat<K, V> format, - TaskAttemptContext context) throws IOException { - FileOutputCommitter committer = - (FileOutputCommitter) format.getOutputCommitter(context); - return new Path(committer.getWorkPath(), getOutputFile(context)); - } - - private static String getOutputFile(TaskAttemptContext context) { - TaskID taskId = context.getTaskAttemptID().getTaskID(); - int partition = taskId.getId(); - - String filePrefix = context.getConfiguration().get(OUTPUT_FILE_PREFIX); - String fileTemplate = context.getConfiguration().get(OUTPUT_FILE_TEMPLATE); - String fileSuffix = context.getConfiguration().get(OUTPUT_FILE_SUFFIX); - return filePrefix + replaceShardNumber(fileTemplate, partition) + fileSuffix; - } - -} http://git-wip-us.apache.org/repos/asf/beam/blob/6f7defdb/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/TemplatedAvroKeyOutputFormat.java 
---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/TemplatedAvroKeyOutputFormat.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/TemplatedAvroKeyOutputFormat.java deleted file mode 100644 index 62a610b..0000000 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/TemplatedAvroKeyOutputFormat.java +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.runners.spark.io.hadoop; - -import java.io.IOException; -import java.io.OutputStream; -import org.apache.avro.mapreduce.AvroKeyOutputFormat; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.mapreduce.JobContext; -import org.apache.hadoop.mapreduce.TaskAttemptContext; - -/** - * Templated Avro key output format. 
- */ -public class TemplatedAvroKeyOutputFormat<T> extends AvroKeyOutputFormat<T> - implements ShardNameTemplateAware { - - @Override - public void checkOutputSpecs(JobContext job) { - // don't fail if the output already exists - } - - @Override - protected OutputStream getAvroFileOutputStream(TaskAttemptContext context) throws IOException { - Path path = ShardNameTemplateHelper.getDefaultWorkFile(this, context); - return path.getFileSystem(context.getConfiguration()).create(path); - } - -} http://git-wip-us.apache.org/repos/asf/beam/blob/6f7defdb/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/TemplatedSequenceFileOutputFormat.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/TemplatedSequenceFileOutputFormat.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/TemplatedSequenceFileOutputFormat.java deleted file mode 100644 index ab1263b..0000000 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/TemplatedSequenceFileOutputFormat.java +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.beam.runners.spark.io.hadoop; - -import java.io.IOException; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.mapreduce.JobContext; -import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; - -/** - * Templated sequence file output format. - */ -public class TemplatedSequenceFileOutputFormat<K, V> extends SequenceFileOutputFormat<K, V> - implements ShardNameTemplateAware { - - @Override - public void checkOutputSpecs(JobContext job) { - // don't fail if the output already exists - } - - @Override - public Path getDefaultWorkFile(TaskAttemptContext context, - String extension) throws IOException { - // note that the passed-in extension is ignored since it comes from the template - return ShardNameTemplateHelper.getDefaultWorkFile(this, context); - } - -} http://git-wip-us.apache.org/repos/asf/beam/blob/6f7defdb/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/TemplatedTextOutputFormat.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/TemplatedTextOutputFormat.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/TemplatedTextOutputFormat.java deleted file mode 100644 index 5a6e9a9..0000000 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/TemplatedTextOutputFormat.java +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.runners.spark.io.hadoop; - -import java.io.IOException; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.mapreduce.JobContext; -import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; - -/** - * Templates text output format. - */ -public class TemplatedTextOutputFormat<K, V> extends TextOutputFormat<K, V> - implements ShardNameTemplateAware { - - @Override - public void checkOutputSpecs(JobContext job) { - // don't fail if the output already exists - } - - @Override - public Path getDefaultWorkFile(TaskAttemptContext context, - String extension) throws IOException { - // note that the passed-in extension is ignored since it comes from the template - return ShardNameTemplateHelper.getDefaultWorkFile(this, context); - } - -} http://git-wip-us.apache.org/repos/asf/beam/blob/6f7defdb/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/package-info.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/package-info.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/package-info.java deleted file mode 100644 index 70cd0f3..0000000 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/package-info.java +++ /dev/null @@ -1,22 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * Spark-specific transforms for reading from and writing to Hadoop file systems (HDFS). - */ -package org.apache.beam.runners.spark.io.hadoop; http://git-wip-us.apache.org/repos/asf/beam/blob/6f7defdb/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java index d88ef7e..6290bba 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java @@ -20,10 +20,6 @@ package org.apache.beam.runners.spark.translation; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkState; -import static org.apache.beam.runners.spark.io.hadoop.ShardNameBuilder.getOutputDirectory; -import static org.apache.beam.runners.spark.io.hadoop.ShardNameBuilder.getOutputFilePrefix; -import static org.apache.beam.runners.spark.io.hadoop.ShardNameBuilder.getOutputFileTemplate; 
-import static org.apache.beam.runners.spark.io.hadoop.ShardNameBuilder.replaceShardCount; import static org.apache.beam.runners.spark.translation.TranslationUtils.rejectSplittable; import static org.apache.beam.runners.spark.translation.TranslationUtils.rejectStateAndTimers; @@ -31,22 +27,14 @@ import com.google.common.base.Optional; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.collect.Maps; -import java.io.IOException; import java.util.Collections; import java.util.List; import java.util.Map; -import org.apache.avro.mapred.AvroKey; -import org.apache.avro.mapreduce.AvroJob; -import org.apache.avro.mapreduce.AvroKeyInputFormat; import org.apache.beam.runners.core.SystemReduceFn; import org.apache.beam.runners.spark.aggregators.AggregatorsAccumulator; import org.apache.beam.runners.spark.aggregators.NamedAggregators; import org.apache.beam.runners.spark.coders.CoderHelpers; import org.apache.beam.runners.spark.io.SourceRDD; -import org.apache.beam.runners.spark.io.hadoop.HadoopIO; -import org.apache.beam.runners.spark.io.hadoop.ShardNameTemplateHelper; -import org.apache.beam.runners.spark.io.hadoop.TemplatedAvroKeyOutputFormat; -import org.apache.beam.runners.spark.io.hadoop.TemplatedTextOutputFormat; import org.apache.beam.runners.spark.metrics.MetricsAccumulator; import org.apache.beam.runners.spark.metrics.SparkMetricsContainer; import org.apache.beam.runners.spark.util.SideInputBroadcast; @@ -54,9 +42,7 @@ import org.apache.beam.sdk.coders.CannotProvideCoderException; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; -import org.apache.beam.sdk.io.AvroIO; import org.apache.beam.sdk.io.Read; -import org.apache.beam.sdk.io.TextIO; import org.apache.beam.sdk.transforms.Combine; import org.apache.beam.sdk.transforms.CombineWithContext; import org.apache.beam.sdk.transforms.Create; @@ -78,18 +64,11 @@ import 
org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TaggedPValue; import org.apache.beam.sdk.values.TupleTag; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.io.NullWritable; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.spark.Accumulator; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.Function; -import org.apache.spark.api.java.function.PairFunction; -import scala.Tuple2; /** @@ -427,119 +406,6 @@ public final class TransformTranslator { }; } - - private static <T> TransformEvaluator<TextIO.Read.Bound> readText() { - return new TransformEvaluator<TextIO.Read.Bound>() { - @Override - public void evaluate(TextIO.Read.Bound transform, EvaluationContext context) { - String pattern = transform.getFilepattern(); - JavaRDD<WindowedValue<String>> rdd = context.getSparkContext().textFile(pattern) - .map(WindowingHelpers.<String>windowFunction()); - context.putDataset(transform, new BoundedDataset<>(rdd)); - } - - @Override - public String toNativeString() { - return "sparkContext.textFile(...)"; - } - }; - } - - private static <T> TransformEvaluator<TextIO.Write.Bound> writeText() { - return new TransformEvaluator<TextIO.Write.Bound>() { - @Override - public void evaluate(TextIO.Write.Bound transform, EvaluationContext context) { - @SuppressWarnings("unchecked") - JavaPairRDD<T, Void> last = - ((BoundedDataset<T>) context.borrowDataset(transform)).getRDD() - .map(WindowingHelpers.<T>unwindowFunction()) - .mapToPair(new PairFunction<T, T, - Void>() { - @Override - public Tuple2<T, Void> call(T t) throws Exception { - return new Tuple2<>(t, null); - } - }); - ShardTemplateInformation shardTemplateInfo = - new 
ShardTemplateInformation(transform.getNumShards(), - transform.getShardTemplate(), transform.getFilenamePrefix(), - transform.getFilenameSuffix()); - writeHadoopFile(last, new Configuration(), shardTemplateInfo, Text.class, - NullWritable.class, TemplatedTextOutputFormat.class); - } - - @Override - public String toNativeString() { - return "saveAsNewAPIHadoopFile(...)"; - } - }; - } - - private static <T> TransformEvaluator<AvroIO.Read.Bound<T>> readAvro() { - return new TransformEvaluator<AvroIO.Read.Bound<T>>() { - @Override - public void evaluate(AvroIO.Read.Bound<T> transform, EvaluationContext context) { - String pattern = transform.getFilepattern(); - JavaSparkContext jsc = context.getSparkContext(); - @SuppressWarnings("unchecked") - JavaRDD<AvroKey<T>> avroFile = (JavaRDD<AvroKey<T>>) (JavaRDD<?>) - jsc.newAPIHadoopFile(pattern, - AvroKeyInputFormat.class, - AvroKey.class, NullWritable.class, - new Configuration()).keys(); - JavaRDD<WindowedValue<T>> rdd = avroFile.map( - new Function<AvroKey<T>, T>() { - @Override - public T call(AvroKey<T> key) { - return key.datum(); - } - }).map(WindowingHelpers.<T>windowFunction()); - context.putDataset(transform, new BoundedDataset<>(rdd)); - } - - @Override - public String toNativeString() { - return "sparkContext.newAPIHadoopFile(...)"; - } - }; - } - - private static <T> TransformEvaluator<AvroIO.Write.Bound<T>> writeAvro() { - return new TransformEvaluator<AvroIO.Write.Bound<T>>() { - @Override - public void evaluate(AvroIO.Write.Bound<T> transform, EvaluationContext context) { - Job job; - try { - job = Job.getInstance(); - } catch (IOException e) { - throw new IllegalStateException(e); - } - AvroJob.setOutputKeySchema(job, transform.getSchema()); - @SuppressWarnings("unchecked") - JavaPairRDD<AvroKey<T>, NullWritable> last = - ((BoundedDataset<T>) context.borrowDataset(transform)).getRDD() - .map(WindowingHelpers.<T>unwindowFunction()) - .mapToPair(new PairFunction<T, AvroKey<T>, NullWritable>() { - @Override - 
public Tuple2<AvroKey<T>, NullWritable> call(T t) throws Exception { - return new Tuple2<>(new AvroKey<>(t), NullWritable.get()); - } - }); - ShardTemplateInformation shardTemplateInfo = - new ShardTemplateInformation(transform.getNumShards(), - transform.getShardTemplate(), transform.getFilenamePrefix(), - transform.getFilenameSuffix()); - writeHadoopFile(last, job.getConfiguration(), shardTemplateInfo, - AvroKey.class, NullWritable.class, TemplatedAvroKeyOutputFormat.class); - } - - @Override - public String toNativeString() { - return "mapToPair(<objectToAvroKeyFn>).saveAsNewAPIHadoopFile(...)"; - } - }; - } - private static <T> TransformEvaluator<Read.Bounded<T>> readBounded() { return new TransformEvaluator<Read.Bounded<T>>() { @Override @@ -561,121 +427,6 @@ public final class TransformTranslator { }; } - private static <K, V> TransformEvaluator<HadoopIO.Read.Bound<K, V>> readHadoop() { - return new TransformEvaluator<HadoopIO.Read.Bound<K, V>>() { - @Override - public void evaluate(HadoopIO.Read.Bound<K, V> transform, EvaluationContext context) { - String pattern = transform.getFilepattern(); - JavaSparkContext jsc = context.getSparkContext(); - @SuppressWarnings("unchecked") - JavaPairRDD<K, V> file = jsc.newAPIHadoopFile(pattern, - transform.getFormatClass(), - transform.getKeyClass(), transform.getValueClass(), - new Configuration()); - JavaRDD<WindowedValue<KV<K, V>>> rdd = - file.map(new Function<Tuple2<K, V>, KV<K, V>>() { - @Override - public KV<K, V> call(Tuple2<K, V> t2) throws Exception { - return KV.of(t2._1(), t2._2()); - } - }).map(WindowingHelpers.<KV<K, V>>windowFunction()); - context.putDataset(transform, new BoundedDataset<>(rdd)); - } - - @Override - public String toNativeString() { - return "sparkContext.newAPIHadoopFile(...)"; - } - }; - } - - private static <K, V> TransformEvaluator<HadoopIO.Write.Bound<K, V>> writeHadoop() { - return new TransformEvaluator<HadoopIO.Write.Bound<K, V>>() { - @Override - public void 
evaluate(HadoopIO.Write.Bound<K, V> transform, EvaluationContext context) { - @SuppressWarnings("unchecked") - JavaPairRDD<K, V> last = ((BoundedDataset<KV<K, V>>) context.borrowDataset(transform)) - .getRDD() - .map(WindowingHelpers.<KV<K, V>>unwindowFunction()) - .mapToPair(new PairFunction<KV<K, V>, K, V>() { - @Override - public Tuple2<K, V> call(KV<K, V> t) throws Exception { - return new Tuple2<>(t.getKey(), t.getValue()); - } - }); - ShardTemplateInformation shardTemplateInfo = - new ShardTemplateInformation(transform.getNumShards(), - transform.getShardTemplate(), transform.getFilenamePrefix(), - transform.getFilenameSuffix()); - Configuration conf = new Configuration(); - for (Map.Entry<String, String> e : transform.getConfigurationProperties().entrySet()) { - conf.set(e.getKey(), e.getValue()); - } - writeHadoopFile(last, conf, shardTemplateInfo, - transform.getKeyClass(), transform.getValueClass(), transform.getFormatClass()); - } - - @Override - public String toNativeString() { - return "saveAsNewAPIHadoopFile(...)"; - } - }; - } - - private static final class ShardTemplateInformation { - private final int numShards; - private final String shardTemplate; - private final String filenamePrefix; - private final String filenameSuffix; - - private ShardTemplateInformation(int numShards, String shardTemplate, String - filenamePrefix, String filenameSuffix) { - this.numShards = numShards; - this.shardTemplate = shardTemplate; - this.filenamePrefix = filenamePrefix; - this.filenameSuffix = filenameSuffix; - } - - int getNumShards() { - return numShards; - } - - String getShardTemplate() { - return shardTemplate; - } - - String getFilenamePrefix() { - return filenamePrefix; - } - - String getFilenameSuffix() { - return filenameSuffix; - } - } - - private static <K, V> void writeHadoopFile(JavaPairRDD<K, V> rdd, Configuration conf, - ShardTemplateInformation shardTemplateInfo, Class<?> keyClass, Class<?> valueClass, - Class<? 
extends FileOutputFormat> formatClass) { - int numShards = shardTemplateInfo.getNumShards(); - String shardTemplate = shardTemplateInfo.getShardTemplate(); - String filenamePrefix = shardTemplateInfo.getFilenamePrefix(); - String filenameSuffix = shardTemplateInfo.getFilenameSuffix(); - if (numShards != 0) { - // number of shards was set explicitly, so repartition - rdd = rdd.repartition(numShards); - } - int actualNumShards = rdd.partitions().size(); - String template = replaceShardCount(shardTemplate, actualNumShards); - String outputDir = getOutputDirectory(filenamePrefix, template); - String filePrefix = getOutputFilePrefix(filenamePrefix, template); - String fileTemplate = getOutputFileTemplate(filenamePrefix, template); - - conf.set(ShardNameTemplateHelper.OUTPUT_FILE_PREFIX, filePrefix); - conf.set(ShardNameTemplateHelper.OUTPUT_FILE_TEMPLATE, fileTemplate); - conf.set(ShardNameTemplateHelper.OUTPUT_FILE_SUFFIX, filenameSuffix); - rdd.saveAsNewAPIHadoopFile(outputDir, keyClass, valueClass, formatClass, conf); - } - private static <T, W extends BoundedWindow> TransformEvaluator<Window.Assign<T>> window() { return new TransformEvaluator<Window.Assign<T>>() { @Override @@ -847,8 +598,6 @@ public final class TransformTranslator { static { EVALUATORS.put(Read.Bounded.class, readBounded()); - EVALUATORS.put(HadoopIO.Read.Bound.class, readHadoop()); - EVALUATORS.put(HadoopIO.Write.Bound.class, writeHadoop()); EVALUATORS.put(ParDo.MultiOutput.class, parDo()); EVALUATORS.put(GroupByKey.class, groupByKey()); EVALUATORS.put(Combine.GroupedValues.class, combineGrouped()); http://git-wip-us.apache.org/repos/asf/beam/blob/6f7defdb/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java deleted file mode 100644 index 48b5433..0000000 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java +++ /dev/null @@ -1,121 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.beam.runners.spark.io.hadoop; - -import static org.junit.Assert.assertEquals; - -import java.io.File; -import java.io.IOException; -import org.apache.beam.runners.spark.PipelineRule; -import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.coders.KvCoder; -import org.apache.beam.sdk.io.hadoop.WritableCoder; -import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.PCollection; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.IntWritable; -import org.apache.hadoop.io.SequenceFile; -import org.apache.hadoop.io.SequenceFile.Reader; -import org.apache.hadoop.io.SequenceFile.Writer; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; -import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; -import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; - -/** - * Pipeline on the Hadoop file format test. - */ -public class HadoopFileFormatPipelineTest { - - private File inputFile; - private File outputFile; - - @Rule - public final PipelineRule pipelineRule = PipelineRule.batch(); - - @Rule - public final TemporaryFolder tmpDir = new TemporaryFolder(); - - @Before - public void setUp() throws IOException { - inputFile = tmpDir.newFile("test.seq"); - outputFile = tmpDir.newFolder("out"); - outputFile.delete(); - } - - @Test - public void testSequenceFile() throws Exception { - populateFile(); - - Pipeline p = pipelineRule.createPipeline(); - @SuppressWarnings("unchecked") - Class<? extends FileInputFormat<IntWritable, Text>> inputFormatClass = - (Class<? 
extends FileInputFormat<IntWritable, Text>>) - (Class<?>) SequenceFileInputFormat.class; - HadoopIO.Read.Bound<IntWritable, Text> read = - HadoopIO.Read.from(inputFile.getAbsolutePath(), - inputFormatClass, - IntWritable.class, - Text.class); - PCollection<KV<IntWritable, Text>> input = p.apply(read) - .setCoder(KvCoder.of(WritableCoder.of(IntWritable.class), WritableCoder.of(Text.class))); - @SuppressWarnings("unchecked") - Class<? extends FileOutputFormat<IntWritable, Text>> outputFormatClass = - (Class<? extends FileOutputFormat<IntWritable, Text>>) - (Class<?>) TemplatedSequenceFileOutputFormat.class; - @SuppressWarnings("unchecked") - HadoopIO.Write.Bound<IntWritable, Text> write = HadoopIO.Write.to(outputFile.getAbsolutePath(), - outputFormatClass, IntWritable.class, Text.class); - input.apply(write.withoutSharding()); - p.run().waitUntilFinish(); - - IntWritable key = new IntWritable(); - Text value = new Text(); - try (Reader reader = new Reader(new Configuration(), - Reader.file(new Path(outputFile.toURI())))) { - int i = 0; - while (reader.next(key, value)) { - assertEquals(i, key.get()); - assertEquals("value-" + i, value.toString()); - i++; - } - } - } - - private void populateFile() throws IOException { - IntWritable key = new IntWritable(); - Text value = new Text(); - try (Writer writer = SequenceFile.createWriter( - new Configuration(), - Writer.keyClass(IntWritable.class), Writer.valueClass(Text.class), - Writer.file(new Path(this.inputFile.toURI())))) { - for (int i = 0; i < 5; i++) { - key.set(i); - value.set("value-" + i); - writer.append(key, value); - } - } - } - -} http://git-wip-us.apache.org/repos/asf/beam/blob/6f7defdb/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/ShardNameBuilderTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/ShardNameBuilderTest.java 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/ShardNameBuilderTest.java deleted file mode 100644 index 1f2cf63..0000000 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/ShardNameBuilderTest.java +++ /dev/null @@ -1,88 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.runners.spark.io.hadoop; - -import static org.apache.beam.runners.spark.io.hadoop.ShardNameBuilder.getOutputDirectory; -import static org.apache.beam.runners.spark.io.hadoop.ShardNameBuilder.getOutputFilePrefix; -import static org.apache.beam.runners.spark.io.hadoop.ShardNameBuilder.getOutputFileTemplate; -import static org.apache.beam.runners.spark.io.hadoop.ShardNameBuilder.replaceShardCount; -import static org.apache.beam.runners.spark.io.hadoop.ShardNameBuilder.replaceShardNumber; -import static org.junit.Assert.assertEquals; - -import org.junit.Test; - -/** - * Test on the {@link ShardNameBuilder}. 
- */ -public class ShardNameBuilderTest { - - @Test - public void testReplaceShardCount() { - assertEquals("", replaceShardCount("", 6)); - assertEquals("-S-of-6", replaceShardCount("-S-of-N", 6)); - assertEquals("-SS-of-06", replaceShardCount("-SS-of-NN", 6)); - assertEquals("-S-of-60", replaceShardCount("-S-of-N", 60)); - assertEquals("-SS-of-60", replaceShardCount("-SS-of-NN", 60)); - assertEquals("/part-SSSSS", replaceShardCount("/part-SSSSS", 6)); - } - - @Test - public void testReplaceShardNumber() { - assertEquals("", replaceShardNumber("", 5)); - assertEquals("-5-of-6", replaceShardNumber("-S-of-6", 5)); - assertEquals("-05-of-06", replaceShardNumber("-SS-of-06", 5)); - assertEquals("-59-of-60", replaceShardNumber("-S-of-60", 59)); - assertEquals("-59-of-60", replaceShardNumber("-SS-of-60", 59)); - assertEquals("/part-00005", replaceShardNumber("/part-SSSSS", 5)); - } - - @Test - public void testGetOutputDirectory() { - assertEquals("./", getOutputDirectory("foo", "-S-of-N")); - assertEquals("foo", getOutputDirectory("foo/bar", "-S-of-N")); - assertEquals("/foo", getOutputDirectory("/foo/bar", "-S-of-N")); - assertEquals("hdfs://foo/", getOutputDirectory("hdfs://foo/bar", "-S-of-N")); - assertEquals("foo/bar", getOutputDirectory("foo/bar", "/part-SSSSS")); - assertEquals("/foo/bar", getOutputDirectory("/foo/bar", "/part-SSSSS")); - assertEquals("hdfs://foo/bar", getOutputDirectory("hdfs://foo/bar", "/part-SSSSS")); - } - - @Test - public void testGetOutputFilePrefix() { - assertEquals("foo", getOutputFilePrefix("foo", "-S-of-N")); - assertEquals("bar", getOutputFilePrefix("foo/bar", "-S-of-N")); - assertEquals("bar", getOutputFilePrefix("/foo/bar", "-S-of-N")); - assertEquals("bar", getOutputFilePrefix("hdfs://foo/bar", "-S-of-N")); - assertEquals("", getOutputFilePrefix("foo/bar", "/part-SSSSS")); - assertEquals("", getOutputFilePrefix("/foo/bar", "/part-SSSSS")); - assertEquals("", getOutputFilePrefix("hdfs://foo/bar", "/part-SSSSS")); - } - - @Test - 
public void testGetOutputFileTemplate() { - assertEquals("-S-of-N", getOutputFileTemplate("foo", "-S-of-N")); - assertEquals("-S-of-N", getOutputFileTemplate("foo/bar", "-S-of-N")); - assertEquals("-S-of-N", getOutputFileTemplate("/foo/bar", "-S-of-N")); - assertEquals("-S-of-N", getOutputFileTemplate("hdfs://foo/bar", "-S-of-N")); - assertEquals("part-SSSSS", getOutputFileTemplate("foo/bar", "/part-SSSSS")); - assertEquals("part-SSSSS", getOutputFileTemplate("/foo/bar", "/part-SSSSS")); - assertEquals("part-SSSSS", getOutputFileTemplate("hdfs://foo/bar", "/part-SSSSS")); - } - -} http://git-wip-us.apache.org/repos/asf/beam/blob/6f7defdb/runners/spark/src/test/resources/test_text.txt ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/resources/test_text.txt b/runners/spark/src/test/resources/test_text.txt deleted file mode 100644 index 6a14a1b..0000000 --- a/runners/spark/src/test/resources/test_text.txt +++ /dev/null @@ -1,2 +0,0 @@ -test line 1 -test line 2 \ No newline at end of file