This is an automated email from the ASF dual-hosted git repository.

anton pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 16ceca5  [BEAM-7545] Adding RowCount to TextTable.
     new 3d576f7  Merge pull request #8951 from riazela/TextTableRowCount
16ceca5 is described below

commit 16ceca59d8cb823f0f5e42cd7dc6ea717bef3ec8
Author: Alireza Samadian <alireza4...@gmail.com>
AuthorDate: Mon Jun 24 16:26:29 2019 -0700

    [BEAM-7545] Adding RowCount to TextTable.
---
 .../main/java/org/apache/beam/sdk/io/FileIO.java   |  82 +++++---
 .../apache/beam/sdk/io/TextRowCountEstimator.java  | 219 +++++++++++++++++++++
 .../beam/sdk/io/TextRowCountEstimatorTest.java     | 116 +++++++++++
 .../sql/meta/provider/text/TextTable.java          |  36 ++++
 4 files changed, 424 insertions(+), 29 deletions(-)

diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java
index 5447e86..57a89ac 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java
@@ -729,6 +729,55 @@ public class FileIO {
       builder.add(DisplayData.item("directoryTreatment", 
getDirectoryTreatment().toString()));
     }
 
+    /**
+     * @return True if metadata is a directory and directory Treatment is SKIP.
+     * @throws java.lang.IllegalArgumentException if metadata is a directory 
and directoryTreatment
+     *     is Prohibited.
+     * @throws java.lang.UnsupportedOperationException if metadata is a 
directory and
+     *     directoryTreatment is not SKIP or PROHIBIT.
+     */
+    static boolean shouldSkipDirectory(
+        MatchResult.Metadata metadata, DirectoryTreatment directoryTreatment) {
+      if (metadata.resourceId().isDirectory()) {
+        switch (directoryTreatment) {
+          case SKIP:
+            return true;
+          case PROHIBIT:
+            throw new IllegalArgumentException(
+                "Trying to read " + metadata.resourceId() + " which is a 
directory");
+
+          default:
+            throw new UnsupportedOperationException(
+                "Unknown DirectoryTreatment: " + directoryTreatment);
+        }
+      }
+
+      return false;
+    }
+
+    /**
+     * Converts metadata to readableFile. Make sure {@link
+     * #shouldSkipDirectory(org.apache.beam.sdk.io.fs.MatchResult.Metadata,
+     * org.apache.beam.sdk.io.FileIO.ReadMatches.DirectoryTreatment)} returns 
false before using.
+     */
+    static ReadableFile matchToReadableFile(
+        MatchResult.Metadata metadata, Compression compression) {
+
+      compression =
+          (compression == Compression.AUTO)
+              ? Compression.detect(metadata.resourceId().getFilename())
+              : compression;
+      return new ReadableFile(
+          MatchResult.Metadata.builder()
+              .setResourceId(metadata.resourceId())
+              .setSizeBytes(metadata.sizeBytes())
+              .setLastModifiedMillis(metadata.lastModifiedMillis())
+              .setIsReadSeekEfficient(
+                  metadata.isReadSeekEfficient() && compression == 
Compression.UNCOMPRESSED)
+              .build(),
+          compression);
+    }
+
     private static class ToReadableFileFn extends DoFn<MatchResult.Metadata, 
ReadableFile> {
       private final ReadMatches spec;
 
@@ -738,36 +787,11 @@ public class FileIO {
 
       @ProcessElement
       public void process(ProcessContext c) {
-        MatchResult.Metadata metadata = c.element();
-        if (metadata.resourceId().isDirectory()) {
-          switch (spec.getDirectoryTreatment()) {
-            case SKIP:
-              return;
-
-            case PROHIBIT:
-              throw new IllegalArgumentException(
-                  "Trying to read " + metadata.resourceId() + " which is a 
directory");
-
-            default:
-              throw new UnsupportedOperationException(
-                  "Unknown DirectoryTreatment: " + 
spec.getDirectoryTreatment());
-          }
+        if (shouldSkipDirectory(c.element(), spec.getDirectoryTreatment())) {
+          return;
         }
-
-        Compression compression =
-            (spec.getCompression() == Compression.AUTO)
-                ? Compression.detect(metadata.resourceId().getFilename())
-                : spec.getCompression();
-        c.output(
-            new ReadableFile(
-                MatchResult.Metadata.builder()
-                    .setResourceId(metadata.resourceId())
-                    .setSizeBytes(metadata.sizeBytes())
-                    .setLastModifiedMillis(metadata.lastModifiedMillis())
-                    .setIsReadSeekEfficient(
-                        metadata.isReadSeekEfficient() && compression == 
Compression.UNCOMPRESSED)
-                    .build(),
-                compression));
+        ReadableFile r = matchToReadableFile(c.element(), 
spec.getCompression());
+        c.output(r);
       }
     }
   }
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextRowCountEstimator.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextRowCountEstimator.java
new file mode 100644
index 0000000..d220505
--- /dev/null
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextRowCountEstimator.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io;
+
+import com.google.auto.value.AutoValue;
+import java.io.IOException;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.io.fs.EmptyMatchTreatment;
+import org.apache.beam.sdk.io.fs.MatchResult;
+import org.apache.beam.sdk.io.range.OffsetRange;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.ValueProvider;
+
+/** This returns a row count estimation for files associated with a file 
pattern. */
+@AutoValue
+public abstract class TextRowCountEstimator {
+  private static final long DEFAULT_NUM_BYTES_PER_FILE = 64 * 1024L;
+  private static final Compression DEFAULT_COMPRESSION = Compression.AUTO;
+  private static final FileIO.ReadMatches.DirectoryTreatment 
DEFAULT_DIRECTORY_TREATMENT =
+      FileIO.ReadMatches.DirectoryTreatment.SKIP;
+  private static final EmptyMatchTreatment DEFAULT_EMPTY_MATCH_TREATMENT =
+      EmptyMatchTreatment.DISALLOW;
+  private static final SamplingStrategy DEFAULT_SAMPLING_STRATEGY = new 
SampleAllFiles();
+
+  public abstract long getNumSampledBytesPerFile();
+
+  @Nullable
+  @SuppressWarnings("mutable")
+  public abstract byte[] getDelimiters();
+
+  public abstract String getFilePattern();
+
+  public abstract Compression getCompression();
+
+  public abstract SamplingStrategy getSamplingStrategy();
+
+  public abstract EmptyMatchTreatment getEmptyMatchTreatment();
+
+  public abstract FileIO.ReadMatches.DirectoryTreatment 
getDirectoryTreatment();
+
+  public static TextRowCountEstimator.Builder builder() {
+    return (new AutoValue_TextRowCountEstimator.Builder())
+        .setSamplingStrategy(DEFAULT_SAMPLING_STRATEGY)
+        .setNumSampledBytesPerFile(DEFAULT_NUM_BYTES_PER_FILE)
+        .setCompression(DEFAULT_COMPRESSION)
+        .setDirectoryTreatment(DEFAULT_DIRECTORY_TREATMENT)
+        .setEmptyMatchTreatment(DEFAULT_EMPTY_MATCH_TREATMENT);
+  }
+
+  /**
+   * Estimates the number of non empty rows. It samples NumSampledBytesPerFile 
bytes from every file
+   * until the condition in sampling strategy is met. Then it takes the 
average line size of the
+   * rows and divides the total file sizes by that number. If all the sampled 
rows are empty, and it
+   * has not sampled all the lines (due to sampling strategy) it throws 
Exception.
+   *
+   * @return Number of estimated rows.
+   * @throws 
org.apache.beam.sdk.io.TextRowCountEstimator.NoEstimationException if all the 
sampled
+   *     lines are empty and we have not read all the lines in the matched 
files.
+   */
+  public Long estimateRowCount(PipelineOptions pipelineOptions)
+      throws IOException, NoEstimationException {
+    long linesSize = 0;
+    int numberOfReadLines = 0;
+    long totalFileSizes = 0;
+    long totalSampledBytes = 0;
+    int numberOfReadFiles = 0;
+    boolean sampledEverything = true;
+
+    MatchResult match = FileSystems.match(getFilePattern(), 
getEmptyMatchTreatment());
+
+    for (MatchResult.Metadata metadata : match.metadata()) {
+
+      if (getSamplingStrategy().stopSampling(numberOfReadFiles, 
totalSampledBytes)) {
+        sampledEverything = false;
+        break;
+      }
+
+      if (FileIO.ReadMatches.shouldSkipDirectory(metadata, 
getDirectoryTreatment())) {
+        continue;
+      }
+
+      FileIO.ReadableFile file = 
FileIO.ReadMatches.matchToReadableFile(metadata, getCompression());
+
+      // We use this as an estimate of the size of the sampled lines. Since 
the last sampled line
+      // may exceed this range, we are over estimating the number of lines in 
our estimation. (If
+      // each line is larger than readingWindowSize we will read one line any 
way and that line is
+      // the last line)
+      long readingWindowSize = Math.min(getNumSampledBytesPerFile(), 
metadata.sizeBytes());
+      sampledEverything = metadata.sizeBytes() == readingWindowSize && 
sampledEverything;
+      OffsetRange range = new OffsetRange(0, readingWindowSize);
+
+      TextSource textSource =
+          new TextSource(
+              
ValueProvider.StaticValueProvider.of(file.getMetadata().resourceId().toString()),
+              getEmptyMatchTreatment(),
+              getDelimiters());
+      FileBasedSource<String> source =
+          
CompressedSource.from(textSource).withCompression(file.getCompression());
+      try (BoundedSource.BoundedReader<String> reader =
+          source
+              .createForSubrangeOfFile(file.getMetadata(), range.getFrom(), 
range.getTo())
+              .createReader(pipelineOptions)) {
+
+        int numberOfNonEmptyLines = 0;
+        for (boolean more = reader.start(); more; more = reader.advance()) {
+          numberOfNonEmptyLines += reader.getCurrent().trim().equals("") ? 0 : 
1;
+        }
+        numberOfReadLines += numberOfNonEmptyLines;
+        linesSize += (numberOfNonEmptyLines == 0) ? 0 : readingWindowSize;
+      }
+      long fileSize = metadata.sizeBytes();
+      numberOfReadFiles += fileSize == 0 ? 0 : 1;
+      totalFileSizes += fileSize;
+    }
+
+    if (numberOfReadLines == 0 && sampledEverything) {
+      return 0L;
+    }
+
+    if (numberOfReadLines == 0) {
+      throw new NoEstimationException(
+          "Cannot estimate the row count. All the sampled lines are empty");
+    }
+
+    // This is total file sizes divided by average line size.
+    return totalFileSizes * numberOfReadLines / linesSize;
+  }
+
+  /** Builder for {@link org.apache.beam.sdk.io.TextRowCountEstimator}. */
+  @AutoValue.Builder
+  public abstract static class Builder {
+
+    public abstract Builder setNumSampledBytesPerFile(long numSampledBytes);
+
+    public abstract Builder setDirectoryTreatment(
+        FileIO.ReadMatches.DirectoryTreatment directoryTreatment);
+
+    public abstract Builder setCompression(Compression compression);
+
+    public abstract Builder setDelimiters(byte[] delimiters);
+
+    public abstract Builder setFilePattern(String filePattern);
+
+    public abstract Builder setEmptyMatchTreatment(EmptyMatchTreatment 
emptyMatchTreatment);
+
+    public abstract Builder setSamplingStrategy(SamplingStrategy 
samplingStrategy);
+
+    public abstract TextRowCountEstimator build();
+  }
+
+  /**
+   * An exception that will be thrown if the estimator cannot get an 
estimation of the number of
+   * lines.
+   */
+  public static class NoEstimationException extends Exception {
+    NoEstimationException(String message) {
+      super(message);
+    }
+  }
+
+  /** Sampling Strategy shows us when should we stop reading further files. * 
*/
+  public interface SamplingStrategy {
+    boolean stopSampling(int numberOfFiles, long totalReadBytes);
+  }
+
+  /** This strategy samples all the files. */
+  public static class SampleAllFiles implements SamplingStrategy {
+
+    @Override
+    public boolean stopSampling(int numberOfSampledFiles, long totalReadBytes) 
{
+      return false;
+    }
+  }
+
+  /** This strategy stops sampling if we sample enough number of bytes. */
+  public static class LimitNumberOfFiles implements SamplingStrategy {
+    int limit;
+
+    public LimitNumberOfFiles(int limit) {
+      this.limit = limit;
+    }
+
+    @Override
+    public boolean stopSampling(int numberOfFiles, long totalReadBytes) {
+      return numberOfFiles > limit;
+    }
+  }
+
+  /**
+   * This strategy stops sampling when total number of sampled bytes are more 
than some threshold.
+   */
+  public static class LimitNumberOfTotalBytes implements SamplingStrategy {
+    long limit;
+
+    public LimitNumberOfTotalBytes(long limit) {
+      this.limit = limit;
+    }
+
+    @Override
+    public boolean stopSampling(int numberOfFiles, long totalReadBytes) {
+      return totalReadBytes > limit;
+    }
+  }
+}
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextRowCountEstimatorTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextRowCountEstimatorTest.java
new file mode 100644
index 0000000..080c255
--- /dev/null
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextRowCountEstimatorTest.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.Writer;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Charsets;
+import org.apache.beam.vendor.guava.v20_0.com.google.common.io.Files;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Tests for {@link org.apache.beam.sdk.io.TextRowCountEstimator}. */
+@RunWith(JUnit4.class)
+public class TextRowCountEstimatorTest {
+  @Rule public TemporaryFolder temporaryFolder = new TemporaryFolder();
+  private static final Logger LOG = 
LoggerFactory.getLogger(TextRowCountEstimatorTest.class);
+
+  @Test
+  public void testNonEmptyFiles() throws Exception {
+    File file1 = temporaryFolder.newFile("file1.txt");
+    Writer writer = Files.newWriter(file1, Charsets.UTF_8);
+    for (int i = 0; i < 100; i++) {
+      writer.write("123123123\n");
+    }
+    writer.flush();
+    writer.close();
+    temporaryFolder.newFolder("testfolder");
+    temporaryFolder.newFolder("testfolder2");
+    file1 = temporaryFolder.newFile("testfolder/test2.txt");
+    writer = Files.newWriter(file1, Charsets.UTF_8);
+    for (int i = 0; i < 50; i++) {
+      writer.write("123123123\n");
+    }
+
+    writer.flush();
+    writer.close();
+    TextRowCountEstimator textRowCountEstimator =
+        
TextRowCountEstimator.builder().setFilePattern(temporaryFolder.getRoot() + 
"/**").build();
+    Long rows = 
textRowCountEstimator.estimateRowCount(PipelineOptionsFactory.create());
+    Assert.assertNotNull(rows);
+    Assert.assertEquals(150L, rows.longValue());
+  }
+
+  @Test(expected = FileNotFoundException.class)
+  public void testEmptyFolder() throws Exception {
+    TextRowCountEstimator textRowCountEstimator =
+        
TextRowCountEstimator.builder().setFilePattern(temporaryFolder.getRoot() + 
"/**").build();
+    Long rows = 
textRowCountEstimator.estimateRowCount(PipelineOptionsFactory.create());
+  }
+
+  @Test
+  public void testEmptyFile() throws Exception {
+    File file1 = temporaryFolder.newFile("file1.txt");
+    Writer writer = Files.newWriter(file1, Charsets.UTF_8);
+    for (int i = 0; i < 100; i++) {
+      writer.write("\n");
+    }
+    writer.flush();
+    writer.close();
+    TextRowCountEstimator textRowCountEstimator =
+        
TextRowCountEstimator.builder().setFilePattern(temporaryFolder.getRoot() + 
"/**").build();
+    Long rows = 
textRowCountEstimator.estimateRowCount(PipelineOptionsFactory.create());
+    Assert.assertEquals(0L, rows.longValue());
+  }
+
+  @Test(expected = TextRowCountEstimator.NoEstimationException.class)
+  public void lotsOfNewLines() throws Exception {
+    File file1 = temporaryFolder.newFile("file1.txt");
+    Writer writer = Files.newWriter(file1, Charsets.UTF_8);
+    for (int i = 0; i < 1000; i++) {
+      writer.write("\n");
+    }
+    writer.write("123123123");
+    writer.flush();
+    writer.close();
+    TextRowCountEstimator textRowCountEstimator =
+        TextRowCountEstimator.builder()
+            .setNumSampledBytesPerFile(10L)
+            .setFilePattern(temporaryFolder.getRoot() + "/**")
+            .build();
+    textRowCountEstimator.estimateRowCount(PipelineOptionsFactory.create());
+  }
+
+  @Test(expected = FileNotFoundException.class)
+  public void testNonExistance() throws Exception {
+    TextRowCountEstimator textRowCountEstimator =
+        TextRowCountEstimator.builder()
+            .setFilePattern(temporaryFolder.getRoot() + "/something/**")
+            .build();
+    Long rows = 
textRowCountEstimator.estimateRowCount(PipelineOptionsFactory.create());
+    Assert.assertNull(rows);
+  }
+}
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/TextTable.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/TextTable.java
index 94674de..60232ab 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/TextTable.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/TextTable.java
@@ -17,9 +17,14 @@
  */
 package org.apache.beam.sdk.extensions.sql.meta.provider.text;
 
+import java.io.IOException;
+import java.math.BigInteger;
 import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.extensions.sql.impl.BeamRowCountStatistics;
 import org.apache.beam.sdk.extensions.sql.impl.schema.BaseBeamTable;
 import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.io.TextRowCountEstimator;
+import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.values.PBegin;
@@ -27,6 +32,8 @@ import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PDone;
 import org.apache.beam.sdk.values.Row;
 import org.apache.commons.csv.CSVFormat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * {@link TextTable} is a {@link 
org.apache.beam.sdk.extensions.sql.BeamSqlTable} that reads text
@@ -41,7 +48,11 @@ public class TextTable extends BaseBeamTable {
 
   private final PTransform<PCollection<String>, PCollection<Row>> 
readConverter;
   private final PTransform<PCollection<Row>, PCollection<String>> 
writeConverter;
+  private static final TextRowCountEstimator.SamplingStrategy 
DEFAULT_SAMPLING_STRATEGY =
+      new TextRowCountEstimator.LimitNumberOfTotalBytes(1024 * 1024L);
   private final String filePattern;
+  private BeamRowCountStatistics rowCountStatistics = null;
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(TextTable.class);
 
   /** Text table with the specified read and write transforms. */
   public TextTable(
@@ -60,6 +71,31 @@ public class TextTable extends BaseBeamTable {
   }
 
   @Override
+  public BeamRowCountStatistics getRowCount(PipelineOptions options) {
+    if (rowCountStatistics == null) {
+      rowCountStatistics = getTextRowEstimate(options, getFilePattern());
+    }
+
+    return rowCountStatistics;
+  }
+
+  private static BeamRowCountStatistics getTextRowEstimate(
+      PipelineOptions options, String filePattern) {
+    TextRowCountEstimator textRowCountEstimator =
+        TextRowCountEstimator.builder()
+            .setFilePattern(filePattern)
+            .setSamplingStrategy(DEFAULT_SAMPLING_STRATEGY)
+            .build();
+    try {
+      Long rows = textRowCountEstimator.estimateRowCount(options);
+      return 
BeamRowCountStatistics.createBoundedTableStatistics(BigInteger.valueOf(rows));
+    } catch (IOException | TextRowCountEstimator.NoEstimationException e) {
+      LOGGER.warn("Could not get the row count for the text table " + 
filePattern, e);
+    }
+    return BeamRowCountStatistics.UNKNOWN;
+  }
+
+  @Override
   public PCollection.IsBounded isBounded() {
     return PCollection.IsBounded.BOUNDED;
   }

Reply via email to