This is an automated email from the ASF dual-hosted git repository. anton pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new 16ceca5 [BEAM-7545] Adding RowCount to TextTable. new 3d576f7 Merge pull request #8951 from riazela/TextTableRowCount 16ceca5 is described below commit 16ceca59d8cb823f0f5e42cd7dc6ea717bef3ec8 Author: Alireza Samadian <alireza4...@gmail.com> AuthorDate: Mon Jun 24 16:26:29 2019 -0700 [BEAM-7545] Adding RowCount to TextTable. --- .../main/java/org/apache/beam/sdk/io/FileIO.java | 82 +++++--- .../apache/beam/sdk/io/TextRowCountEstimator.java | 219 +++++++++++++++++++++ .../beam/sdk/io/TextRowCountEstimatorTest.java | 116 +++++++++++ .../sql/meta/provider/text/TextTable.java | 36 ++++ 4 files changed, 424 insertions(+), 29 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java index 5447e86..57a89ac 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java @@ -729,6 +729,55 @@ public class FileIO { builder.add(DisplayData.item("directoryTreatment", getDirectoryTreatment().toString())); } + /** + * @return True if metadata is a directory and directory Treatment is SKIP. + * @throws java.lang.IllegalArgumentException if metadata is a directory and directoryTreatment + * is Prohibited. + * @throws java.lang.UnsupportedOperationException if metadata is a directory and + * directoryTreatment is not SKIP or PROHIBIT. + */ + static boolean shouldSkipDirectory( + MatchResult.Metadata metadata, DirectoryTreatment directoryTreatment) { + if (metadata.resourceId().isDirectory()) { + switch (directoryTreatment) { + case SKIP: + return true; + case PROHIBIT: + throw new IllegalArgumentException( + "Trying to read " + metadata.resourceId() + " which is a directory"); + + default: + throw new UnsupportedOperationException( + "Unknown DirectoryTreatment: " + directoryTreatment); + } + } + + return false; + } + + /** + * Converts metadata to readableFile. Make sure {@link + * #shouldSkipDirectory(org.apache.beam.sdk.io.fs.MatchResult.Metadata, + * org.apache.beam.sdk.io.FileIO.ReadMatches.DirectoryTreatment)} returns false before using. + */ + static ReadableFile matchToReadableFile( + MatchResult.Metadata metadata, Compression compression) { + + compression = + (compression == Compression.AUTO) + ? Compression.detect(metadata.resourceId().getFilename()) + : compression; + return new ReadableFile( + MatchResult.Metadata.builder() + .setResourceId(metadata.resourceId()) + .setSizeBytes(metadata.sizeBytes()) + .setLastModifiedMillis(metadata.lastModifiedMillis()) + .setIsReadSeekEfficient( + metadata.isReadSeekEfficient() && compression == Compression.UNCOMPRESSED) + .build(), + compression); + } + private static class ToReadableFileFn extends DoFn<MatchResult.Metadata, ReadableFile> { private final ReadMatches spec; @@ -738,36 +787,11 @@ public class FileIO { @ProcessElement public void process(ProcessContext c) { - MatchResult.Metadata metadata = c.element(); - if (metadata.resourceId().isDirectory()) { - switch (spec.getDirectoryTreatment()) { - case SKIP: - return; - - case PROHIBIT: - throw new IllegalArgumentException( - "Trying to read " + metadata.resourceId() + " which is a directory"); - - default: - throw new UnsupportedOperationException( - "Unknown DirectoryTreatment: " + spec.getDirectoryTreatment()); - } + if (shouldSkipDirectory(c.element(), spec.getDirectoryTreatment())) { + return; } - - Compression compression = - (spec.getCompression() == Compression.AUTO) - ? Compression.detect(metadata.resourceId().getFilename()) - : spec.getCompression(); - c.output( - new ReadableFile( - MatchResult.Metadata.builder() - .setResourceId(metadata.resourceId()) - .setSizeBytes(metadata.sizeBytes()) - .setLastModifiedMillis(metadata.lastModifiedMillis()) - .setIsReadSeekEfficient( - metadata.isReadSeekEfficient() && compression == Compression.UNCOMPRESSED) - .build(), - compression)); + ReadableFile r = matchToReadableFile(c.element(), spec.getCompression()); + c.output(r); } } } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextRowCountEstimator.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextRowCountEstimator.java new file mode 100644 index 0000000..d220505 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextRowCountEstimator.java @@ -0,0 +1,219 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io; + +import com.google.auto.value.AutoValue; +import java.io.IOException; +import javax.annotation.Nullable; +import org.apache.beam.sdk.io.fs.EmptyMatchTreatment; +import org.apache.beam.sdk.io.fs.MatchResult; +import org.apache.beam.sdk.io.range.OffsetRange; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.ValueProvider; + +/** This returns a row count estimation for files associated with a file pattern. */ +@AutoValue +public abstract class TextRowCountEstimator { + private static final long DEFAULT_NUM_BYTES_PER_FILE = 64 * 1024L; + private static final Compression DEFAULT_COMPRESSION = Compression.AUTO; + private static final FileIO.ReadMatches.DirectoryTreatment DEFAULT_DIRECTORY_TREATMENT = + FileIO.ReadMatches.DirectoryTreatment.SKIP; + private static final EmptyMatchTreatment DEFAULT_EMPTY_MATCH_TREATMENT = + EmptyMatchTreatment.DISALLOW; + private static final SamplingStrategy DEFAULT_SAMPLING_STRATEGY = new SampleAllFiles(); + + public abstract long getNumSampledBytesPerFile(); + + @Nullable + @SuppressWarnings("mutable") + public abstract byte[] getDelimiters(); + + public abstract String getFilePattern(); + + public abstract Compression getCompression(); + + public abstract SamplingStrategy getSamplingStrategy(); + + public abstract EmptyMatchTreatment getEmptyMatchTreatment(); + + public abstract FileIO.ReadMatches.DirectoryTreatment getDirectoryTreatment(); + + public static TextRowCountEstimator.Builder builder() { + return (new AutoValue_TextRowCountEstimator.Builder()) + .setSamplingStrategy(DEFAULT_SAMPLING_STRATEGY) + .setNumSampledBytesPerFile(DEFAULT_NUM_BYTES_PER_FILE) + .setCompression(DEFAULT_COMPRESSION) + .setDirectoryTreatment(DEFAULT_DIRECTORY_TREATMENT) + .setEmptyMatchTreatment(DEFAULT_EMPTY_MATCH_TREATMENT); + } + + /** + * Estimates the number of non empty rows. It samples NumSampledBytesPerFile bytes from every file + * until the condition in sampling strategy is met. Then it takes the average line size of the + * rows and divides the total file sizes by that number. If all the sampled rows are empty, and it + * has not sampled all the lines (due to sampling strategy) it throws Exception. + * + * @return Number of estimated rows. + * @throws org.apache.beam.sdk.io.TextRowCountEstimator.NoEstimationException if all the sampled + * lines are empty and we have not read all the lines in the matched files. + */ + public Long estimateRowCount(PipelineOptions pipelineOptions) + throws IOException, NoEstimationException { + long linesSize = 0; + int numberOfReadLines = 0; + long totalFileSizes = 0; + long totalSampledBytes = 0; + int numberOfReadFiles = 0; + boolean sampledEverything = true; + + MatchResult match = FileSystems.match(getFilePattern(), getEmptyMatchTreatment()); + + for (MatchResult.Metadata metadata : match.metadata()) { + + if (getSamplingStrategy().stopSampling(numberOfReadFiles, totalSampledBytes)) { + sampledEverything = false; + break; + } + + if (FileIO.ReadMatches.shouldSkipDirectory(metadata, getDirectoryTreatment())) { + continue; + } + + FileIO.ReadableFile file = FileIO.ReadMatches.matchToReadableFile(metadata, getCompression()); + + // We use this as an estimate of the size of the sampled lines. Since the last sampled line + // may exceed this range, we are over estimating the number of lines in our estimation. (If + // each line is larger than readingWindowSize we will read one line any way and that line is + // the last line) + long readingWindowSize = Math.min(getNumSampledBytesPerFile(), metadata.sizeBytes()); + sampledEverything = metadata.sizeBytes() == readingWindowSize && sampledEverything; + OffsetRange range = new OffsetRange(0, readingWindowSize); + + TextSource textSource = + new TextSource( + ValueProvider.StaticValueProvider.of(file.getMetadata().resourceId().toString()), + getEmptyMatchTreatment(), + getDelimiters()); + FileBasedSource<String> source = + CompressedSource.from(textSource).withCompression(file.getCompression()); + try (BoundedSource.BoundedReader<String> reader = + source + .createForSubrangeOfFile(file.getMetadata(), range.getFrom(), range.getTo()) + .createReader(pipelineOptions)) { + + int numberOfNonEmptyLines = 0; + for (boolean more = reader.start(); more; more = reader.advance()) { + numberOfNonEmptyLines += reader.getCurrent().trim().equals("") ? 0 : 1; + } + numberOfReadLines += numberOfNonEmptyLines; + linesSize += (numberOfNonEmptyLines == 0) ? 0 : readingWindowSize; + } + long fileSize = metadata.sizeBytes(); + numberOfReadFiles += fileSize == 0 ? 0 : 1; + totalFileSizes += fileSize; + } + + if (numberOfReadLines == 0 && sampledEverything) { + return 0L; + } + + if (numberOfReadLines == 0) { + throw new NoEstimationException( + "Cannot estimate the row count. All the sampled lines are empty"); + } + + // This is total file sizes divided by average line size. + return totalFileSizes * numberOfReadLines / linesSize; + } + + /** Builder for {@link org.apache.beam.sdk.io.TextRowCountEstimator}. */ + @AutoValue.Builder + public abstract static class Builder { + + public abstract Builder setNumSampledBytesPerFile(long numSampledBytes); + + public abstract Builder setDirectoryTreatment( + FileIO.ReadMatches.DirectoryTreatment directoryTreatment); + + public abstract Builder setCompression(Compression compression); + + public abstract Builder setDelimiters(byte[] delimiters); + + public abstract Builder setFilePattern(String filePattern); + + public abstract Builder setEmptyMatchTreatment(EmptyMatchTreatment emptyMatchTreatment); + + public abstract Builder setSamplingStrategy(SamplingStrategy samplingStrategy); + + public abstract TextRowCountEstimator build(); + } + + /** + * An exception that will be thrown if the estimator cannot get an estimation of the number of + * lines. + */ + public static class NoEstimationException extends Exception { + NoEstimationException(String message) { + super(message); + } + } + + /** Sampling Strategy shows us when should we stop reading further files. * */ + public interface SamplingStrategy { + boolean stopSampling(int numberOfFiles, long totalReadBytes); + } + + /** This strategy samples all the files. */ + public static class SampleAllFiles implements SamplingStrategy { + + @Override + public boolean stopSampling(int numberOfSampledFiles, long totalReadBytes) { + return false; + } + } + + /** This strategy stops sampling if we sample enough number of bytes. */ + public static class LimitNumberOfFiles implements SamplingStrategy { + int limit; + + public LimitNumberOfFiles(int limit) { + this.limit = limit; + } + + @Override + public boolean stopSampling(int numberOfFiles, long totalReadBytes) { + return numberOfFiles > limit; + } + } + + /** + * This strategy stops sampling when total number of sampled bytes are more than some threshold. + */ + public static class LimitNumberOfTotalBytes implements SamplingStrategy { + long limit; + + public LimitNumberOfTotalBytes(long limit) { + this.limit = limit; + } + + @Override + public boolean stopSampling(int numberOfFiles, long totalReadBytes) { + return totalReadBytes > limit; + } + } +} diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextRowCountEstimatorTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextRowCountEstimatorTest.java new file mode 100644 index 0000000..080c255 --- /dev/null +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextRowCountEstimatorTest.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io; + +import java.io.File; +import java.io.FileNotFoundException; +import java.io.Writer; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Charsets; +import org.apache.beam.vendor.guava.v20_0.com.google.common.io.Files; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Tests for {@link org.apache.beam.sdk.io.TextRowCountEstimator}. */ +@RunWith(JUnit4.class) +public class TextRowCountEstimatorTest { + @Rule public TemporaryFolder temporaryFolder = new TemporaryFolder(); + private static final Logger LOG = LoggerFactory.getLogger(TextRowCountEstimatorTest.class); + + @Test + public void testNonEmptyFiles() throws Exception { + File file1 = temporaryFolder.newFile("file1.txt"); + Writer writer = Files.newWriter(file1, Charsets.UTF_8); + for (int i = 0; i < 100; i++) { + writer.write("123123123\n"); + } + writer.flush(); + writer.close(); + temporaryFolder.newFolder("testfolder"); + temporaryFolder.newFolder("testfolder2"); + file1 = temporaryFolder.newFile("testfolder/test2.txt"); + writer = Files.newWriter(file1, Charsets.UTF_8); + for (int i = 0; i < 50; i++) { + writer.write("123123123\n"); + } + + writer.flush(); + writer.close(); + TextRowCountEstimator textRowCountEstimator = + TextRowCountEstimator.builder().setFilePattern(temporaryFolder.getRoot() + "/**").build(); + Long rows = textRowCountEstimator.estimateRowCount(PipelineOptionsFactory.create()); + Assert.assertNotNull(rows); + Assert.assertEquals(150L, rows.longValue()); + } + + @Test(expected = FileNotFoundException.class) + public void testEmptyFolder() throws Exception { + TextRowCountEstimator textRowCountEstimator = + TextRowCountEstimator.builder().setFilePattern(temporaryFolder.getRoot() + "/**").build(); + Long rows = textRowCountEstimator.estimateRowCount(PipelineOptionsFactory.create()); + } + + @Test + public void testEmptyFile() throws Exception { + File file1 = temporaryFolder.newFile("file1.txt"); + Writer writer = Files.newWriter(file1, Charsets.UTF_8); + for (int i = 0; i < 100; i++) { + writer.write("\n"); + } + writer.flush(); + writer.close(); + TextRowCountEstimator textRowCountEstimator = + TextRowCountEstimator.builder().setFilePattern(temporaryFolder.getRoot() + "/**").build(); + Long rows = textRowCountEstimator.estimateRowCount(PipelineOptionsFactory.create()); + Assert.assertEquals(0L, rows.longValue()); + } + + @Test(expected = TextRowCountEstimator.NoEstimationException.class) + public void lotsOfNewLines() throws Exception { + File file1 = temporaryFolder.newFile("file1.txt"); + Writer writer = Files.newWriter(file1, Charsets.UTF_8); + for (int i = 0; i < 1000; i++) { + writer.write("\n"); + } + writer.write("123123123"); + writer.flush(); + writer.close(); + TextRowCountEstimator textRowCountEstimator = + TextRowCountEstimator.builder() + .setNumSampledBytesPerFile(10L) + .setFilePattern(temporaryFolder.getRoot() + "/**") + .build(); + textRowCountEstimator.estimateRowCount(PipelineOptionsFactory.create()); + } + + @Test(expected = FileNotFoundException.class) + public void testNonExistance() throws Exception { + TextRowCountEstimator textRowCountEstimator = + TextRowCountEstimator.builder() + .setFilePattern(temporaryFolder.getRoot() + "/something/**") + .build(); + Long rows = textRowCountEstimator.estimateRowCount(PipelineOptionsFactory.create()); + Assert.assertNull(rows); + } +} diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/TextTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/TextTable.java index 94674de..60232ab 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/TextTable.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/TextTable.java @@ -17,9 +17,14 @@ */ package org.apache.beam.sdk.extensions.sql.meta.provider.text; +import java.io.IOException; +import java.math.BigInteger; import org.apache.beam.sdk.annotations.Internal; +import org.apache.beam.sdk.extensions.sql.impl.BeamRowCountStatistics; import org.apache.beam.sdk.extensions.sql.impl.schema.BaseBeamTable; import org.apache.beam.sdk.io.TextIO; +import org.apache.beam.sdk.io.TextRowCountEstimator; +import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.values.PBegin; @@ -27,6 +32,8 @@ import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PDone; import org.apache.beam.sdk.values.Row; import org.apache.commons.csv.CSVFormat; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * {@link TextTable} is a {@link org.apache.beam.sdk.extensions.sql.BeamSqlTable} that reads text @@ -41,7 +48,11 @@ public class TextTable extends BaseBeamTable { private final PTransform<PCollection<String>, PCollection<Row>> readConverter; private final PTransform<PCollection<Row>, PCollection<String>> writeConverter; + private static final TextRowCountEstimator.SamplingStrategy DEFAULT_SAMPLING_STRATEGY = + new TextRowCountEstimator.LimitNumberOfTotalBytes(1024 * 1024L); private final String filePattern; + private BeamRowCountStatistics rowCountStatistics = null; + private static final Logger LOGGER = LoggerFactory.getLogger(TextTable.class); /** Text table with the specified read and write transforms. */ public TextTable( @@ -60,6 +71,31 @@ public class TextTable extends BaseBeamTable { } @Override + public BeamRowCountStatistics getRowCount(PipelineOptions options) { + if (rowCountStatistics == null) { + rowCountStatistics = getTextRowEstimate(options, getFilePattern()); + } + + return rowCountStatistics; + } + + private static BeamRowCountStatistics getTextRowEstimate( + PipelineOptions options, String filePattern) { + TextRowCountEstimator textRowCountEstimator = + TextRowCountEstimator.builder() + .setFilePattern(filePattern) + .setSamplingStrategy(DEFAULT_SAMPLING_STRATEGY) + .build(); + try { + Long rows = textRowCountEstimator.estimateRowCount(options); + return BeamRowCountStatistics.createBoundedTableStatistics(BigInteger.valueOf(rows)); + } catch (IOException | TextRowCountEstimator.NoEstimationException e) { + LOGGER.warn("Could not get the row count for the text table " + filePattern, e); + } + return BeamRowCountStatistics.UNKNOWN; + } + + @Override public PCollection.IsBounded isBounded() { return PCollection.IsBounded.BOUNDED; }