Repository: beam Updated Branches: refs/heads/master 011e2796d -> 84682109b
Adds TextIO.readAll(), implemented rather naively Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/cd216f79 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/cd216f79 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/cd216f79 Branch: refs/heads/master Commit: cd216f796bebf78101dce7ab6387f3db9b839fc7 Parents: 011e279 Author: Eugene Kirpichov <kirpic...@google.com> Authored: Fri Jun 23 18:02:10 2017 -0700 Committer: Eugene Kirpichov <kirpic...@google.com> Committed: Tue Jul 11 16:06:41 2017 -0700 ---------------------------------------------------------------------- ...ndedSplittableProcessElementInvokerTest.java | 2 +- .../core/SplittableParDoProcessFnTest.java | 2 +- .../DataflowPipelineTranslatorTest.java | 2 +- .../apache/beam/sdk/io/CompressedSource.java | 40 ++-- .../apache/beam/sdk/io/OffsetBasedSource.java | 22 +- .../java/org/apache/beam/sdk/io/TextIO.java | 230 +++++++++++++++++-- .../apache/beam/sdk/io/range/OffsetRange.java | 101 ++++++++ .../beam/sdk/io/range/OffsetRangeTracker.java | 3 + .../transforms/splittabledofn/OffsetRange.java | 77 ------- .../splittabledofn/OffsetRangeTracker.java | 1 + .../java/org/apache/beam/sdk/io/TextIOTest.java | 62 +++-- .../beam/sdk/transforms/SplittableDoFnTest.java | 2 +- .../splittabledofn/OffsetRangeTrackerTest.java | 1 + 13 files changed, 387 insertions(+), 158 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/cd216f79/runners/core-java/src/test/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvokerTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvokerTest.java 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvokerTest.java index a2f6acc..b80a632 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvokerTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvokerTest.java @@ -25,10 +25,10 @@ import static org.junit.Assert.assertThat; import java.util.Collection; import java.util.concurrent.Executors; +import org.apache.beam.sdk.io.range.OffsetRange; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.reflect.DoFnInvokers; -import org.apache.beam.sdk.transforms.splittabledofn.OffsetRange; import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; http://git-wip-us.apache.org/repos/asf/beam/blob/cd216f79/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoProcessFnTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoProcessFnTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoProcessFnTest.java index 9543de8..1cd1275 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoProcessFnTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoProcessFnTest.java @@ -39,11 +39,11 @@ import org.apache.beam.sdk.coders.BigEndianIntegerCoder; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.InstantCoder; import org.apache.beam.sdk.coders.SerializableCoder; +import org.apache.beam.sdk.io.range.OffsetRange; import org.apache.beam.sdk.testing.TestPipeline; 
import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.DoFnTester; import org.apache.beam.sdk.transforms.splittabledofn.HasDefaultTracker; -import org.apache.beam.sdk.transforms.splittabledofn.OffsetRange; import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker; import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; http://git-wip-us.apache.org/repos/asf/beam/blob/cd216f79/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java index 948af1c..43b2788 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java @@ -84,6 +84,7 @@ import org.apache.beam.sdk.extensions.gcp.auth.TestCredential; import org.apache.beam.sdk.extensions.gcp.storage.GcsPathValidator; import org.apache.beam.sdk.io.FileSystems; import org.apache.beam.sdk.io.TextIO; +import org.apache.beam.sdk.io.range.OffsetRange; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.options.ValueProvider; @@ -98,7 +99,6 @@ import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.Sum; import org.apache.beam.sdk.transforms.View; import org.apache.beam.sdk.transforms.display.DisplayData; -import org.apache.beam.sdk.transforms.splittabledofn.OffsetRange; import 
org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker; import org.apache.beam.sdk.transforms.windowing.FixedWindows; import org.apache.beam.sdk.transforms.windowing.Window; http://git-wip-us.apache.org/repos/asf/beam/blob/cd216f79/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java index 6ab8dec..4baac36 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java @@ -96,12 +96,6 @@ public class CompressedSource<T> extends FileBasedSource<T> { */ ReadableByteChannel createDecompressingChannel(String fileName, ReadableByteChannel channel) throws IOException; - - /** - * Given a file name, returns true if the file name matches any supported compression - * scheme. - */ - boolean isCompressed(String fileName); } /** @@ -242,6 +236,16 @@ public class CompressedSource<T> extends FileBasedSource<T> { @Override public abstract ReadableByteChannel createDecompressingChannel(ReadableByteChannel channel) throws IOException; + + /** Returns whether the file's extension matches one of the known compression formats. 
*/ + public static boolean isCompressed(String filename) { + for (CompressionMode type : CompressionMode.values()) { + if (type.matches(filename)) { + return true; + } + } + return false; + } } /** @@ -273,16 +277,6 @@ public class CompressedSource<T> extends FileBasedSource<T> { ReadableByteChannel.class.getSimpleName(), ReadableByteChannel.class.getSimpleName())); } - - @Override - public boolean isCompressed(String fileName) { - for (CompressionMode type : CompressionMode.values()) { - if (type.matches(fileName)) { - return true; - } - } - return false; - } } private final FileBasedSource<T> sourceDelegate; @@ -366,13 +360,9 @@ public class CompressedSource<T> extends FileBasedSource<T> { */ @Override protected final boolean isSplittable() throws Exception { - if (channelFactory instanceof FileNameBasedDecompressingChannelFactory) { - FileNameBasedDecompressingChannelFactory fileNameBasedChannelFactory = - (FileNameBasedDecompressingChannelFactory) channelFactory; - return !fileNameBasedChannelFactory.isCompressed(getFileOrPatternSpec()) - && sourceDelegate.isSplittable(); - } - return false; + return channelFactory instanceof FileNameBasedDecompressingChannelFactory + && !CompressionMode.isCompressed(getFileOrPatternSpec()) + && sourceDelegate.isSplittable(); } /** @@ -386,9 +376,7 @@ public class CompressedSource<T> extends FileBasedSource<T> { @Override protected final FileBasedReader<T> createSingleFileReader(PipelineOptions options) { if (channelFactory instanceof FileNameBasedDecompressingChannelFactory) { - FileNameBasedDecompressingChannelFactory fileNameBasedChannelFactory = - (FileNameBasedDecompressingChannelFactory) channelFactory; - if (!fileNameBasedChannelFactory.isCompressed(getFileOrPatternSpec())) { + if (!CompressionMode.isCompressed(getFileOrPatternSpec())) { return sourceDelegate.createSingleFileReader(options); } } 
http://git-wip-us.apache.org/repos/asf/beam/blob/cd216f79/sdks/java/core/src/main/java/org/apache/beam/sdk/io/OffsetBasedSource.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/OffsetBasedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/OffsetBasedSource.java index 05f0d97..c3687a9 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/OffsetBasedSource.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/OffsetBasedSource.java @@ -23,6 +23,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.NoSuchElementException; +import org.apache.beam.sdk.io.range.OffsetRange; import org.apache.beam.sdk.io.range.OffsetRangeTracker; import org.apache.beam.sdk.io.range.RangeTracker; import org.apache.beam.sdk.options.PipelineOptions; @@ -110,8 +111,7 @@ public abstract class OffsetBasedSource<T> extends BoundedSource<T> { @Override public List<? extends OffsetBasedSource<T>> split( long desiredBundleSizeBytes, PipelineOptions options) throws Exception { - // Split the range into bundles based on the desiredBundleSizeBytes. Final bundle is adjusted to - // make sure that we do not end up with a too small bundle at the end. If the desired bundle + // Split the range into bundles based on the desiredBundleSizeBytes. If the desired bundle // size is smaller than the minBundleSize of the source then minBundleSize will be used instead. 
long desiredBundleSizeOffsetUnits = Math.max( @@ -119,20 +119,10 @@ public abstract class OffsetBasedSource<T> extends BoundedSource<T> { minBundleSize); List<OffsetBasedSource<T>> subSources = new ArrayList<>(); - long start = startOffset; - long maxEnd = Math.min(endOffset, getMaxEndOffset(options)); - - while (start < maxEnd) { - long end = start + desiredBundleSizeOffsetUnits; - end = Math.min(end, maxEnd); - // Avoid having a too small bundle at the end and ensure that we respect minBundleSize. - long remaining = maxEnd - end; - if ((remaining < desiredBundleSizeOffsetUnits / 4) || (remaining < minBundleSize)) { - end = maxEnd; - } - subSources.add(createSourceForSubrange(start, end)); - - start = end; + for (OffsetRange range : + new OffsetRange(startOffset, Math.min(endOffset, getMaxEndOffset(options))) + .split(desiredBundleSizeOffsetUnits, minBundleSize)) { + subSources.add(createSourceForSubrange(range.getFrom(), range.getTo())); } return subSources; } http://git-wip-us.apache.org/repos/asf/beam/blob/cd216f79/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java index 5241589..78340f3 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java @@ -23,25 +23,37 @@ import static com.google.common.base.Preconditions.checkState; import com.google.auto.value.AutoValue; import com.google.common.annotations.VisibleForTesting; +import java.io.IOException; +import java.util.concurrent.ThreadLocalRandom; import javax.annotation.Nullable; import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.annotations.Experimental.Kind; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.StringUtf8Coder; import 
org.apache.beam.sdk.coders.VoidCoder; +import org.apache.beam.sdk.io.CompressedSource.CompressionMode; import org.apache.beam.sdk.io.DefaultFilenamePolicy.Params; import org.apache.beam.sdk.io.FileBasedSink.DynamicDestinations; import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy; import org.apache.beam.sdk.io.FileBasedSink.WritableByteChannelFactory; import org.apache.beam.sdk.io.Read.Bounded; +import org.apache.beam.sdk.io.fs.MatchResult; +import org.apache.beam.sdk.io.fs.MatchResult.Metadata; +import org.apache.beam.sdk.io.fs.MatchResult.Status; import org.apache.beam.sdk.io.fs.ResourceId; +import org.apache.beam.sdk.io.range.OffsetRange; import org.apache.beam.sdk.options.ValueProvider; import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider; import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider; +import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.Reshuffle; import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.transforms.SerializableFunctions; +import org.apache.beam.sdk.transforms.Values; import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PDone; @@ -51,13 +63,14 @@ import org.apache.beam.sdk.values.PDone; * * <p>To read a {@link PCollection} from one or more text files, use {@code TextIO.read()} to * instantiate a transform and use {@link TextIO.Read#from(String)} to specify the path of the - * file(s) to be read. + * file(s) to be read. Alternatively, if the filenames to be read are themselves in a + * {@link PCollection}, apply {@link TextIO#readAll()}. 
* * <p>{@link TextIO.Read} returns a {@link PCollection} of {@link String Strings}, each * corresponding to one line of an input UTF-8 text file (split into lines delimited by '\n', '\r', * or '\r\n'). * - * <p>Example: + * <p>Example 1: reading a file or filepattern. * * <pre>{@code * Pipeline p = ...; @@ -66,6 +79,19 @@ import org.apache.beam.sdk.values.PDone; * PCollection<String> lines = p.apply(TextIO.read().from("/local/path/to/file.txt")); * }</pre> * + * <p>Example 2: reading a PCollection of filenames. + * + * <pre>{@code + * Pipeline p = ...; + * + * // E.g. the filenames might be computed from other data in the pipeline, or + * // read from a data source. + * PCollection<String> filenames = ...; + * + * // Read all files in the collection. + * PCollection<String> lines = filenames.apply(TextIO.readAll()); + * }</pre> + * * <p>To write a {@link PCollection} to one or more text files, use {@code TextIO.write()}, using * {@link TextIO.Write#to(String)} to specify the output prefix of the files to write. * @@ -132,6 +158,26 @@ public class TextIO { } /** + * A {@link PTransform} that works like {@link #read}, but reads each file in a {@link + * PCollection} of filepatterns. + * + * <p>Can be applied to both bounded and unbounded {@link PCollection PCollections}, so this is + * suitable for reading a {@link PCollection} of filepatterns arriving as a stream. However, every + * filepattern is expanded once at the moment it is processed, rather than watched for new files + * matching the filepattern to appear. Likewise, every file is read once, rather than watched for + * new entries. + */ + public static ReadAll readAll() { + return new AutoValue_TextIO_ReadAll.Builder() + .setCompressionType(CompressionType.AUTO) + // 64MB is a reasonable value that allows amortizing the cost of opening files, + // but is not so large as to exhaust a typical runner's maximum amount of output per + // ProcessElement call. 
+ .setDesiredBundleSizeBytes(64 * 1024 * 1024L) + .build(); + } + + /** * A {@link PTransform} that writes a {@link PCollection} to a text file (or multiple text files * matching a sharding pattern), with each element of the input collection encoded into its own * line. @@ -228,29 +274,34 @@ public class TextIO { // Helper to create a source specific to the requested compression type. protected FileBasedSource<String> getSource() { - switch (getCompressionType()) { + return wrapWithCompression(new TextSource(getFilepattern()), getCompressionType()); + } + + private static FileBasedSource<String> wrapWithCompression( + FileBasedSource<String> source, CompressionType compressionType) { + switch (compressionType) { case UNCOMPRESSED: - return new TextSource(getFilepattern()); + return source; case AUTO: - return CompressedSource.from(new TextSource(getFilepattern())); + return CompressedSource.from(source); case BZIP2: return - CompressedSource.from(new TextSource(getFilepattern())) - .withDecompression(CompressedSource.CompressionMode.BZIP2); + CompressedSource.from(source) + .withDecompression(CompressionMode.BZIP2); case GZIP: return - CompressedSource.from(new TextSource(getFilepattern())) - .withDecompression(CompressedSource.CompressionMode.GZIP); + CompressedSource.from(source) + .withDecompression(CompressionMode.GZIP); case ZIP: return - CompressedSource.from(new TextSource(getFilepattern())) - .withDecompression(CompressedSource.CompressionMode.ZIP); + CompressedSource.from(source) + .withDecompression(CompressionMode.ZIP); case DEFLATE: return - CompressedSource.from(new TextSource(getFilepattern())) - .withDecompression(CompressedSource.CompressionMode.DEFLATE); + CompressedSource.from(source) + .withDecompression(CompressionMode.DEFLATE); default: - throw new IllegalArgumentException("Unknown compression type: " + getFilepattern()); + throw new IllegalArgumentException("Unknown compression type: " + compressionType); } } @@ -273,7 +324,156 @@ public class 
TextIO { } } - // /////////////////////////////////////////////////////////////////////////// + ///////////////////////////////////////////////////////////////////////////// + + /** Implementation of {@link #readAll}. */ + @AutoValue + public abstract static class ReadAll + extends PTransform<PCollection<String>, PCollection<String>> { + abstract CompressionType getCompressionType(); + abstract long getDesiredBundleSizeBytes(); + + abstract Builder toBuilder(); + + @AutoValue.Builder + abstract static class Builder { + abstract Builder setCompressionType(CompressionType compressionType); + abstract Builder setDesiredBundleSizeBytes(long desiredBundleSizeBytes); + + abstract ReadAll build(); + } + + /** Same as {@link Read#withCompressionType(CompressionType)}. */ + public ReadAll withCompressionType(CompressionType compressionType) { + return toBuilder().setCompressionType(compressionType).build(); + } + + @VisibleForTesting + ReadAll withDesiredBundleSizeBytes(long desiredBundleSizeBytes) { + return toBuilder().setDesiredBundleSizeBytes(desiredBundleSizeBytes).build(); + } + + @Override + public PCollection<String> expand(PCollection<String> input) { + return input + .apply("Expand glob", ParDo.of(new ExpandGlobFn())) + .apply( + "Split into ranges", + ParDo.of(new SplitIntoRangesFn(getCompressionType(), getDesiredBundleSizeBytes()))) + .apply("Reshuffle", new ReshuffleWithUniqueKey<KV<Metadata, OffsetRange>>()) + .apply("Read", ParDo.of(new ReadTextFn(this))); + } + + private static class ReshuffleWithUniqueKey<T> + extends PTransform<PCollection<T>, PCollection<T>> { + @Override + public PCollection<T> expand(PCollection<T> input) { + return input + .apply("Unique key", ParDo.of(new AssignUniqueKeyFn<T>())) + .apply("Reshuffle", Reshuffle.<Integer, T>of()) + .apply("Values", Values.<T>create()); + } + } + + private static class AssignUniqueKeyFn<T> extends DoFn<T, KV<Integer, T>> { + private int index; + + @Setup + public void setup() { + this.index = 
ThreadLocalRandom.current().nextInt(); + } + + @ProcessElement + public void process(ProcessContext c) { + c.output(KV.of(++index, c.element())); + } + } + + private static class ExpandGlobFn extends DoFn<String, Metadata> { + @ProcessElement + public void process(ProcessContext c) throws Exception { + MatchResult match = FileSystems.match(c.element()); + checkArgument( + match.status().equals(Status.OK), + "Failed to match filepattern %s: %s", + c.element(), + match.status()); + for (Metadata metadata : match.metadata()) { + c.output(metadata); + } + } + } + + private static class SplitIntoRangesFn extends DoFn<Metadata, KV<Metadata, OffsetRange>> { + private final CompressionType compressionType; + private final long desiredBundleSize; + + private SplitIntoRangesFn(CompressionType compressionType, long desiredBundleSize) { + this.compressionType = compressionType; + this.desiredBundleSize = desiredBundleSize; + } + + @ProcessElement + public void process(ProcessContext c) { + Metadata metadata = c.element(); + final boolean isSplittable = isSplittable(metadata, compressionType); + if (!isSplittable) { + c.output(KV.of(metadata, new OffsetRange(0, metadata.sizeBytes()))); + return; + } + for (OffsetRange range : + new OffsetRange(0, metadata.sizeBytes()).split(desiredBundleSize, 0)) { + c.output(KV.of(metadata, range)); + } + } + + static boolean isSplittable(Metadata metadata, CompressionType compressionType) { + if (!metadata.isReadSeekEfficient()) { + return false; + } + switch (compressionType) { + case AUTO: + return !CompressionMode.isCompressed(metadata.resourceId().toString()); + case UNCOMPRESSED: + return true; + case GZIP: + case BZIP2: + case ZIP: + case DEFLATE: + return false; + default: + throw new UnsupportedOperationException("Unknown compression type: " + compressionType); + } + } + } + + private static class ReadTextFn extends DoFn<KV<Metadata, OffsetRange>, String> { + private final TextIO.ReadAll spec; + + private ReadTextFn(ReadAll spec) { + 
this.spec = spec; + } + + @ProcessElement + public void process(ProcessContext c) throws IOException { + Metadata metadata = c.element().getKey(); + OffsetRange range = c.element().getValue(); + FileBasedSource<String> source = + TextIO.Read.wrapWithCompression( + new TextSource(StaticValueProvider.of(metadata.toString())), + spec.getCompressionType()); + BoundedSource.BoundedReader<String> reader = + source + .createForSubrangeOfFile(metadata, range.getFrom(), range.getTo()) + .createReader(c.getPipelineOptions()); + for (boolean more = reader.start(); more; more = reader.advance()) { + c.output(reader.getCurrent()); + } + } + } + } + + ///////////////////////////////////////////////////////////////////////////// /** Implementation of {@link #write}. */ @AutoValue http://git-wip-us.apache.org/repos/asf/beam/blob/cd216f79/sdks/java/core/src/main/java/org/apache/beam/sdk/io/range/OffsetRange.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/range/OffsetRange.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/range/OffsetRange.java new file mode 100644 index 0000000..d3bff37 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/range/OffsetRange.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.range; + +import static com.google.common.base.Preconditions.checkArgument; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; +import org.apache.beam.sdk.transforms.splittabledofn.HasDefaultTracker; + +/** A restriction represented by a range of integers [from, to). */ +public class OffsetRange + implements Serializable, + HasDefaultTracker< + OffsetRange, org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker> { + private final long from; + private final long to; + + public OffsetRange(long from, long to) { + checkArgument(from <= to, "Malformed range [%s, %s)", from, to); + this.from = from; + this.to = to; + } + + public long getFrom() { + return from; + } + + public long getTo() { + return to; + } + + @Override + public org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker newTracker() { + return new org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker(this); + } + + @Override + public String toString() { + return "[" + from + ", " + to + ')'; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + OffsetRange that = (OffsetRange) o; + + if (from != that.from) { + return false; + } + return to == that.to; + } + + @Override + public int hashCode() { + int result = (int) (from ^ (from >>> 32)); + result = 31 * result + (int) (to ^ (to >>> 32)); + return result; + } + + public List<OffsetRange> split(long desiredNumOffsetsPerSplit, long 
minNumOffsetPerSplit) { + List<OffsetRange> res = new ArrayList<>(); + long start = getFrom(); + long maxEnd = getTo(); + + while (start < maxEnd) { + long end = start + desiredNumOffsetsPerSplit; + end = Math.min(end, maxEnd); + // Avoid having a too small range at the end and ensure that we respect minNumOffsetPerSplit. + long remaining = maxEnd - end; + if ((remaining < desiredNumOffsetsPerSplit / 4) || (remaining < minNumOffsetPerSplit)) { + end = maxEnd; + } + res.add(new OffsetRange(start, end)); + start = end; + } + return res; + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/cd216f79/sdks/java/core/src/main/java/org/apache/beam/sdk/io/range/OffsetRangeTracker.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/range/OffsetRangeTracker.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/range/OffsetRangeTracker.java index 51e2b1a..8f0083e 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/range/OffsetRangeTracker.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/range/OffsetRangeTracker.java @@ -26,6 +26,9 @@ import org.slf4j.LoggerFactory; /** * A {@link RangeTracker} for non-negative positions of type {@code long}. + * + * <p>Not to be confused with {@link + * org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker}. 
*/ public class OffsetRangeTracker implements RangeTracker<Long> { private static final Logger LOG = LoggerFactory.getLogger(OffsetRangeTracker.class); http://git-wip-us.apache.org/repos/asf/beam/blob/cd216f79/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRange.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRange.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRange.java deleted file mode 100644 index 104f5f2..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRange.java +++ /dev/null @@ -1,77 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.transforms.splittabledofn; - -import static com.google.common.base.Preconditions.checkArgument; - -import java.io.Serializable; - -/** A restriction represented by a range of integers [from, to). 
*/ -public class OffsetRange - implements Serializable, HasDefaultTracker<OffsetRange, OffsetRangeTracker> { - private final long from; - private final long to; - - public OffsetRange(long from, long to) { - checkArgument(from <= to, "Malformed range [%s, %s)", from, to); - this.from = from; - this.to = to; - } - - public long getFrom() { - return from; - } - - public long getTo() { - return to; - } - - @Override - public OffsetRangeTracker newTracker() { - return new OffsetRangeTracker(this); - } - - @Override - public String toString() { - return "[" + from + ", " + to + ')'; - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - - OffsetRange that = (OffsetRange) o; - - if (from != that.from) { - return false; - } - return to == that.to; - } - - @Override - public int hashCode() { - int result = (int) (from ^ (from >>> 32)); - result = 31 * result + (int) (to ^ (to >>> 32)); - return result; - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/cd216f79/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTracker.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTracker.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTracker.java index 0271a0d..62c10a7 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTracker.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTracker.java @@ -21,6 +21,7 @@ import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkState; +import org.apache.beam.sdk.io.range.OffsetRange; import 
org.apache.beam.sdk.transforms.DoFn; /** http://git-wip-us.apache.org/repos/asf/beam/blob/cd216f79/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java index 8797ff7..a6be4fb 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java @@ -120,10 +120,10 @@ import org.junit.runners.JUnit4; public class TextIOTest { private static final String MY_HEADER = "myHeader"; private static final String MY_FOOTER = "myFooter"; - private static final String[] EMPTY = new String[] {}; - private static final String[] TINY = - new String[] {"Irritable eagle", "Optimistic jay", "Fanciful hawk"}; - private static final String[] LARGE = makeLines(1000); + private static final List<String> EMPTY = Collections.emptyList(); + private static final List<String> TINY = + Arrays.asList("Irritable eagle", "Optimistic jay", "Fanciful hawk"); + private static final List<String> LARGE = makeLines(1000); private static Path tempFolder; private static File emptyTxt; @@ -148,7 +148,7 @@ public class TextIOTest { @Rule public ExpectedException expectedException = ExpectedException.none(); - private static File writeToFile(String[] lines, String filename, CompressionType compression) + private static File writeToFile(List<String> lines, String filename, CompressionType compression) throws IOException { File file = tempFolder.resolve(filename).toFile(); OutputStream output = new FileOutputStream(file); @@ -791,7 +791,7 @@ public class TextIOTest { * Helper that writes the given lines (adding a newline in between) to a stream, then closes the * stream. 
*/ - private static void writeToStreamAndClose(String[] lines, OutputStream outputStream) { + private static void writeToStreamAndClose(List<String> lines, OutputStream outputStream) { try (PrintStream writer = new PrintStream(outputStream)) { for (String line : lines) { writer.println(line); @@ -800,27 +800,33 @@ public class TextIOTest { } /** - * Helper method that runs TextIO.read().from(filename).withCompressionType(compressionType) + * Helper method that runs TextIO.read().from(filename).withCompressionType(compressionType) and + * TextIO.readAll().withCompressionType(compressionType) applied to the single filename, * and asserts that the results match the given expected output. */ private void assertReadingCompressedFileMatchesExpected( - File file, CompressionType compressionType, String[] expected) { - - TextIO.Read read = - TextIO.read().from(file.getPath()).withCompressionType(compressionType); - PCollection<String> output = p.apply("Read_" + file + "_" + compressionType.toString(), read); - - PAssert.that(output).containsInAnyOrder(expected); + File file, CompressionType compressionType, List<String> expected) { + + TextIO.Read read = TextIO.read().from(file.getPath()).withCompressionType(compressionType); + PAssert.that(p.apply("Read_" + file + "_" + compressionType.toString(), read)) + .containsInAnyOrder(expected); + + TextIO.ReadAll readAll = + TextIO.readAll().withCompressionType(compressionType).withDesiredBundleSizeBytes(10); + PAssert.that( + p.apply("Create_" + file, Create.of(file.getPath())) + .apply("Read_" + compressionType.toString(), readAll)) + .containsInAnyOrder(expected); p.run(); } /** * Helper to make an array of compressible strings. Returns ["word"i] for i in range(0,n). 
*/ - private static String[] makeLines(int n) { - String[] ret = new String[n]; + private static List<String> makeLines(int n) { + List<String> ret = new ArrayList<>(); for (int i = 0; i < n; ++i) { - ret[i] = "word" + i; + ret.add("word" + i); } return ret; } @@ -1004,7 +1010,7 @@ public class TextIOTest { String filename = createZipFile(expected, "multiple entries", entry0, entry1, entry2); assertReadingCompressedFileMatchesExpected( - new File(filename), CompressionType.ZIP, expected.toArray(new String[]{})); + new File(filename), CompressionType.ZIP, expected); } /** @@ -1023,7 +1029,7 @@ public class TextIOTest { new String[]{"dog"}); assertReadingCompressedFileMatchesExpected( - new File(filename), CompressionType.ZIP, new String[] {"cat", "dog"}); + new File(filename), CompressionType.ZIP, Arrays.asList("cat", "dog")); } @Test @@ -1340,5 +1346,21 @@ public class TextIOTest { SourceTestUtils.assertSourcesEqualReferenceSource(source, splits, options); } -} + @Test + @Category(NeedsRunner.class) + public void testReadAll() throws IOException { + writeToFile(TINY, "readAllTiny1.zip", ZIP); + writeToFile(TINY, "readAllTiny2.zip", ZIP); + writeToFile(LARGE, "readAllLarge1.zip", ZIP); + writeToFile(LARGE, "readAllLarge2.zip", ZIP); + PCollection<String> lines = + p.apply( + Create.of( + tempFolder.resolve("readAllTiny*").toString(), + tempFolder.resolve("readAllLarge*").toString())) + .apply(TextIO.readAll().withCompressionType(AUTO)); + PAssert.that(lines).containsInAnyOrder(Iterables.concat(TINY, TINY, LARGE, LARGE)); + p.run(); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/cd216f79/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java index 0c2bd1c..cb60f9a 100644 --- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java @@ -34,6 +34,7 @@ import org.apache.beam.sdk.coders.BigEndianIntegerCoder; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.coders.VarIntCoder; +import org.apache.beam.sdk.io.range.OffsetRange; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.StreamingOptions; import org.apache.beam.sdk.testing.PAssert; @@ -44,7 +45,6 @@ import org.apache.beam.sdk.testing.UsesSplittableParDoWithWindowedSideInputs; import org.apache.beam.sdk.testing.UsesTestStream; import org.apache.beam.sdk.testing.ValidatesRunner; import org.apache.beam.sdk.transforms.DoFn.BoundedPerElement; -import org.apache.beam.sdk.transforms.splittabledofn.OffsetRange; import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker; import org.apache.beam.sdk.transforms.windowing.FixedWindows; import org.apache.beam.sdk.transforms.windowing.IntervalWindow; http://git-wip-us.apache.org/repos/asf/beam/blob/cd216f79/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTrackerTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTrackerTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTrackerTest.java index 831894c..8aed6b9 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTrackerTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTrackerTest.java @@ -21,6 +21,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; +import 
org.apache.beam.sdk.io.range.OffsetRange; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException;