[BEAM-3060] Adds TextIOIT for DirectRunner and local filesystem. This is one of multiple commits to resolve the BEAM-3060 issue. Currently, only the local filesystem, relatively small datasets, and DirectRunner are supported. Support for more runners and filesystems, and the ability to test larger (gigabyte-scale) datasets, will be added in further commits.
See: https://docs.google.com/document/d/1dA-5s6OHiP_cz-NRAbwapoKF5MEC1wKps4A5tFbIPKE/edit# Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/fb4b6d3a Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/fb4b6d3a Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/fb4b6d3a Branch: refs/heads/tez-runner Commit: fb4b6d3a6a6ece1d8d9f4103708fde5595c12ad4 Parents: 51c938d Author: Łukasz Gajowy <lukasz.gaj...@polidea.com> Authored: Tue Oct 31 10:25:22 2017 +0100 Committer: chamik...@google.com <chamik...@google.com> Committed: Fri Nov 10 17:15:07 2017 -0800 ---------------------------------------------------------------------- .../sdk/io/common/IOTestPipelineOptions.java | 12 ++ sdks/java/io/file-based-io-tests/pom.xml | 72 +++++++++ .../org/apache/beam/sdk/io/text/TextIOIT.java | 146 +++++++++++++++++++ sdks/java/io/pom.xml | 1 + 4 files changed, 231 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/fb4b6d3a/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOTestPipelineOptions.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOTestPipelineOptions.java b/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOTestPipelineOptions.java index 256c94d..91b3aa6 100644 --- a/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOTestPipelineOptions.java +++ b/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOTestPipelineOptions.java @@ -88,4 +88,16 @@ public interface IOTestPipelineOptions extends TestPipelineOptions { Integer getCassandraPort(); void setCassandraPort(Integer port); + /* Options for test pipeline for file-based I/O in 'sdks/java/io/file-based-io-tests/'. 
*/ + @Description("Number records that will be written and read by the test") + @Default.Long(100000) + Long getNumberOfRecords(); + + void setNumberOfRecords(Long count); + + @Description("Destination prefix for files generated by the test") + @Default.String("TEXTIOIT") + String getFilenamePrefix(); + + void setFilenamePrefix(String prefix); } http://git-wip-us.apache.org/repos/asf/beam/blob/fb4b6d3a/sdks/java/io/file-based-io-tests/pom.xml ---------------------------------------------------------------------- diff --git a/sdks/java/io/file-based-io-tests/pom.xml b/sdks/java/io/file-based-io-tests/pom.xml new file mode 100644 index 0000000..ae7527c --- /dev/null +++ b/sdks/java/io/file-based-io-tests/pom.xml @@ -0,0 +1,72 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+--> + +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.beam</groupId> + <artifactId>beam-sdks-java-io-parent</artifactId> + <version>2.3.0-SNAPSHOT</version> + <relativePath>../pom.xml</relativePath> + </parent> + + <artifactId>beam-sdks-java-io-file-based-io-tests</artifactId> + <name>Apache Beam :: SDKs :: Java :: IO :: File-based-io-tests</name> + <description>Integration tests for reading/writing using file-based sources/sinks.</description> + + <dependencies> + <dependency> + <groupId>org.apache.beam</groupId> + <artifactId>beam-sdks-java-core</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.beam</groupId> + <artifactId>beam-runners-direct-java</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.hamcrest</groupId> + <artifactId>hamcrest-all</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.beam</groupId> + <artifactId>beam-sdks-java-io-common</artifactId> + <scope>test</scope> + <classifier>tests</classifier> + </dependency> + <dependency> + <groupId>org.apache.beam</groupId> + <artifactId>beam-sdks-java-io-common</artifactId> + <scope>test</scope> + </dependency> + </dependencies> +</project> http://git-wip-us.apache.org/repos/asf/beam/blob/fb4b6d3a/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/text/TextIOIT.java ---------------------------------------------------------------------- diff --git 
a/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/text/TextIOIT.java b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/text/TextIOIT.java new file mode 100644 index 0000000..ecab1d8 --- /dev/null +++ b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/text/TextIOIT.java @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.io.text; + +import com.google.common.base.Function; +import com.google.common.collect.FluentIterable; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Iterables; +import java.io.IOException; +import java.text.ParseException; +import java.util.Collection; +import java.util.Collections; +import java.util.Date; +import java.util.Map; +import org.apache.beam.sdk.io.FileSystems; +import org.apache.beam.sdk.io.GenerateSequence; +import org.apache.beam.sdk.io.TextIO; +import org.apache.beam.sdk.io.common.HashingFn; +import org.apache.beam.sdk.io.common.IOTestPipelineOptions; +import org.apache.beam.sdk.io.fs.MatchResult; +import org.apache.beam.sdk.io.fs.ResourceId; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.Combine; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.Values; +import org.apache.beam.sdk.transforms.View; +import org.apache.beam.sdk.values.PCollection; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * An integration test for {@link org.apache.beam.sdk.io.TextIO}. + * + * <p>Run this test using the command below. 
Pass in connection information via PipelineOptions: + * <pre> + * mvn -e -Pio-it verify -pl sdks/java/io/text -DintegrationTestPipelineOptions='[ + * "--numberOfRecords=100000", + * "--filenamePrefix=TEXTIOIT" + * ]' + * </pre> + * */ +@RunWith(JUnit4.class) +public class TextIOIT { + + private static String filenamePrefix; + private static Long numberOfTextLines; + + @Rule + public TestPipeline pipeline = TestPipeline.create(); + + @BeforeClass + public static void setup() throws ParseException { + PipelineOptionsFactory.register(IOTestPipelineOptions.class); + IOTestPipelineOptions options = TestPipeline.testingPipelineOptions() + .as(IOTestPipelineOptions.class); + + numberOfTextLines = options.getNumberOfRecords(); + filenamePrefix = appendTimestamp(options.getFilenamePrefix()); + } + + private static String appendTimestamp(String filenamePrefix) { + return String.format("%s_%s", filenamePrefix, new Date().getTime()); + } + + @Test + public void writeThenReadAll() { + PCollection<String> testFilenames = pipeline + .apply("Generate sequence", GenerateSequence.from(0).to(numberOfTextLines)) + .apply("Produce text lines", ParDo.of(new DeterministicallyConstructTestTextLineFn())) + .apply("Write content to files", TextIO.write().to(filenamePrefix).withOutputFilenames()) + .getPerDestinationOutputFilenames().apply(Values.<String>create()); + + PCollection<String> consolidatedHashcode = testFilenames + .apply("Read all files", TextIO.readAll()) + .apply("Calculate hashcode", Combine.globally(new HashingFn())); + + String expectedHash = getExpectedHashForLineCount(numberOfTextLines); + PAssert.thatSingleton(consolidatedHashcode).isEqualTo(expectedHash); + + testFilenames.apply("Delete test files", ParDo.of(new DeleteFileFn()) + .withSideInputs(consolidatedHashcode.apply(View.<String>asSingleton()))); + + pipeline.run().waitUntilFinish(); + } + + private static String getExpectedHashForLineCount(Long lineCount) { + Map<Long, String> expectedHashes = ImmutableMap.of( + 
100_000L, "4c8bb3b99dcc59459b20fefba400d446", + 1_000_000L, "9796db06e7a7960f974d5a91164afff1" + ); + + String hash = expectedHashes.get(lineCount); + if (hash == null) { + throw new UnsupportedOperationException( + String.format("No hash for that line count: %s", lineCount)); + } + return hash; + } + + private static class DeterministicallyConstructTestTextLineFn extends DoFn<Long, String> { + @ProcessElement + public void processElement(ProcessContext c) { + c.output(String.format("IO IT Test line of text. Line seed: %s", c.element())); + } + } + + private static class DeleteFileFn extends DoFn<String, Void> { + @ProcessElement + public void processElement(ProcessContext c) throws IOException { + MatchResult match = Iterables + .getOnlyElement(FileSystems.match(Collections.singletonList(c.element()))); + FileSystems.delete(toResourceIds(match)); + } + + private Collection<ResourceId> toResourceIds(MatchResult match) throws IOException { + return FluentIterable.from(match.metadata()) + .transform(new Function<MatchResult.Metadata, ResourceId>() { + @Override + public ResourceId apply(MatchResult.Metadata metadata) { + return metadata.resourceId(); + } + }).toList(); + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/fb4b6d3a/sdks/java/io/pom.xml ---------------------------------------------------------------------- diff --git a/sdks/java/io/pom.xml b/sdks/java/io/pom.xml index 99936a2..0f8bc78 100644 --- a/sdks/java/io/pom.xml +++ b/sdks/java/io/pom.xml @@ -45,6 +45,7 @@ <module>common</module> <module>elasticsearch</module> <module>elasticsearch-tests</module> + <module>file-based-io-tests</module> <module>google-cloud-platform</module> <module>hadoop-common</module> <module>hadoop-file-system</module>