This is an automated email from the ASF dual-hosted git repository.

chamikara pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 0025a63  Add initial support for the BigQuery read API.
     new f6fdeaa  Merge pull request #7441: [BEAM-6392] Add support for the BigQuery read API to BigQueryIO.
0025a63 is described below

commit 0025a63cc2a233e80999138bcf14573fc6a610a7
Author: Kenneth Jung <k...@google.com>
AuthorDate: Tue Feb 5 14:37:14 2019 -0800

    Add initial support for the BigQuery read API.
    
    This change adds new Source objects which support reading tables from
    BigQuery using the new high-throughput read API.
    
    It also modifies the Avro-to-JSON conversion code in the BigQuery
    connector to support the Avro records generated by both the existing
    export process and the new read API, and adds an integration test to
    verify that the TableRows which are constructed using each method are
    equivalent.
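    
    As a rough usage sketch of the new read path (the table spec, the "word"
    column, and the pipeline boilerplate below are illustrative assumptions
    drawn from the existing BigQueryIO surface; only withMethod(Method.DIRECT_READ)
    and withReadOptions(TableReadOptions) come from this change):
    
        import com.google.cloud.bigquery.storage.v1beta1.ReadOptions.TableReadOptions;
        import org.apache.beam.sdk.Pipeline;
        import org.apache.beam.sdk.coders.StringUtf8Coder;
        import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
        import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead.Method;
        import org.apache.beam.sdk.io.gcp.bigquery.SchemaAndRecord;
        import org.apache.beam.sdk.options.PipelineOptionsFactory;
        import org.apache.beam.sdk.values.PCollection;
    
        public class StorageReadSketch {
          public static void main(String[] args) {
            Pipeline pipeline =
                Pipeline.create(PipelineOptionsFactory.fromArgs(args).withValidation().create());
    
            // Read selected columns of a table directly over the BigQuery storage API,
            // bypassing the Avro-on-GCS export path used by the EXPORT method.
            // "my-project:samples.shakespeare" and "word" are placeholder names.
            PCollection<String> words =
                pipeline.apply(
                    BigQueryIO.read(
                            (SchemaAndRecord schemaAndRecord) ->
                                schemaAndRecord.getRecord().get("word").toString())
                        .from("my-project:samples.shakespeare")
                        .withMethod(Method.DIRECT_READ)
                        .withReadOptions(
                            TableReadOptions.newBuilder().addSelectedFields("word").build())
                        .withCoder(StringUtf8Coder.of()));
    
            pipeline.run().waitUntilFinish();
          }
        }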
---
 .../org/apache/beam/gradle/BeamModulePlugin.groovy |   2 +
 runners/google-cloud-dataflow-java/build.gradle    |   4 +
 sdks/java/io/google-cloud-platform/build.gradle    |   4 +
 .../sdk/io/gcp/bigquery/BigQueryAvroUtils.java     | 133 +++-
 .../beam/sdk/io/gcp/bigquery/BigQueryHelpers.java  |  65 +-
 .../beam/sdk/io/gcp/bigquery/BigQueryIO.java       |  87 ++-
 .../beam/sdk/io/gcp/bigquery/BigQueryServices.java |  32 +
 .../sdk/io/gcp/bigquery/BigQueryServicesImpl.java  |  62 +-
 .../gcp/bigquery/BigQueryStorageStreamSource.java  | 237 ++++++
 .../gcp/bigquery/BigQueryStorageTableSource.java   | 214 ++++++
 .../apache/beam/sdk/io/gcp/GcpApiSurfaceTest.java  |   1 +
 .../io/gcp/bigquery/BigQueryIOStorageReadIT.java   | 119 +++
 .../bigquery/BigQueryIOStorageReadTableRowIT.java  | 166 +++++
 .../io/gcp/bigquery/BigQueryIOStorageReadTest.java | 825 +++++++++++++++++++++
 .../io/gcp/bigquery/BigQueryServicesImplTest.java  |   6 +-
 .../sdk/io/gcp/bigquery/FakeBigQueryServices.java  |  13 +-
 .../sdk/io/gcp/bigquery/FakeDatasetService.java    |   6 +
 17 files changed, 1932 insertions(+), 44 deletions(-)

diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
index 4c58acf..34b710a 100644
--- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
+++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
@@ -418,6 +418,8 @@ class BeamModulePlugin implements Plugin<Project> {
         google_auth_library_credentials             : "com.google.auth:google-auth-library-credentials:$google_auth_version",
         google_auth_library_oauth2_http             : "com.google.auth:google-auth-library-oauth2-http:$google_auth_version",
         google_cloud_bigquery                       : "com.google.cloud:google-cloud-bigquery:$google_clients_version",
+        google_cloud_bigquery_storage               : "com.google.cloud:google-cloud-bigquerystorage:0.79.0-alpha",
+        google_cloud_bigquery_storage_proto         : "com.google.api.grpc:proto-google-cloud-bigquerystorage-v1beta1:$generated_grpc_beta_version",
         google_cloud_core                           : "com.google.cloud:google-cloud-core:$google_cloud_core_version",
         google_cloud_core_grpc                      : "com.google.cloud:google-cloud-core-grpc:$google_cloud_core_version",
         google_cloud_dataflow_java_proto_library_all: "com.google.cloud.dataflow:google-cloud-dataflow-java-proto-library-all:0.5.160304",
diff --git a/runners/google-cloud-dataflow-java/build.gradle b/runners/google-cloud-dataflow-java/build.gradle
index c7e126b..0c5d64a 100644
--- a/runners/google-cloud-dataflow-java/build.gradle
+++ b/runners/google-cloud-dataflow-java/build.gradle
@@ -300,6 +300,8 @@ task googleCloudPlatformLegacyWorkerIntegrationTest(type: Test) {
 
   include '**/*IT.class'
   exclude '**/BigQueryIOReadIT.class'
+  exclude '**/BigQueryIOStorageReadIT.class'
+  exclude '**/BigQueryIOStorageReadTableRowIT.class'
   exclude '**/PubsubReadIT.class'
   exclude '**/*KmsKeyIT.class'
   maxParallelForks 4
@@ -341,6 +343,8 @@ task googleCloudPlatformFnApiWorkerIntegrationTest(type: Test) {
 
     include '**/*IT.class'
     exclude '**/BigQueryIOReadIT.class'
+    exclude '**/BigQueryIOStorageReadIT.class'
+    exclude '**/BigQueryIOStorageReadTableRowIT.class'
     exclude '**/PubsubReadIT.class'
     exclude '**/SpannerReadIT.class'
     exclude '**/BigtableReadIT.class'
diff --git a/sdks/java/io/google-cloud-platform/build.gradle b/sdks/java/io/google-cloud-platform/build.gradle
index 6b8c7d9..f403f66 100644
--- a/sdks/java/io/google-cloud-platform/build.gradle
+++ b/sdks/java/io/google-cloud-platform/build.gradle
@@ -53,6 +53,8 @@ dependencies {
   shadow library.java.netty_handler
   shadow library.java.grpc_stub
   shadow library.java.joda_time
+  shadow library.java.google_cloud_bigquery_storage
+  shadow library.java.google_cloud_bigquery_storage_proto
   shadow library.java.google_cloud_core
   shadow library.java.google_cloud_spanner
   shadow library.java.bigtable_protos
@@ -98,6 +100,8 @@ task integrationTest(type: Test) {
 
   include '**/*IT.class'
   exclude '**/BigQueryIOReadIT.class'
+  exclude '**/BigQueryIOStorageReadIT.class'
+  exclude '**/BigQueryIOStorageReadTableRowIT.class'
   exclude '**/BigQueryToTableIT.class'
   exclude '**/*KmsKeyIT.class'
   maxParallelForks 4
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java
index 504b9db..9238e77 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java
@@ -17,6 +17,10 @@
  */
 package org.apache.beam.sdk.io.gcp.bigquery;
 
+import static java.time.temporal.ChronoField.HOUR_OF_DAY;
+import static java.time.temporal.ChronoField.MINUTE_OF_HOUR;
+import static java.time.temporal.ChronoField.NANO_OF_SECOND;
+import static java.time.temporal.ChronoField.SECOND_OF_MINUTE;
 import static 
org.apache.beam.vendor.guava.v20_0.com.google.common.base.MoreObjects.firstNonNull;
 import static 
org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkNotNull;
 import static 
org.apache.beam.vendor.guava.v20_0.com.google.common.base.Verify.verify;
@@ -27,6 +31,9 @@ import com.google.api.services.bigquery.model.TableRow;
 import com.google.api.services.bigquery.model.TableSchema;
 import java.math.BigDecimal;
 import java.nio.ByteBuffer;
+import java.time.LocalDate;
+import java.time.LocalTime;
+import java.time.format.DateTimeFormatterBuilder;
 import java.util.ArrayList;
 import java.util.List;
 import javax.annotation.Nullable;
@@ -37,7 +44,8 @@ import org.apache.avro.Schema;
 import org.apache.avro.Schema.Field;
 import org.apache.avro.Schema.Type;
 import org.apache.avro.generic.GenericRecord;
-import 
org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableMap;
+import 
org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableCollection;
+import 
org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableMultimap;
 import org.apache.beam.vendor.guava.v20_0.com.google.common.io.BaseEncoding;
 import org.joda.time.format.DateTimeFormat;
 import org.joda.time.format.DateTimeFormatter;
@@ -50,8 +58,14 @@ import org.joda.time.format.DateTimeFormatter;
  */
 class BigQueryAvroUtils {
 
-  public static final ImmutableMap<String, Type> BIG_QUERY_TO_AVRO_TYPES =
-      ImmutableMap.<String, Type>builder()
+  /**
+   * Defines the valid mapping between BigQuery types and native Avro types.
+   *
+   * <p>Some BigQuery types are duplicated here since slightly different Avro 
records are produced
+   * when exporting data in Avro format and when reading data directly using 
the read API.
+   */
+  public static final ImmutableMultimap<String, Type> BIG_QUERY_TO_AVRO_TYPES =
+      ImmutableMultimap.<String, Type>builder()
           .put("STRING", Type.STRING)
           .put("GEOGRAPHY", Type.STRING)
           .put("BYTES", Type.BYTES)
@@ -62,8 +76,10 @@ class BigQueryAvroUtils {
           .put("TIMESTAMP", Type.LONG)
           .put("RECORD", Type.RECORD)
           .put("DATE", Type.STRING)
+          .put("DATE", Type.INT)
           .put("DATETIME", Type.STRING)
           .put("TIME", Type.STRING)
+          .put("TIME", Type.LONG)
           .build();
 
   /**
@@ -72,8 +88,8 @@ class BigQueryAvroUtils {
    */
   private static final DateTimeFormatter DATE_AND_SECONDS_FORMATTER =
       DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss").withZoneUTC();
-  // Package private for BigQueryTableRowIterator to use.
-  static String formatTimestamp(String timestamp) {
+
+  private static String formatTimestamp(String timestamp) {
     // timestamp is in "seconds since epoch" format, with scientific notation.
     // e.g., "1.45206229112345E9" to mean "2016-01-06 06:38:11.123456 UTC".
     // Separate into seconds and microseconds.
@@ -101,6 +117,62 @@ class BigQueryAvroUtils {
   }
 
   /**
+   * This method formats a BigQuery DATE value into a String matching the 
format used by JSON
+   * export. Date records are stored in "days since epoch" format, and 
BigQuery uses the proleptic
+   * Gregorian calendar.
+   */
+  private static String formatDate(int date) {
+    return 
LocalDate.ofEpochDay(date).format(java.time.format.DateTimeFormatter.ISO_LOCAL_DATE);
+  }
+
+  private static final java.time.format.DateTimeFormatter 
ISO_LOCAL_TIME_FORMATTER_MICROS =
+      new DateTimeFormatterBuilder()
+          .appendValue(HOUR_OF_DAY, 2)
+          .appendLiteral(':')
+          .appendValue(MINUTE_OF_HOUR, 2)
+          .appendLiteral(':')
+          .appendValue(SECOND_OF_MINUTE, 2)
+          .appendLiteral('.')
+          .appendFraction(NANO_OF_SECOND, 6, 6, false)
+          .toFormatter();
+
+  private static final java.time.format.DateTimeFormatter 
ISO_LOCAL_TIME_FORMATTER_MILLIS =
+      new DateTimeFormatterBuilder()
+          .appendValue(HOUR_OF_DAY, 2)
+          .appendLiteral(':')
+          .appendValue(MINUTE_OF_HOUR, 2)
+          .appendLiteral(':')
+          .appendValue(SECOND_OF_MINUTE, 2)
+          .appendLiteral('.')
+          .appendFraction(NANO_OF_SECOND, 3, 3, false)
+          .toFormatter();
+
+  private static final java.time.format.DateTimeFormatter 
ISO_LOCAL_TIME_FORMATTER_SECONDS =
+      new DateTimeFormatterBuilder()
+          .appendValue(HOUR_OF_DAY, 2)
+          .appendLiteral(':')
+          .appendValue(MINUTE_OF_HOUR, 2)
+          .appendLiteral(':')
+          .appendValue(SECOND_OF_MINUTE, 2)
+          .toFormatter();
+
+  /**
+   * This method formats a BigQuery TIME value into a String matching the 
format used by JSON
+   * export. Time records are stored in "microseconds since midnight" format.
+   */
+  private static String formatTime(long timeMicros) {
+    java.time.format.DateTimeFormatter formatter;
+    if (timeMicros % 1000000 == 0) {
+      formatter = ISO_LOCAL_TIME_FORMATTER_SECONDS;
+    } else if (timeMicros % 1000 == 0) {
+      formatter = ISO_LOCAL_TIME_FORMATTER_MILLIS;
+    } else {
+      formatter = ISO_LOCAL_TIME_FORMATTER_MICROS;
+    }
+    return LocalTime.ofNanoOfDay(timeMicros * 1000).format(formatter);
+  }
+
+  /**
    * Utility function to convert from an Avro {@link GenericRecord} to a 
BigQuery {@link TableRow}.
    *
    * <p>See <a 
href="https://cloud.google.com/bigquery/exporting-data-from-bigquery#config";>"Avro
@@ -124,6 +196,7 @@ class BigQueryAvroUtils {
         row.set(field.name(), convertedValue);
       }
     }
+
     return row;
   }
 
@@ -177,27 +250,47 @@ class BigQueryAvroUtils {
     // Per https://cloud.google.com/bigquery/docs/reference/v2/tables#schema, 
the type field
     // is required, so it may not be null.
     String bqType = fieldSchema.getType();
-    Type expectedAvroType = BIG_QUERY_TO_AVRO_TYPES.get(bqType);
-    verifyNotNull(expectedAvroType, "Unsupported BigQuery type: %s", bqType);
+    ImmutableCollection<Type> expectedAvroTypes = 
BIG_QUERY_TO_AVRO_TYPES.get(bqType);
+    verifyNotNull(expectedAvroTypes, "Unsupported BigQuery type: %s", bqType);
     verify(
-        avroType == expectedAvroType,
-        "Expected Avro schema type %s, not %s, for BigQuery %s field %s",
-        expectedAvroType,
-        avroType,
+        expectedAvroTypes.contains(avroType),
+        "Expected Avro schema types %s for BigQuery %s field %s, but received 
%s",
+        expectedAvroTypes,
         bqType,
-        fieldSchema.getName());
+        fieldSchema.getName(),
+        avroType);
     // For historical reasons, don't validate avroLogicalType except for with 
NUMERIC.
     // BigQuery represents NUMERIC in Avro format as BYTES with a DECIMAL 
logical type.
-    switch (fieldSchema.getType()) {
+    switch (bqType) {
       case "STRING":
-      case "DATE":
       case "DATETIME":
-      case "TIME":
       case "GEOGRAPHY":
         // Avro will use a CharSequence to represent String objects, but it 
may not always use
         // java.lang.String; for example, it may prefer 
org.apache.avro.util.Utf8.
         verify(v instanceof CharSequence, "Expected CharSequence (String), got 
%s", v.getClass());
         return v.toString();
+      case "DATE":
+        if (avroType == Type.INT) {
+          verify(v instanceof Integer, "Expected Integer, got %s", 
v.getClass());
+          verifyNotNull(avroLogicalType, "Expected Date logical type");
+          verify(avroLogicalType instanceof LogicalTypes.Date, "Expected Date 
logical type");
+          return formatDate((Integer) v);
+        } else {
+          verify(v instanceof CharSequence, "Expected CharSequence (String), 
got %s", v.getClass());
+          return v.toString();
+        }
+      case "TIME":
+        if (avroType == Type.LONG) {
+          verify(v instanceof Long, "Expected Long, got %s", v.getClass());
+          verifyNotNull(avroLogicalType, "Expected TimeMicros logical type");
+          verify(
+              avroLogicalType instanceof LogicalTypes.TimeMicros,
+              "Expected TimeMicros logical type");
+          return formatTime((Long) v);
+        } else {
+          verify(v instanceof CharSequence, "Expected CharSequence (String), 
got %s", v.getClass());
+          return v.toString();
+        }
       case "INTEGER":
         verify(v instanceof Long, "Expected Long, got %s", v.getClass());
         return ((Long) v).toString();
@@ -219,11 +312,11 @@ class BigQueryAvroUtils {
         return v;
       case "TIMESTAMP":
         // TIMESTAMP data types are represented as Avro LONG types. They are 
converted back to
-        // Strings with variable-precision (up to six digits) to match the 
JSON files export
-        // by BigQuery.
+        // Strings with variable precision (up to six digits) to match the 
JSON files exported by
+        // BigQuery.
         verify(v instanceof Long, "Expected Long, got %s", v.getClass());
-        Double doubleValue = ((Long) v) / 1000000.0;
-        return formatTimestamp(doubleValue.toString());
+        double doubleValue = ((Long) v) / 1_000_000.0;
+        return formatTimestamp(Double.toString(doubleValue));
       case "RECORD":
         verify(v instanceof GenericRecord, "Expected GenericRecord, got %s", 
v.getClass());
         return convertGenericRecordToTableRow((GenericRecord) v, 
fieldSchema.getFields());
@@ -283,7 +376,7 @@ class BigQueryAvroUtils {
   }
 
   private static Field convertField(TableFieldSchema bigQueryField) {
-    Type avroType = BIG_QUERY_TO_AVRO_TYPES.get(bigQueryField.getType());
+    Type avroType = 
BIG_QUERY_TO_AVRO_TYPES.get(bigQueryField.getType()).iterator().next();
     Schema elementSchema;
     if (avroType == Type.RECORD) {
       elementSchema = toGenericAvroSchema(bigQueryField.getName(), 
bigQueryField.getFields());
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java
index 059ff22..8e90467 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java
@@ -28,6 +28,7 @@ import com.google.api.services.bigquery.model.JobStatus;
 import com.google.api.services.bigquery.model.TableReference;
 import com.google.api.services.bigquery.model.TableSchema;
 import com.google.api.services.bigquery.model.TimePartitioning;
+import com.google.cloud.bigquery.storage.v1beta1.TableReferenceProto;
 import com.google.cloud.hadoop.util.ApiErrorExtractor;
 import java.io.IOException;
 import java.util.ArrayList;
@@ -346,8 +347,26 @@ public class BigQueryHelpers {
     UNKNOWN,
   }
 
-  @Nullable
+  @VisibleForTesting
+  static TableReferenceProto.TableReference toTableRefProto(TableReference 
ref) {
+    TableReferenceProto.TableReference.Builder builder =
+        TableReferenceProto.TableReference.newBuilder();
+    if (ref.getProjectId() != null) {
+      builder.setProjectId(ref.getProjectId());
+    }
+    return 
builder.setDatasetId(ref.getDatasetId()).setTableId(ref.getTableId()).build();
+  }
+
+  @VisibleForTesting
+  static TableReference toTableRef(TableReferenceProto.TableReference ref) {
+    return new TableReference()
+        .setProjectId(ref.getProjectId())
+        .setDatasetId(ref.getDatasetId())
+        .setTableId(ref.getTableId());
+  }
+
   /** Return a displayable string representation for a {@link TableReference}. 
*/
+  @Nullable
   static ValueProvider<String> displayTable(@Nullable 
ValueProvider<TableReference> table) {
     if (table == null) {
       return null;
@@ -367,6 +386,28 @@ public class BigQueryHelpers {
     return sb.toString();
   }
 
+  @Nullable
+  static ValueProvider<String> displayTableRefProto(
+      @Nullable ValueProvider<TableReferenceProto.TableReference> table) {
+    if (table == null) {
+      return null;
+    }
+
+    return NestedValueProvider.of(table, new TableRefProtoToTableSpec());
+  }
+
+  /** Returns a canonical string representation of a {@link 
TableReferenceProto.TableReference}. */
+  public static String toTableSpec(TableReferenceProto.TableReference ref) {
+    StringBuilder sb = new StringBuilder();
+    if (ref.getProjectId() != null) {
+      sb.append(ref.getProjectId());
+      sb.append(":");
+    }
+
+    sb.append(ref.getDatasetId()).append('.').append(ref.getTableId());
+    return sb.toString();
+  }
+
   static <K, V> List<V> getOrCreateMapListValue(Map<K, List<V>> map, K key) {
     return map.computeIfAbsent(key, k -> new ArrayList<>());
   }
@@ -581,6 +622,21 @@ public class BigQueryHelpers {
     }
   }
 
+  static class TableRefToJson implements SerializableFunction<TableReference, 
String> {
+    @Override
+    public String apply(TableReference from) {
+      return toJsonString(from);
+    }
+  }
+
+  static class TableRefToTableRefProto
+      implements SerializableFunction<TableReference, 
TableReferenceProto.TableReference> {
+    @Override
+    public TableReferenceProto.TableReference apply(TableReference from) {
+      return toTableRefProto(from);
+    }
+  }
+
   static class TableRefToTableSpec implements 
SerializableFunction<TableReference, String> {
     @Override
     public String apply(TableReference from) {
@@ -588,10 +644,11 @@ public class BigQueryHelpers {
     }
   }
 
-  static class TableRefToJson implements SerializableFunction<TableReference, 
String> {
+  static class TableRefProtoToTableSpec
+      implements SerializableFunction<TableReferenceProto.TableReference, 
String> {
     @Override
-    public String apply(TableReference from) {
-      return toJsonString(from);
+    public String apply(TableReferenceProto.TableReference from) {
+      return toTableSpec(from);
     }
   }
 
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index 00012d5..a83a4a1 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -33,6 +33,7 @@ import com.google.api.services.bigquery.model.TableRow;
 import com.google.api.services.bigquery.model.TableSchema;
 import com.google.api.services.bigquery.model.TimePartitioning;
 import com.google.auto.value.AutoValue;
+import com.google.cloud.bigquery.storage.v1beta1.ReadOptions.TableReadOptions;
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
@@ -59,6 +60,7 @@ import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.TableRefToJson;
 import 
org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.TableSchemaToJsonSchema;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.TableSpecToTableRef;
 import 
org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.TimePartitioningToJson;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead.Method;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.JobService;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQuerySourceBase.ExtractResult;
@@ -382,6 +384,7 @@ public class BigQueryIO {
         .setWithTemplateCompatibility(false)
         .setBigQueryServices(new BigQueryServicesImpl())
         .setParseFn(parseFn)
+        .setMethod(Method.DEFAULT)
         .build();
   }
 
@@ -528,6 +531,21 @@ public class BigQueryIO {
   /** Implementation of {@link BigQueryIO#read(SerializableFunction)}. */
   @AutoValue
   public abstract static class TypedRead<T> extends PTransform<PBegin, 
PCollection<T>> {
+    /** Determines the method used to read data from BigQuery. */
+    @Experimental(Experimental.Kind.SOURCE_SINK)
+    public enum Method {
+      /** The default behavior if no method is explicitly set. Currently 
{@link #EXPORT}. */
+      DEFAULT,
+
+      /**
+       * Export data to Google Cloud Storage in Avro format and read data 
files from that location.
+       */
+      EXPORT,
+
+      /** Read the contents of a table directly using the BigQuery storage 
API. */
+      DIRECT_READ,
+    }
+
     abstract Builder<T> toBuilder();
 
     @AutoValue.Builder
@@ -550,6 +568,12 @@ public class BigQueryIO {
 
       abstract Builder<T> setQueryLocation(String location);
 
+      @Experimental(Experimental.Kind.SOURCE_SINK)
+      abstract Builder<T> setMethod(Method method);
+
+      @Experimental(Experimental.Kind.SOURCE_SINK)
+      abstract Builder<T> setReadOptions(TableReadOptions readOptions);
+
       abstract TypedRead<T> build();
 
       abstract Builder<T> setParseFn(SerializableFunction<SchemaAndRecord, T> 
parseFn);
@@ -585,6 +609,13 @@ public class BigQueryIO {
     @Nullable
     abstract String getQueryLocation();
 
+    @Experimental(Experimental.Kind.SOURCE_SINK)
+    abstract Method getMethod();
+
+    @Experimental(Experimental.Kind.SOURCE_SINK)
+    @Nullable
+    abstract TableReadOptions getReadOptions();
+
     @Nullable
     abstract Coder<T> getCoder();
 
@@ -664,19 +695,21 @@ public class BigQueryIO {
       // read is properly specified.
       BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
 
-      String tempLocation = bqOptions.getTempLocation();
-      checkArgument(
-          !Strings.isNullOrEmpty(tempLocation),
-          "BigQueryIO.Read needs a GCS temp location to store temp files.");
-      if (getBigQueryServices() == null) {
-        try {
-          GcsPath.fromUri(tempLocation);
-        } catch (IllegalArgumentException e) {
-          throw new IllegalArgumentException(
-              String.format(
-                  "BigQuery temp location expected a valid 'gs://' path, but 
was given '%s'",
-                  tempLocation),
-              e);
+      if (getMethod() != Method.DIRECT_READ) {
+        String tempLocation = bqOptions.getTempLocation();
+        checkArgument(
+            !Strings.isNullOrEmpty(tempLocation),
+            "BigQueryIO.Read needs a GCS temp location to store temp files.");
+        if (getBigQueryServices() == null) {
+          try {
+            GcsPath.fromUri(tempLocation);
+          } catch (IllegalArgumentException e) {
+            throw new IllegalArgumentException(
+                String.format(
+                    "BigQuery temp location expected a valid 'gs://' path, but 
was given '%s'",
+                    tempLocation),
+                e);
+          }
         }
       }
 
@@ -696,6 +729,8 @@ public class BigQueryIO {
           BigQueryHelpers.verifyTablePresence(datasetService, table.get());
         } else if (getQuery() != null) {
           checkArgument(
+              getMethod() != Method.DIRECT_READ, "Cannot read query results 
with DIRECT_READ");
+          checkArgument(
               getQuery().isAccessible(), "Cannot call validate if query is 
dynamically set.");
           JobService jobService = 
getBigQueryServices().getJobService(bqOptions);
           try {
@@ -738,6 +773,8 @@ public class BigQueryIO {
               BigQueryOptions.class.getSimpleName());
         }
       } else {
+        checkArgument(
+            getMethod() != Method.DIRECT_READ, "Method must not be DIRECT_READ 
if query is set");
         checkArgument(getQuery() != null, "Either from() or fromQuery() is 
required");
         checkArgument(
             getFlattenResults() != null, "flattenResults should not be null if 
query is set");
@@ -747,6 +784,18 @@ public class BigQueryIO {
 
       Pipeline p = input.getPipeline();
       final Coder<T> coder = inferCoder(p.getCoderRegistry());
+
+      if (getMethod() == Method.DIRECT_READ) {
+        return p.apply(
+            org.apache.beam.sdk.io.Read.from(
+                BigQueryStorageTableSource.create(
+                    getTableProvider(),
+                    getReadOptions(),
+                    getParseFn(),
+                    coder,
+                    getBigQueryServices())));
+      }
+
       final PCollectionView<String> jobIdTokenView;
       PCollection<String> jobIdTokenCollection;
       PCollection<T> rows;
@@ -979,6 +1028,18 @@ public class BigQueryIO {
       return toBuilder().setQueryLocation(location).build();
     }
 
+    /** See {@link Method}. */
+    @Experimental(Experimental.Kind.SOURCE_SINK)
+    public TypedRead<T> withMethod(Method method) {
+      return toBuilder().setMethod(method).build();
+    }
+
+    /** Read options, including a list of selected columns and push-down SQL 
filter text. */
+    @Experimental(Experimental.Kind.SOURCE_SINK)
+    public TypedRead<T> withReadOptions(TableReadOptions readOptions) {
+      return toBuilder().setReadOptions(readOptions).build();
+    }
+
     @Experimental(Experimental.Kind.SOURCE_SINK)
     public TypedRead<T> withTemplateCompatibility() {
       return toBuilder().setWithTemplateCompatibility(true).build();
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
index 85df41a..e883137 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
@@ -28,10 +28,15 @@ import com.google.api.services.bigquery.model.JobStatistics;
 import com.google.api.services.bigquery.model.Table;
 import com.google.api.services.bigquery.model.TableReference;
 import com.google.api.services.bigquery.model.TableRow;
+import 
com.google.cloud.bigquery.storage.v1beta1.Storage.CreateReadSessionRequest;
+import com.google.cloud.bigquery.storage.v1beta1.Storage.ReadRowsRequest;
+import com.google.cloud.bigquery.storage.v1beta1.Storage.ReadRowsResponse;
+import com.google.cloud.bigquery.storage.v1beta1.Storage.ReadSession;
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.List;
 import javax.annotation.Nullable;
+import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.values.ValueInSingleWindow;
 
 /** An interface for real, mock, or fake implementations of Cloud BigQuery 
services. */
@@ -43,6 +48,10 @@ public interface BigQueryServices extends Serializable {
   /** Returns a real, mock, or fake {@link DatasetService}. */
   DatasetService getDatasetService(BigQueryOptions bqOptions);
 
+  /** Returns a real, mock, or fake {@link StorageClient}. */
+  @Experimental(Experimental.Kind.SOURCE_SINK)
+  StorageClient getStorageClient(BigQueryOptions bqOptions) throws IOException;
+
   /** An interface for the Cloud BigQuery load service. */
   interface JobService {
     /** Start a BigQuery load job. */
@@ -89,6 +98,10 @@ public interface BigQueryServices extends Serializable {
     @Nullable
     Table getTable(TableReference tableRef) throws InterruptedException, 
IOException;
 
+    @Nullable
+    Table getTable(TableReference tableRef, List<String> selectedFields)
+        throws InterruptedException, IOException;
+
     /** Creates the specified table if it does not exist. */
     void createTable(Table table) throws InterruptedException, IOException;
 
@@ -150,4 +163,23 @@ public interface BigQueryServices extends Serializable {
     Table patchTableDescription(TableReference tableReference, @Nullable 
String tableDescription)
         throws IOException, InterruptedException;
   }
+
+  /** An interface representing a client object for making calls to the 
BigQuery Storage API. */
+  @Experimental(Experimental.Kind.SOURCE_SINK)
+  interface StorageClient extends AutoCloseable {
+    /** Create a new read session against an existing table. */
+    ReadSession createReadSession(CreateReadSessionRequest request);
+
+    /** Read rows in the context of a specific read stream. */
+    Iterable<ReadRowsResponse> readRows(ReadRowsRequest request);
+
+    /**
+     * Close the client object.
+     *
+     * <p>The override is required since {@link AutoCloseable} allows the 
close method to raise an
+     * exception.
+     */
+    @Override
+    void close();
+  }
 }
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
index a30465e..e1b1a7f 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
@@ -26,7 +26,9 @@ import com.google.api.client.util.BackOff;
 import com.google.api.client.util.BackOffUtils;
 import com.google.api.client.util.ExponentialBackOff;
 import com.google.api.client.util.Sleeper;
+import com.google.api.gax.core.FixedCredentialsProvider;
 import com.google.api.services.bigquery.Bigquery;
+import com.google.api.services.bigquery.Bigquery.Tables;
 import com.google.api.services.bigquery.model.Dataset;
 import com.google.api.services.bigquery.model.DatasetReference;
 import com.google.api.services.bigquery.model.Job;
@@ -46,6 +48,12 @@ import com.google.api.services.bigquery.model.TableReference;
 import com.google.api.services.bigquery.model.TableRow;
 import com.google.auth.Credentials;
 import com.google.auth.http.HttpCredentialsAdapter;
+import com.google.cloud.bigquery.storage.v1beta1.BigQueryStorageClient;
+import com.google.cloud.bigquery.storage.v1beta1.BigQueryStorageSettings;
+import 
com.google.cloud.bigquery.storage.v1beta1.Storage.CreateReadSessionRequest;
+import com.google.cloud.bigquery.storage.v1beta1.Storage.ReadRowsRequest;
+import com.google.cloud.bigquery.storage.v1beta1.Storage.ReadRowsResponse;
+import com.google.cloud.bigquery.storage.v1beta1.Storage.ReadSession;
 import com.google.cloud.hadoop.util.ApiErrorExtractor;
 import com.google.cloud.hadoop.util.ChainingHttpRequestInitializer;
 import java.io.IOException;
@@ -56,6 +64,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import javax.annotation.Nullable;
+import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.extensions.gcp.auth.NullCredentialInitializer;
 import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
@@ -105,6 +114,11 @@ class BigQueryServicesImpl implements BigQueryServices {
     return new DatasetServiceImpl(options);
   }
 
+  @Override
+  public StorageClient getStorageClient(BigQueryOptions options) throws 
IOException {
+    return new StorageClientImpl(options);
+  }
+
   private static BackOff createDefaultBackoff() {
     return BackOffAdapter.toGcpBackOff(DEFAULT_BACKOFF_FACTORY.backoff());
   }
@@ -412,16 +426,29 @@ class BigQueryServicesImpl implements BigQueryServices {
     @Override
     @Nullable
     public Table getTable(TableReference tableRef) throws IOException, 
InterruptedException {
-      return getTable(tableRef, createDefaultBackoff(), Sleeper.DEFAULT);
+      return getTable(tableRef, null);
+    }
+
+    @Override
+    @Nullable
+    public Table getTable(TableReference tableRef, List<String> selectedFields)
+        throws IOException, InterruptedException {
+      return getTable(tableRef, selectedFields, createDefaultBackoff(), 
Sleeper.DEFAULT);
     }
 
     @VisibleForTesting
     @Nullable
-    Table getTable(TableReference ref, BackOff backoff, Sleeper sleeper)
+    Table getTable(
+        TableReference ref, @Nullable List<String> selectedFields, BackOff 
backoff, Sleeper sleeper)
         throws IOException, InterruptedException {
+      Tables.Get get =
+          client.tables().get(ref.getProjectId(), ref.getDatasetId(), 
ref.getTableId());
+      if (selectedFields != null && !selectedFields.isEmpty()) {
+        get.setSelectedFields(String.join(",", selectedFields));
+      }
       try {
         return executeWithRetries(
-            client.tables().get(ref.getProjectId(), ref.getDatasetId(), 
ref.getTableId()),
+            get,
             String.format(
                 "Unable to get table: %s, aborting after %d retries.",
                 ref.getTableId(), MAX_RPC_RETRIES),
@@ -945,4 +972,33 @@ class BigQueryServicesImpl implements BigQueryServices {
             + 
"https://cloud.google.com/service-infrastructure/docs/rate-limiting#configure";);
     return builder.build();
   }
+
+  @Experimental(Experimental.Kind.SOURCE_SINK)
+  static class StorageClientImpl implements StorageClient {
+
+    private final BigQueryStorageClient client;
+
+    private StorageClientImpl(BigQueryOptions options) throws IOException {
+      BigQueryStorageSettings settings =
+          BigQueryStorageSettings.newBuilder()
+              
.setCredentialsProvider(FixedCredentialsProvider.create(options.getGcpCredential()))
+              .build();
+      this.client = BigQueryStorageClient.create(settings);
+    }
+
+    @Override
+    public ReadSession createReadSession(CreateReadSessionRequest request) {
+      return client.createReadSession(request);
+    }
+
+    @Override
+    public Iterable<ReadRowsResponse> readRows(ReadRowsRequest request) {
+      return client.readRowsCallable().call(request);
+    }
+
+    @Override
+    public void close() {
+      client.close();
+    }
+  }
 }
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageStreamSource.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageStreamSource.java
new file mode 100644
index 0000000..ed3995e
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageStreamSource.java
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import static 
org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.fromJsonString;
+import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.toJsonString;
+import static 
org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.api.services.bigquery.model.TableSchema;
+import com.google.cloud.bigquery.storage.v1beta1.Storage.ReadRowsRequest;
+import com.google.cloud.bigquery.storage.v1beta1.Storage.ReadRowsResponse;
+import com.google.cloud.bigquery.storage.v1beta1.Storage.ReadSession;
+import com.google.cloud.bigquery.storage.v1beta1.Storage.Stream;
+import com.google.cloud.bigquery.storage.v1beta1.Storage.StreamPosition;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.BinaryDecoder;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.OffsetBasedSource;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.StorageClient;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import 
org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableList;
+
+/** A {@link org.apache.beam.sdk.io.Source} representing a single stream in a 
read session. */
+@Experimental(Experimental.Kind.SOURCE_SINK)
+public class BigQueryStorageStreamSource<T> extends OffsetBasedSource<T> {
+
+  public static <T> BigQueryStorageStreamSource<T> create(
+      ReadSession readSession,
+      Stream stream,
+      TableSchema tableSchema,
+      SerializableFunction<SchemaAndRecord, T> parseFn,
+      Coder<T> outputCoder,
+      BigQueryServices bqServices) {
+    return new BigQueryStorageStreamSource<>(
+        readSession,
+        stream,
+        0L,
+        Long.MAX_VALUE,
+        1L,
+        toJsonString(checkNotNull(tableSchema, "tableSchema")),
+        parseFn,
+        outputCoder,
+        bqServices);
+  }
+
+  private final ReadSession readSession;
+  private final Stream stream;
+  private final String jsonTableSchema;
+  private final SerializableFunction<SchemaAndRecord, T> parseFn;
+  private final Coder<T> outputCoder;
+  private final BigQueryServices bqServices;
+
+  private BigQueryStorageStreamSource(
+      ReadSession readSession,
+      Stream stream,
+      long startOffset,
+      long stopOffset,
+      long minBundleSize,
+      String jsonTableSchema,
+      SerializableFunction<SchemaAndRecord, T> parseFn,
+      Coder<T> outputCoder,
+      BigQueryServices bqServices) {
+    super(startOffset, stopOffset, minBundleSize);
+    this.readSession = checkNotNull(readSession, "readSession");
+    this.stream = checkNotNull(stream, "stream");
+    this.jsonTableSchema = checkNotNull(jsonTableSchema, "jsonTableSchema");
+    this.parseFn = checkNotNull(parseFn, "parseFn");
+    this.outputCoder = checkNotNull(outputCoder, "outputCoder");
+    this.bqServices = checkNotNull(bqServices, "bqServices");
+  }
+
+  @Override
+  public Coder<T> getOutputCoder() {
+    return outputCoder;
+  }
+
+  @Override
+  public void populateDisplayData(DisplayData.Builder builder) {
+    super.populateDisplayData(builder);
+    builder
+        .addIfNotNull(
+            DisplayData.item("table", 
BigQueryHelpers.toTableSpec(readSession.getTableReference()))
+                .withLabel("Table"))
+        .add(DisplayData.item("readSession", 
readSession.getName()).withLabel("Read session"))
+        .add(DisplayData.item("stream", stream.getName()).withLabel("Stream"));
+  }
+
+  @Override
+  public long getEstimatedSizeBytes(PipelineOptions options) {
+    // The size of stream source can't be estimated due to server-side liquid 
sharding.
+    // TODO: Implement progress reporting.
+    return 0L;
+  }
+
+  @Override
+  public List<? extends OffsetBasedSource<T>> split(
+      long desiredBundleSizeBytes, PipelineOptions options) {
+    // A stream source can't be split without reading from it due to 
server-side liquid sharding.
+    // TODO: Implement dynamic work rebalancing.
+    return ImmutableList.of(this);
+  }
+
+  @Override
+  public long getMaxEndOffset(PipelineOptions options) {
+    // This method should never be called given the overrides above.
+    throw new UnsupportedOperationException("Not implemented");
+  }
+
+  @Override
+  public OffsetBasedSource<T> createSourceForSubrange(long start, long end) {
+    // This method should never be called given the overrides above.
+    throw new UnsupportedOperationException("Not implemented");
+  }
+
+  @Override
+  public BigQueryStorageStreamReader<T> createReader(PipelineOptions options) 
throws IOException {
+    return new BigQueryStorageStreamReader<>(this, 
options.as(BigQueryOptions.class));
+  }
+
+  /** A {@link org.apache.beam.sdk.io.Source.Reader} which reads records from 
a stream. */
+  @Experimental(Experimental.Kind.SOURCE_SINK)
+  public static class BigQueryStorageStreamReader<T> extends 
OffsetBasedReader<T> {
+
+    private final DatumReader<GenericRecord> datumReader;
+    private final SerializableFunction<SchemaAndRecord, T> parseFn;
+    private final StorageClient storageClient;
+    private final TableSchema tableSchema;
+
+    private Iterator<ReadRowsResponse> responseIterator;
+    private BinaryDecoder decoder;
+    private GenericRecord record;
+    private T current;
+    private long currentOffset;
+
+    private BigQueryStorageStreamReader(
+        BigQueryStorageStreamSource<T> source, BigQueryOptions options) throws 
IOException {
+      super(source);
+      this.datumReader =
+          new GenericDatumReader<>(
+              new 
Schema.Parser().parse(source.readSession.getAvroSchema().getSchema()));
+      this.parseFn = source.parseFn;
+      this.storageClient = source.bqServices.getStorageClient(options);
+      this.tableSchema = fromJsonString(source.jsonTableSchema, 
TableSchema.class);
+    }
+
+    @Override
+    protected boolean startImpl() throws IOException {
+      BigQueryStorageStreamSource<T> source = getCurrentSource();
+      currentOffset = source.getStartOffset();
+
+      ReadRowsRequest request =
+          ReadRowsRequest.newBuilder()
+              .setReadPosition(
+                  
StreamPosition.newBuilder().setStream(source.stream).setOffset(currentOffset))
+              .build();
+
+      responseIterator = storageClient.readRows(request).iterator();
+      return readNextRecord();
+    }
+
+    @Override
+    protected boolean advanceImpl() throws IOException {
+      currentOffset++;
+      return readNextRecord();
+    }
+
+    private boolean readNextRecord() throws IOException {
+      while (decoder == null || decoder.isEnd()) {
+        if (!responseIterator.hasNext()) {
+          return false;
+        }
+
+        ReadRowsResponse nextResponse = responseIterator.next();
+
+        decoder =
+            DecoderFactory.get()
+                .binaryDecoder(
+                    
nextResponse.getAvroRows().getSerializedBinaryRows().toByteArray(), decoder);
+      }
+
+      record = datumReader.read(record, decoder);
+      current = parseFn.apply(new SchemaAndRecord(record, tableSchema));
+      return true;
+    }
+
+    @Override
+    public T getCurrent() throws NoSuchElementException {
+      return current;
+    }
+
+    @Override
+    protected long getCurrentOffset() throws NoSuchElementException {
+      return currentOffset;
+    }
+
+    @Override
+    public void close() {
+      storageClient.close();
+    }
+
+    @Override
+    public synchronized BigQueryStorageStreamSource<T> getCurrentSource() {
+      return (BigQueryStorageStreamSource<T>) super.getCurrentSource();
+    }
+
+    @Override
+    public boolean allowsDynamicSplitting() {
+      return false;
+    }
+  }
+}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageTableSource.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageTableSource.java
new file mode 100644
index 0000000..bdd9037
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageTableSource.java
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import static 
org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkNotNull;
+import static 
org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkState;
+
+import com.google.api.services.bigquery.model.Table;
+import com.google.api.services.bigquery.model.TableReference;
+import com.google.cloud.bigquery.storage.v1beta1.ReadOptions.TableReadOptions;
+import 
com.google.cloud.bigquery.storage.v1beta1.Storage.CreateReadSessionRequest;
+import com.google.cloud.bigquery.storage.v1beta1.Storage.ReadSession;
+import com.google.cloud.bigquery.storage.v1beta1.Storage.Stream;
+import com.google.cloud.bigquery.storage.v1beta1.TableReferenceProto;
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicReference;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.BoundedSource;
+import 
org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.TableRefToTableRefProto;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.StorageClient;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.ValueProvider;
+import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Strings;
+import 
org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** A {@link org.apache.beam.sdk.io.Source} representing reading from a table. 
*/
+@Experimental(Experimental.Kind.SOURCE_SINK)
+public class BigQueryStorageTableSource<T> extends BoundedSource<T> {
+
+  /**
+   * The maximum number of streams which will be requested when creating a 
read session, regardless
+   * of the desired bundle size.
+   */
+  private static final int MAX_SPLIT_COUNT = 10_000;
+
+  /**
+   * The minimum number of streams which will be requested when creating a 
read session, regardless
+   * of the desired bundle size. Note that the server may still choose to 
return fewer than ten
+   * streams based on the layout of the table.
+   */
+  private static final int MIN_SPLIT_COUNT = 10;
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(BigQueryStorageTableSource.class);
+
+  public static <T> BigQueryStorageTableSource<T> create(
+      ValueProvider<TableReference> tableRefProvider,
+      @Nullable TableReadOptions readOptions,
+      SerializableFunction<SchemaAndRecord, T> parseFn,
+      Coder<T> outputCoder,
+      BigQueryServices bqServices) {
+    return new BigQueryStorageTableSource<>(
+        NestedValueProvider.of(
+            checkNotNull(tableRefProvider, "tableRefProvider"), new 
TableRefToTableRefProto()),
+        readOptions,
+        parseFn,
+        outputCoder,
+        bqServices);
+  }
+
+  private final ValueProvider<TableReferenceProto.TableReference> 
tableRefProtoProvider;
+  private final TableReadOptions readOptions;
+  private final SerializableFunction<SchemaAndRecord, T> parseFn;
+  private final Coder<T> outputCoder;
+  private final BigQueryServices bqServices;
+  private final AtomicReference<Long> tableSizeBytes;
+
+  private BigQueryStorageTableSource(
+      ValueProvider<TableReferenceProto.TableReference> tableRefProtoProvider,
+      @Nullable TableReadOptions readOptions,
+      SerializableFunction<SchemaAndRecord, T> parseFn,
+      Coder<T> outputCoder,
+      BigQueryServices bqServices) {
+    this.tableRefProtoProvider = checkNotNull(tableRefProtoProvider, 
"tableRefProtoProvider");
+    this.readOptions = readOptions;
+    this.parseFn = checkNotNull(parseFn, "parseFn");
+    this.outputCoder = checkNotNull(outputCoder, "outputCoder");
+    this.bqServices = checkNotNull(bqServices, "bqServices");
+    this.tableSizeBytes = new AtomicReference<>();
+  }
+
+  @Override
+  public Coder<T> getOutputCoder() {
+    return outputCoder;
+  }
+
+  @Override
+  public void populateDisplayData(DisplayData.Builder builder) {
+    super.populateDisplayData(builder);
+    builder.addIfNotNull(
+        DisplayData.item("table", 
BigQueryHelpers.displayTableRefProto(tableRefProtoProvider))
+            .withLabel("Table"));
+  }
+
+  private TableReferenceProto.TableReference getTargetTable(BigQueryOptions 
bqOptions)
+      throws IOException {
+    TableReferenceProto.TableReference tableReferenceProto = 
tableRefProtoProvider.get();
+    return setDefaultProjectIfAbsent(bqOptions, tableReferenceProto);
+  }
+
+  private TableReferenceProto.TableReference setDefaultProjectIfAbsent(
+      BigQueryOptions bqOptions, TableReferenceProto.TableReference 
tableReferenceProto) {
+    if (Strings.isNullOrEmpty(tableReferenceProto.getProjectId())) {
+      checkState(
+          !Strings.isNullOrEmpty(bqOptions.getProject()),
+          "No project ID set in %s or %s, cannot construct a complete %s",
+          TableReferenceProto.TableReference.class.getSimpleName(),
+          BigQueryOptions.class.getSimpleName(),
+          TableReferenceProto.TableReference.class.getSimpleName());
+      LOG.info(
+          "Project ID not set in {}. Using default project from {}.",
+          TableReferenceProto.TableReference.class.getSimpleName(),
+          BigQueryOptions.class.getSimpleName());
+      tableReferenceProto =
+          
tableReferenceProto.toBuilder().setProjectId(bqOptions.getProject()).build();
+    }
+    return tableReferenceProto;
+  }
+
+  private List<String> getSelectedFields() {
+    if (readOptions != null && !readOptions.getSelectedFieldsList().isEmpty()) 
{
+      return readOptions.getSelectedFieldsList();
+    }
+    return null;
+  }
+
+  @Override
+  public long getEstimatedSizeBytes(PipelineOptions options) throws Exception {
+    if (tableSizeBytes.get() == null) {
+      BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
+      TableReferenceProto.TableReference tableReferenceProto =
+          setDefaultProjectIfAbsent(bqOptions, tableRefProtoProvider.get());
+      TableReference tableReference = 
BigQueryHelpers.toTableRef(tableReferenceProto);
+      Table table =
+          bqServices.getDatasetService(bqOptions).getTable(tableReference, 
getSelectedFields());
+      tableSizeBytes.compareAndSet(null, table.getNumBytes());
+    }
+    return tableSizeBytes.get();
+  }
+
+  @Override
+  public List<BigQueryStorageStreamSource<T>> split(
+      long desiredBundleSizeBytes, PipelineOptions options) throws Exception {
+    BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
+    TableReferenceProto.TableReference tableReferenceProto =
+        setDefaultProjectIfAbsent(bqOptions, tableRefProtoProvider.get());
+    TableReference tableReference = 
BigQueryHelpers.toTableRef(tableReferenceProto);
+    Table table =
+        bqServices.getDatasetService(bqOptions).getTable(tableReference, 
getSelectedFields());
+    long tableSizeBytes = (table != null) ? table.getNumBytes() : 0;
+
+    int streamCount = 0;
+    if (desiredBundleSizeBytes > 0) {
+      streamCount = (int) Math.min(tableSizeBytes / desiredBundleSizeBytes, 
MAX_SPLIT_COUNT);
+    }
+
+    CreateReadSessionRequest.Builder requestBuilder =
+        CreateReadSessionRequest.newBuilder()
+            .setParent("projects/" + bqOptions.getProject())
+            .setTableReference(tableReferenceProto)
+            .setRequestedStreams(Math.max(streamCount, MIN_SPLIT_COUNT));
+
+    if (readOptions != null) {
+      requestBuilder.setReadOptions(readOptions);
+    }
+
+    ReadSession readSession;
+    try (StorageClient client = bqServices.getStorageClient(bqOptions)) {
+      readSession = client.createReadSession(requestBuilder.build());
+    }
+
+    if (readSession.getStreamsList().isEmpty()) {
+      // The underlying table is empty or has no rows which can be read.
+      return ImmutableList.of();
+    }
+
+    List<BigQueryStorageStreamSource<T>> sources = Lists.newArrayList();
+    for (Stream stream : readSession.getStreamsList()) {
+      sources.add(
+          BigQueryStorageStreamSource.create(
+              readSession, stream, table.getSchema(), parseFn, outputCoder, 
bqServices));
+    }
+
+    return ImmutableList.copyOf(sources);
+  }
+
+  @Override
+  public BoundedReader<T> createReader(PipelineOptions options) throws 
IOException {
+    throw new UnsupportedOperationException("BigQuery table source must be 
split before reading");
+  }
+}
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/GcpApiSurfaceTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/GcpApiSurfaceTest.java
index 9866be9..fa4ca25 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/GcpApiSurfaceTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/GcpApiSurfaceTest.java
@@ -63,6 +63,7 @@ public class GcpApiSurfaceTest {
             classesInPackage("com.google.auth"),
             classesInPackage("com.google.bigtable.admin.v2"),
             classesInPackage("com.google.bigtable.v2"),
+            classesInPackage("com.google.cloud.bigquery.storage.v1beta1"),
             classesInPackage("com.google.cloud.bigtable.config"),
             classesInPackage("com.google.cloud.bigtable.data"),
             classesInPackage("com.google.spanner.v1"),
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadIT.java
new file mode 100644
index 0000000..24e7bcd
--- /dev/null
+++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadIT.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import java.util.Map;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TableRowParser;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead.Method;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.ExperimentalOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.Validation;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.TestPipelineOptions;
+import org.apache.beam.sdk.transforms.Count;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableMap;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Integration tests for {@link BigQueryIO#read(SerializableFunction)} using {@link
+ * Method#DIRECT_READ}. This test reads from a pre-defined table and asserts that the number of
+ * records read is equal to the expected count.
+ */
+@RunWith(JUnit4.class)
+public class BigQueryIOStorageReadIT {
+
+  private static final Map<String, Long> EXPECTED_NUM_RECORDS =
+      ImmutableMap.<String, Long>of(
+          "empty", 0L,
+          "1M", 10592L,
+          "1G", 11110839L,
+          "1T", 11110839000L);
+
+  private static final String DATASET_ID = "big_query_import_export";
+  private static final String TABLE_PREFIX = "parallel_read_";
+
+  private BigQueryIOStorageReadOptions options;
+
+  /** Customized {@link TestPipelineOptions} for BigQueryIOStorageRead pipelines. */
+  public interface BigQueryIOStorageReadOptions extends TestPipelineOptions, ExperimentalOptions {
+    @Description("The table to be read")
+    @Validation.Required
+    String getInputTable();
+
+    void setInputTable(String table);
+
+    @Description("The expected number of records")
+    @Validation.Required
+    long getNumRecords();
+
+    void setNumRecords(long numRecords);
+  }
+
+  private void setUpTestEnvironment(String tableSize) {
+    PipelineOptionsFactory.register(BigQueryIOStorageReadOptions.class);
+    options = TestPipeline.testingPipelineOptions().as(BigQueryIOStorageReadOptions.class);
+    options.setNumRecords(EXPECTED_NUM_RECORDS.get(tableSize));
+    String project = TestPipeline.testingPipelineOptions().as(GcpOptions.class).getProject();
+    options.setInputTable(project + ":" + DATASET_ID + "." + TABLE_PREFIX + tableSize);
+  }
+
+  private void runBigQueryIOStorageReadPipeline() {
+    Pipeline p = Pipeline.create(options);
+    PCollection<Long> count =
+        p.apply(
+                "Read",
+                BigQueryIO.read(TableRowParser.INSTANCE)
+                    .from(options.getInputTable())
+                    .withMethod(Method.DIRECT_READ))
+            .apply("Count", Count.globally());
+    PAssert.thatSingleton(count).isEqualTo(options.getNumRecords());
+    p.run().waitUntilFinish();
+  }
+
+  @Test
+  public void testBigQueryStorageReadEmpty() throws Exception {
+    setUpTestEnvironment("empty");
+    runBigQueryIOStorageReadPipeline();
+  }
+
+  @Test
+  public void testBigQueryStorageRead1M() throws Exception {
+    setUpTestEnvironment("1M");
+    runBigQueryIOStorageReadPipeline();
+  }
+
+  @Test
+  public void testBigQueryStorageRead1G() throws Exception {
+    setUpTestEnvironment("1G");
+    runBigQueryIOStorageReadPipeline();
+  }
+
+  @Test
+  public void testBigQueryStorageRead1T() throws Exception {
+    setUpTestEnvironment("1T");
+    runBigQueryIOStorageReadPipeline();
+  }
+}
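
Outside of the test harness, the same read path is selected purely through withMethod(Method.DIRECT_READ). A hedged, minimal user-facing sketch follows; the table spec is a placeholder and the imports match those of the integration test above.

    // Placeholder table spec; assumes the same imports as BigQueryIOStorageReadIT above.
    Pipeline pipeline = Pipeline.create(options);
    PCollection<Long> rowCount =
        pipeline
            .apply(
                "DirectRead",
                BigQueryIO.read(TableRowParser.INSTANCE)
                    .from("my-project:my_dataset.my_table")
                    .withMethod(Method.DIRECT_READ))
            .apply("CountRows", Count.globally());
    pipeline.run().waitUntilFinish();
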
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadTableRowIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadTableRowIT.java
new file mode 100644
index 0000000..734c3af
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadTableRowIT.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.api.services.bigquery.model.TableRow;
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TableRowParser;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead.Method;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.ExperimentalOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.Validation;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.TestPipelineOptions;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.transforms.join.CoGbkResult;
+import org.apache.beam.sdk.transforms.join.CoGroupByKey;
+import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TupleTag;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Integration tests for {@link BigQueryIO#readTableRows()} using {@link Method#DIRECT_READ} in
+ * combination with {@link TableRowParser} to generate output in {@link TableRow} form.
+ */
+@RunWith(JUnit4.class)
+public class BigQueryIOStorageReadTableRowIT {
+
+  private static final String DATASET_ID = "big_query_import_export";
+  private static final String TABLE_PREFIX = "parallel_read_table_row_";
+
+  private BigQueryIOStorageReadTableRowOptions options;
+
+  /** Private pipeline options for the test. */
+  public interface BigQueryIOStorageReadTableRowOptions
+      extends TestPipelineOptions, ExperimentalOptions {
+    @Description("The table to be read")
+    @Validation.Required
+    String getInputTable();
+
+    void setInputTable(String table);
+  }
+
+  private static class TableRowToKVPairFn extends SimpleFunction<TableRow, KV<String, String>> {
+    @Override
+    public KV<String, String> apply(TableRow input) {
+      CharSequence sampleString = (CharSequence) input.get("sample_string");
+      String key = sampleString != null ? sampleString.toString() : "null";
+      return KV.of(key, BigQueryHelpers.toJsonString(input));
+    }
+  }
+
+  private void setUpTestEnvironment(String tableName) {
+    PipelineOptionsFactory.register(BigQueryIOStorageReadTableRowOptions.class);
+    options = TestPipeline.testingPipelineOptions().as(BigQueryIOStorageReadTableRowOptions.class);
+    String project = TestPipeline.testingPipelineOptions().as(GcpOptions.class).getProject();
+    options.setInputTable(project + ":" + DATASET_ID + "." + TABLE_PREFIX + tableName);
+    options.setTempLocation(options.getTempRoot() + "/temp-it/");
+  }
+
+  private static void runPipeline(BigQueryIOStorageReadTableRowOptions pipelineOptions) {
+    Pipeline pipeline = Pipeline.create(pipelineOptions);
+
+    PCollection<KV<String, String>> jsonTableRowsFromExport =
+        pipeline
+            .apply(
+                "ExportTable",
+                BigQueryIO.readTableRows()
+                    .from(pipelineOptions.getInputTable())
+                    .withMethod(Method.EXPORT))
+            .apply("MapExportedRows", MapElements.via(new TableRowToKVPairFn()));
+
+    PCollection<KV<String, String>> jsonTableRowsFromDirectRead =
+        pipeline
+            .apply(
+                "DirectReadTable",
+                BigQueryIO.readTableRows()
+                    .from(pipelineOptions.getInputTable())
+                    .withMethod(Method.DIRECT_READ))
+            .apply("MapDirectReadRows", MapElements.via(new TableRowToKVPairFn()));
+
+    final TupleTag<String> exportTag = new TupleTag<>();
+    final TupleTag<String> directReadTag = new TupleTag<>();
+
+    PCollection<KV<String, Set<String>>> unmatchedRows =
+        KeyedPCollectionTuple.of(exportTag, jsonTableRowsFromExport)
+            .and(directReadTag, jsonTableRowsFromDirectRead)
+            .apply(CoGroupByKey.create())
+            .apply(
+                ParDo.of(
+                    new DoFn<KV<String, CoGbkResult>, KV<String, Set<String>>>() {
+                      @ProcessElement
+                      public void processElement(ProcessContext c) throws Exception {
+                        KV<String, CoGbkResult> element = c.element();
+
+                        // Add all the exported rows for the key to a collection.
+                        Set<String> uniqueRows = new HashSet<>();
+                        for (String row : element.getValue().getAll(exportTag)) {
+                          uniqueRows.add(row);
+                        }
+
+                        // Compute the disjunctive union of the rows in the direct read collection.
+                        for (String row : element.getValue().getAll(directReadTag)) {
+                          if (uniqueRows.contains(row)) {
+                            uniqueRows.remove(row);
+                          } else {
+                            uniqueRows.add(row);
+                          }
+                        }
+
+                        // Emit any rows in the result set.
+                        if (!uniqueRows.isEmpty()) {
+                          c.output(KV.of(element.getKey(), uniqueRows));
+                        }
+                      }
+                    }));
+
+    PAssert.that(unmatchedRows).empty();
+
+    pipeline.run().waitUntilFinish();
+  }
+
+  @Test
+  public void testBigQueryStorageReadTableRow1() throws Exception {
+    setUpTestEnvironment("1");
+    runPipeline(options);
+  }
+
+  @Test
+  public void testBigQueryStorageReadTableRow10k() throws Exception {
+    setUpTestEnvironment("10k");
+    runPipeline(options);
+  }
+
+  @Test
+  public void testBigQueryStorageReadTableRow100k() throws Exception {
+    setUpTestEnvironment("100k");
+    runPipeline(options);
+  }
+}
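
The ParDo above keys every row by its sample_string value and then computes, per key, the disjunctive union (symmetric difference) of the JSON rows produced by the export path and by the direct read path; PAssert then requires that difference to be empty. The same check in isolation, as a small self-contained sketch (class and method names are illustrative only):

    import java.util.HashSet;
    import java.util.Set;

    final class SymmetricDifferenceSketch {
      // Rows present in exactly one of the two inputs; an empty result means the
      // export-based and direct-read-based rows for this key matched exactly.
      static Set<String> disjunctiveUnion(Iterable<String> exported, Iterable<String> directRead) {
        Set<String> unmatched = new HashSet<>();
        for (String row : exported) {
          unmatched.add(row);
        }
        for (String row : directRead) {
          if (!unmatched.remove(row)) {
            unmatched.add(row);
          }
        }
        return unmatched;
      }
    }
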
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadTest.java
new file mode 100644
index 0000000..2762329
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadTest.java
@@ -0,0 +1,825 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.hasItem;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.withSettings;
+
+import com.google.api.services.bigquery.model.Streamingbuffer;
+import com.google.api.services.bigquery.model.Table;
+import com.google.api.services.bigquery.model.TableFieldSchema;
+import com.google.api.services.bigquery.model.TableReference;
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.api.services.bigquery.model.TableSchema;
+import com.google.cloud.bigquery.storage.v1beta1.AvroProto.AvroRows;
+import com.google.cloud.bigquery.storage.v1beta1.AvroProto.AvroSchema;
+import com.google.cloud.bigquery.storage.v1beta1.ReadOptions.TableReadOptions;
+import com.google.cloud.bigquery.storage.v1beta1.Storage.CreateReadSessionRequest;
+import com.google.cloud.bigquery.storage.v1beta1.Storage.ReadRowsRequest;
+import com.google.cloud.bigquery.storage.v1beta1.Storage.ReadRowsResponse;
+import com.google.cloud.bigquery.storage.v1beta1.Storage.ReadSession;
+import com.google.cloud.bigquery.storage.v1beta1.Storage.Stream;
+import com.google.cloud.bigquery.storage.v1beta1.Storage.StreamPosition;
+import com.google.protobuf.ByteString;
+import java.io.ByteArrayOutputStream;
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData.Record;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.Encoder;
+import org.apache.avro.io.EncoderFactory;
+import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.extensions.protobuf.ByteStringCoder;
+import org.apache.beam.sdk.extensions.protobuf.ProtoCoder;
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.io.BoundedSource.BoundedReader;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TableRowParser;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead.Method;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.StorageClient;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.ValueProvider;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Lists;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.rules.TemporaryFolder;
+import org.junit.rules.TestRule;
+import org.junit.runner.Description;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.junit.runners.model.Statement;
+
+/** Tests for {@link BigQueryIO#readTableRows()} using {@link Method#DIRECT_READ}. */
+@RunWith(JUnit4.class)
+public class BigQueryIOStorageReadTest {
+  private transient PipelineOptions options;
+  private transient TemporaryFolder testFolder = new TemporaryFolder();
+  private transient TestPipeline p;
+
+  @Rule
+  public final transient TestRule folderThenPipeline =
+      new TestRule() {
+        @Override
+        public Statement apply(Statement base, Description description) {
+          // We need to set up the temporary folder, and then set up the TestPipeline based on the
+          // chosen folder. Unfortunately, since rule evaluation order is unspecified and unrelated
+          // to field order, and is separate from construction, that requires manually creating this
+          // TestRule.
+          Statement withPipeline =
+              new Statement() {
+                @Override
+                public void evaluate() throws Throwable {
+                  options = TestPipeline.testingPipelineOptions();
+                  options.as(BigQueryOptions.class).setProject("project-id");
+                  options
+                      .as(BigQueryOptions.class)
+                      .setTempLocation(testFolder.getRoot().getAbsolutePath());
+                  p = TestPipeline.fromOptions(options);
+                  p.apply(base, description).evaluate();
+                }
+              };
+          return testFolder.apply(withPipeline, description);
+        }
+      };
+
+  @Rule public transient ExpectedException thrown = ExpectedException.none();
+
+  private FakeDatasetService fakeDatasetService = new FakeDatasetService();
+
+  @Before
+  public void setUp() throws Exception {
+    FakeDatasetService.setUp();
+  }
+
+  @Test
+  public void testBuildTableBasedSource() {
+    BigQueryIO.TypedRead<TableRow> typedRead =
+        BigQueryIO.read(new TableRowParser())
+            .withCoder(TableRowJsonCoder.of())
+            .withMethod(Method.DIRECT_READ)
+            .from("foo.com:project:dataset.table");
+    checkTypedReadTableObject(typedRead, "foo.com:project", "dataset", "table");
+    assertTrue(typedRead.getValidate());
+  }
+
+  @Test
+  public void testBuildTableBasedSourceWithoutValidation() {
+    BigQueryIO.TypedRead<TableRow> typedRead =
+        BigQueryIO.read(new TableRowParser())
+            .withCoder(TableRowJsonCoder.of())
+            .withMethod(Method.DIRECT_READ)
+            .from("foo.com:project:dataset.table")
+            .withoutValidation();
+    checkTypedReadTableObject(typedRead, "foo.com:project", "dataset", "table");
+    assertFalse(typedRead.getValidate());
+  }
+
+  @Test
+  public void testBuildTableBasedSourceWithDefaultProject() {
+    BigQueryIO.TypedRead<TableRow> typedRead =
+        BigQueryIO.read(new TableRowParser())
+            .withCoder(TableRowJsonCoder.of())
+            .withMethod(Method.DIRECT_READ)
+            .from("myDataset.myTable");
+    checkTypedReadTableObject(typedRead, null, "myDataset", "myTable");
+  }
+
+  @Test
+  public void testBuildTableBasedSourceWithReadOptions() {
+    TableReadOptions readOptions =
+        TableReadOptions.newBuilder()
+            .addSelectedFields("field1")
+            .addSelectedFields("field2")
+            .setRowRestriction("int_field > 5")
+            .build();
+    BigQueryIO.TypedRead<TableRow> typedRead =
+        BigQueryIO.read(new TableRowParser())
+            .withCoder(TableRowJsonCoder.of())
+            .withMethod(Method.DIRECT_READ)
+            .from("foo.com:project:dataset.table")
+            .withReadOptions(readOptions);
+    checkTypedReadTableObject(typedRead, "foo.com:project", "dataset", "table");
+    assertEquals(typedRead.getReadOptions(), readOptions);
+  }
+
+  @Test
+  public void testBuildTableBasedSourceWithTableReference() {
+    TableReference tableReference =
+        new TableReference()
+            .setProjectId("foo.com:project")
+            .setDatasetId("dataset")
+            .setTableId("table");
+    BigQueryIO.TypedRead<TableRow> typedRead =
+        BigQueryIO.read(new TableRowParser())
+            .withCoder(TableRowJsonCoder.of())
+            .withMethod(Method.DIRECT_READ)
+            .from(tableReference);
+    checkTypedReadTableObject(typedRead, "foo.com:project", "dataset", "table");
+  }
+
+  private void checkTypedReadTableObject(
+      TypedRead typedRead, String project, String dataset, String table) {
+    assertEquals(project, typedRead.getTable().getProjectId());
+    assertEquals(dataset, typedRead.getTable().getDatasetId());
+    assertEquals(table, typedRead.getTable().getTableId());
+    assertNull(typedRead.getQuery());
+    assertEquals(Method.DIRECT_READ, typedRead.getMethod());
+  }
+
+  @Test
+  public void testBuildSourceWithTableAndFlatten() {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage(
+        "Invalid BigQueryIO.Read: Specifies a table with a result flattening preference,"
+            + " which only applies to queries");
+    p.apply(
+        "ReadMyTable",
+        BigQueryIO.read(new TableRowParser())
+            .withCoder(TableRowJsonCoder.of())
+            .withMethod(Method.DIRECT_READ)
+            .from("foo.com:project:dataset.table")
+            .withoutResultFlattening());
+    p.run();
+  }
+
+  @Test
+  public void testBuildSourceWithTableAndSqlDialect() {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage(
+        "Invalid BigQueryIO.Read: Specifies a table with a SQL dialect preference,"
+            + " which only applies to queries");
+    p.apply(
+        "ReadMyTable",
+        BigQueryIO.read(new TableRowParser())
+            .withCoder(TableRowJsonCoder.of())
+            .withMethod(Method.DIRECT_READ)
+            .from("foo.com:project:dataset.table")
+            .usingStandardSql());
+    p.run();
+  }
+
+  @Test
+  public void testDisplayData() {
+    String tableSpec = "foo.com:project:dataset.table";
+    BigQueryIO.TypedRead<TableRow> typedRead =
+        BigQueryIO.read(new TableRowParser())
+            .withCoder(TableRowJsonCoder.of())
+            .withMethod(Method.DIRECT_READ)
+            .from(tableSpec);
+    DisplayData displayData = DisplayData.from(typedRead);
+    assertThat(displayData, hasDisplayItem("table", tableSpec));
+  }
+
+  @Test
+  public void testEvaluatedDisplayData() {
+    DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();
+    BigQueryIO.TypedRead<TableRow> typedRead =
+        BigQueryIO.read(new TableRowParser())
+            .withCoder(TableRowJsonCoder.of())
+            .withMethod(Method.DIRECT_READ)
+            .from("foo.com:project:dataset.table");
+    Set<DisplayData> displayData = evaluator.displayDataForPrimitiveSourceTransforms(typedRead);
+    assertThat(displayData, hasItem(hasDisplayItem("table")));
+  }
+
+  @Test
+  public void testName() {
+    assertEquals(
+        "BigQueryIO.TypedRead",
+        BigQueryIO.read(new TableRowParser())
+            .withCoder(TableRowJsonCoder.of())
+            .withMethod(Method.DIRECT_READ)
+            .from("foo.com:project:dataset.table")
+            .getName());
+  }
+
+  @Test
+  public void testCoderInference() {
+    // Lambdas erase too much type information -- use an anonymous class here.
+    SerializableFunction<SchemaAndRecord, KV<ByteString, ReadSession>> parseFn =
+        new SerializableFunction<SchemaAndRecord, KV<ByteString, ReadSession>>() {
+          @Override
+          public KV<ByteString, ReadSession> apply(SchemaAndRecord input) {
+            return null;
+          }
+        };
+
+    assertEquals(
+        KvCoder.of(ByteStringCoder.of(), ProtoCoder.of(ReadSession.class)),
+        BigQueryIO.read(parseFn).inferCoder(CoderRegistry.createDefault()));
+  }
+
+  @Test
+  public void testTableSourceEstimatedSize() throws Exception {
+    doTableSourceEstimatedSizeTest(false);
+  }
+
+  @Test
+  public void testTableSourceEstimatedSize_IgnoresStreamingBuffer() throws Exception {
+    doTableSourceEstimatedSizeTest(true);
+  }
+
+  private void doTableSourceEstimatedSizeTest(boolean useStreamingBuffer) throws Exception {
+    fakeDatasetService.createDataset("foo.com:project", "dataset", "", "", null);
+    TableReference tableRef = BigQueryHelpers.parseTableSpec("foo.com:project:dataset.table");
+    Table table = new Table().setTableReference(tableRef).setNumBytes(100L);
+    if (useStreamingBuffer) {
+      table.setStreamingBuffer(new Streamingbuffer().setEstimatedBytes(BigInteger.TEN));
+    }
+
+    fakeDatasetService.createTable(table);
+
+    BigQueryStorageTableSource<TableRow> tableSource =
+        BigQueryStorageTableSource.create(
+            ValueProvider.StaticValueProvider.of(tableRef),
+            null,
+            new TableRowParser(),
+            TableRowJsonCoder.of(),
+            new FakeBigQueryServices().withDatasetService(fakeDatasetService));
+
+    assertEquals(100, tableSource.getEstimatedSizeBytes(options));
+  }
+
+  @Test
+  public void testTableSourceEstimatedSize_WithDefaultProject() throws Exception {
+    fakeDatasetService.createDataset("project-id", "dataset", "", "", null);
+    TableReference tableRef = BigQueryHelpers.parseTableSpec("project-id:dataset.table");
+    Table table = new Table().setTableReference(tableRef).setNumBytes(100L);
+    fakeDatasetService.createTable(table);
+
+    BigQueryStorageTableSource<TableRow> tableSource =
+        BigQueryStorageTableSource.create(
+            ValueProvider.StaticValueProvider.of(BigQueryHelpers.parseTableSpec("dataset.table")),
+            null,
+            new TableRowParser(),
+            TableRowJsonCoder.of(),
+            new FakeBigQueryServices().withDatasetService(fakeDatasetService));
+
+    assertEquals(100, tableSource.getEstimatedSizeBytes(options));
+  }
+
+  @Test
+  public void testTableSourceInitialSplit() throws Exception {
+    doTableSourceInitialSplitTest(1024L, 1024);
+  }
+
+  @Test
+  public void testTableSourceInitialSplit_MinSplitCount() throws Exception {
+    doTableSourceInitialSplitTest(1024L * 1024L, 10);
+  }
+
+  @Test
+  public void testTableSourceInitialSplit_MaxSplitCount() throws Exception {
+    doTableSourceInitialSplitTest(10L, 10_000);
+  }
+
+  private void doTableSourceInitialSplitTest(long bundleSize, int streamCount) throws Exception {
+    fakeDatasetService.createDataset("foo.com:project", "dataset", "", "", null);
+    TableReference tableRef = BigQueryHelpers.parseTableSpec("foo.com:project:dataset.table");
+
+    Table table =
+        new Table()
+            .setTableReference(tableRef)
+            .setNumBytes(1024L * 1024L)
+            .setSchema(new TableSchema());
+
+    fakeDatasetService.createTable(table);
+
+    CreateReadSessionRequest expectedRequest =
+        CreateReadSessionRequest.newBuilder()
+            .setParent("projects/project-id")
+            .setTableReference(BigQueryHelpers.toTableRefProto(tableRef))
+            .setRequestedStreams(streamCount)
+            .build();
+
+    ReadSession.Builder builder = ReadSession.newBuilder();
+    for (int i = 0; i < streamCount; i++) {
+      builder.addStreams(Stream.newBuilder().setName("stream-" + i));
+    }
+
+    StorageClient fakeStorageClient = mock(StorageClient.class);
+    when(fakeStorageClient.createReadSession(expectedRequest)).thenReturn(builder.build());
+
+    BigQueryStorageTableSource<TableRow> tableSource =
+        BigQueryStorageTableSource.create(
+            ValueProvider.StaticValueProvider.of(tableRef),
+            null,
+            new TableRowParser(),
+            TableRowJsonCoder.of(),
+            new FakeBigQueryServices()
+                .withDatasetService(fakeDatasetService)
+                .withStorageClient(fakeStorageClient));
+
+    List<? extends BoundedSource<TableRow>> sources = tableSource.split(bundleSize, options);
+    assertEquals(streamCount, sources.size());
+  }
+
+  @Test
+  public void testTableSourceInitialSplit_WithTableReadOptions() throws Throwable {
+    fakeDatasetService.createDataset("foo.com:project", "dataset", "", "", null);
+    TableReference tableRef = BigQueryHelpers.parseTableSpec("foo.com:project:dataset.table");
+
+    Table table =
+        new Table()
+            .setTableReference(tableRef)
+            .setNumBytes(100L)
+            .setSchema(
+                new TableSchema()
+                    .setFields(
+                        ImmutableList.of(
+                            new TableFieldSchema().setName("name").setType("STRING"),
+                            new TableFieldSchema().setName("number").setType("INTEGER"))));
+
+    fakeDatasetService.createTable(table);
+
+    TableReadOptions readOptions =
+        TableReadOptions.newBuilder()
+            .addSelectedFields("name")
+            .addSelectedFields("number")
+            .setRowRestriction("number > 5")
+            .build();
+
+    CreateReadSessionRequest expectedRequest =
+        CreateReadSessionRequest.newBuilder()
+            .setParent("projects/project-id")
+            .setTableReference(BigQueryHelpers.toTableRefProto(tableRef))
+            .setRequestedStreams(10)
+            .setReadOptions(readOptions)
+            .build();
+
+    ReadSession.Builder builder = ReadSession.newBuilder();
+    for (int i = 0; i < 10; i++) {
+      builder.addStreams(Stream.newBuilder().setName("stream-" + i));
+    }
+
+    StorageClient fakeStorageClient = mock(StorageClient.class);
+    when(fakeStorageClient.createReadSession(expectedRequest)).thenReturn(builder.build());
+
+    BigQueryStorageTableSource<TableRow> tableSource =
+        BigQueryStorageTableSource.create(
+            ValueProvider.StaticValueProvider.of(tableRef),
+            readOptions,
+            new TableRowParser(),
+            TableRowJsonCoder.of(),
+            new FakeBigQueryServices()
+                .withDatasetService(fakeDatasetService)
+                .withStorageClient(fakeStorageClient));
+
+    List<? extends BoundedSource<TableRow>> sources = tableSource.split(10L, options);
+    assertEquals(10L, sources.size());
+  }
+
+  @Test
+  public void testTableSourceInitialSplit_WithDefaultProject() throws Exception {
+    fakeDatasetService.createDataset("project-id", "dataset", "", "", null);
+    TableReference tableRef = BigQueryHelpers.parseTableSpec("project-id:dataset.table");
+
+    Table table =
+        new Table()
+            .setTableReference(tableRef)
+            .setNumBytes(1024L * 1024L)
+            .setSchema(new TableSchema());
+
+    fakeDatasetService.createTable(table);
+
+    CreateReadSessionRequest expectedRequest =
+        CreateReadSessionRequest.newBuilder()
+            .setParent("projects/project-id")
+            .setTableReference(BigQueryHelpers.toTableRefProto(tableRef))
+            .setRequestedStreams(1024)
+            .build();
+
+    ReadSession.Builder builder = ReadSession.newBuilder();
+    for (int i = 0; i < 50; i++) {
+      builder.addStreams(Stream.newBuilder().setName("stream-" + i));
+    }
+
+    StorageClient fakeStorageClient = mock(StorageClient.class);
+    when(fakeStorageClient.createReadSession(expectedRequest)).thenReturn(builder.build());
+
+    BigQueryStorageTableSource<TableRow> tableSource =
+        BigQueryStorageTableSource.create(
+            ValueProvider.StaticValueProvider.of(BigQueryHelpers.parseTableSpec("dataset.table")),
+            null,
+            new TableRowParser(),
+            TableRowJsonCoder.of(),
+            new FakeBigQueryServices()
+                .withDatasetService(fakeDatasetService)
+                .withStorageClient(fakeStorageClient));
+
+    List<? extends BoundedSource<TableRow>> sources = tableSource.split(1024L, options);
+    assertEquals(50L, sources.size());
+  }
+
+  @Test
+  public void testTableSourceInitialSplit_EmptyTable() throws Exception {
+    fakeDatasetService.createDataset("foo.com:project", "dataset", "", "", null);
+    TableReference tableRef = BigQueryHelpers.parseTableSpec("foo.com:project:dataset.table");
+
+    Table table =
+        new Table()
+            .setTableReference(tableRef)
+            .setNumBytes(1024L * 1024L)
+            .setSchema(new TableSchema());
+
+    fakeDatasetService.createTable(table);
+
+    CreateReadSessionRequest expectedRequest =
+        CreateReadSessionRequest.newBuilder()
+            .setParent("projects/project-id")
+            .setTableReference(BigQueryHelpers.toTableRefProto(tableRef))
+            .setRequestedStreams(1024)
+            .build();
+
+    ReadSession emptyReadSession = ReadSession.newBuilder().build();
+    StorageClient fakeStorageClient = mock(StorageClient.class);
+    when(fakeStorageClient.createReadSession(expectedRequest)).thenReturn(emptyReadSession);
+
+    BigQueryStorageTableSource<TableRow> tableSource =
+        BigQueryStorageTableSource.create(
+            ValueProvider.StaticValueProvider.of(tableRef),
+            null,
+            new TableRowParser(),
+            TableRowJsonCoder.of(),
+            new FakeBigQueryServices()
+                .withDatasetService(fakeDatasetService)
+                .withStorageClient(fakeStorageClient));
+
+    List<? extends BoundedSource<TableRow>> sources = tableSource.split(1024L, options);
+    assertTrue(sources.isEmpty());
+  }
+
+  @Test
+  public void testTableSourceCreateReader() throws Exception {
+    BigQueryStorageTableSource<TableRow> tableSource =
+        BigQueryStorageTableSource.create(
+            ValueProvider.StaticValueProvider.of(
+                BigQueryHelpers.parseTableSpec("foo.com:project:dataset.table")),
+            null,
+            new TableRowParser(),
+            TableRowJsonCoder.of(),
+            new FakeBigQueryServices().withDatasetService(fakeDatasetService));
+
+    thrown.expect(UnsupportedOperationException.class);
+    thrown.expectMessage("BigQuery table source must be split before reading");
+    tableSource.createReader(options);
+  }
+
+  private static final String AVRO_SCHEMA_STRING =
+      "{\"namespace\": \"example.avro\",\n"
+          + " \"type\": \"record\",\n"
+          + " \"name\": \"RowRecord\",\n"
+          + " \"fields\": [\n"
+          + "     {\"name\": \"name\", \"type\": \"string\"},\n"
+          + "     {\"name\": \"number\", \"type\": \"long\"}\n"
+          + " ]\n"
+          + "}";
+
+  private static final Schema AVRO_SCHEMA = new Schema.Parser().parse(AVRO_SCHEMA_STRING);
+
+  private static final TableSchema TABLE_SCHEMA =
+      new TableSchema()
+          .setFields(
+              ImmutableList.of(
+                  new TableFieldSchema().setName("name").setType("STRING").setMode("REQUIRED"),
+                  new TableFieldSchema().setName("number").setType("INTEGER").setMode("REQUIRED")));
+
+  private static GenericRecord createRecord(String name, long number, Schema schema) {
+    GenericRecord genericRecord = new Record(schema);
+    genericRecord.put("name", name);
+    genericRecord.put("number", number);
+    return genericRecord;
+  }
+
+  private static final EncoderFactory ENCODER_FACTORY = EncoderFactory.get();
+
+  private static ReadRowsResponse createResponse(
+      Schema schema, Collection<GenericRecord> genericRecords) throws Exception {
+    GenericDatumWriter<GenericRecord> writer = new GenericDatumWriter<>(schema);
+    ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+    Encoder binaryEncoder = ENCODER_FACTORY.binaryEncoder(outputStream, null);
+    for (GenericRecord genericRecord : genericRecords) {
+      writer.write(genericRecord, binaryEncoder);
+    }
+
+    binaryEncoder.flush();
+
+    return ReadRowsResponse.newBuilder()
+        .setAvroRows(
+            AvroRows.newBuilder()
+                .setSerializedBinaryRows(ByteString.copyFrom(outputStream.toByteArray()))
+                .setRowCount(genericRecords.size()))
+        .build();
+  }
+
+  @Test
+  public void testStreamSourceEstimatedSizeBytes() throws Exception {
+
+    BigQueryStorageStreamSource<TableRow> streamSource =
+        BigQueryStorageStreamSource.create(
+            ReadSession.getDefaultInstance(),
+            Stream.getDefaultInstance(),
+            TABLE_SCHEMA,
+            new TableRowParser(),
+            TableRowJsonCoder.of(),
+            new FakeBigQueryServices());
+
+    assertEquals(0, streamSource.getEstimatedSizeBytes(options));
+  }
+
+  @Test
+  public void testStreamSourceSplit() throws Exception {
+
+    BigQueryStorageStreamSource<TableRow> streamSource =
+        BigQueryStorageStreamSource.create(
+            ReadSession.getDefaultInstance(),
+            Stream.getDefaultInstance(),
+            TABLE_SCHEMA,
+            new TableRowParser(),
+            TableRowJsonCoder.of(),
+            new FakeBigQueryServices());
+
+    assertThat(streamSource.split(0, options), containsInAnyOrder(streamSource));
+  }
+
+  @Test
+  public void testStreamSourceGetMaxEndOffset() throws Exception {
+
+    BigQueryStorageStreamSource<TableRow> streamSource =
+        BigQueryStorageStreamSource.create(
+            ReadSession.getDefaultInstance(),
+            Stream.getDefaultInstance(),
+            TABLE_SCHEMA,
+            new TableRowParser(),
+            TableRowJsonCoder.of(),
+            new FakeBigQueryServices());
+
+    thrown.expect(UnsupportedOperationException.class);
+    thrown.expectMessage("Not implemented");
+    streamSource.getMaxEndOffset(options);
+  }
+
+  @Test
+  public void testStreamSourceCreateSourceForSubrange() throws Exception {
+
+    BigQueryStorageStreamSource<TableRow> streamSource =
+        BigQueryStorageStreamSource.create(
+            ReadSession.getDefaultInstance(),
+            Stream.getDefaultInstance(),
+            TABLE_SCHEMA,
+            new TableRowParser(),
+            TableRowJsonCoder.of(),
+            new FakeBigQueryServices());
+
+    thrown.expect(UnsupportedOperationException.class);
+    thrown.expectMessage("Not implemented");
+    streamSource.createSourceForSubrange(0, 0);
+  }
+
+  @Test
+  public void testReadFromStreamSource() throws Exception {
+
+    ReadSession readSession =
+        ReadSession.newBuilder()
+            .setName("readSession")
+            .setAvroSchema(AvroSchema.newBuilder().setSchema(AVRO_SCHEMA_STRING))
+            .build();
+
+    Stream stream = Stream.newBuilder().setName("stream").build();
+
+    ReadRowsRequest expectedRequest =
+        ReadRowsRequest.newBuilder()
+            .setReadPosition(StreamPosition.newBuilder().setStream(stream))
+            .build();
+
+    List<GenericRecord> records =
+        Lists.newArrayList(
+            createRecord("A", 1, AVRO_SCHEMA),
+            createRecord("B", 2, AVRO_SCHEMA),
+            createRecord("C", 3, AVRO_SCHEMA));
+
+    List<ReadRowsResponse> responses =
+        Lists.newArrayList(
+            createResponse(AVRO_SCHEMA, records.subList(0, 2)),
+            createResponse(AVRO_SCHEMA, records.subList(2, 3)));
+
+    StorageClient fakeStorageClient = mock(StorageClient.class);
+    when(fakeStorageClient.readRows(expectedRequest)).thenReturn(responses);
+
+    BigQueryStorageStreamSource<TableRow> streamSource =
+        BigQueryStorageStreamSource.create(
+            readSession,
+            stream,
+            TABLE_SCHEMA,
+            new TableRowParser(),
+            TableRowJsonCoder.of(),
+            new FakeBigQueryServices().withStorageClient(fakeStorageClient));
+
+    List<TableRow> rows = new ArrayList<>();
+    BoundedReader<TableRow> reader = streamSource.createReader(options);
+    for (boolean hasNext = reader.start(); hasNext; hasNext = reader.advance()) {
+      rows.add(reader.getCurrent());
+    }
+
+    System.out.println("Rows: " + rows);
+
+    assertEquals(3, rows.size());
+  }
+
+  @Test
+  public void testStreamSourceSplitAtFraction() throws Exception {
+
+    ReadSession readSession =
+        ReadSession.newBuilder()
+            .setName("readSession")
+            .setAvroSchema(AvroSchema.newBuilder().setSchema(AVRO_SCHEMA_STRING))
+            .build();
+
+    Stream stream = Stream.newBuilder().setName("stream").build();
+
+    ReadRowsRequest expectedRequest =
+        ReadRowsRequest.newBuilder()
+            .setReadPosition(StreamPosition.newBuilder().setStream(stream))
+            .build();
+
+    List<GenericRecord> records =
+        Lists.newArrayList(
+            createRecord("A", 1, AVRO_SCHEMA),
+            createRecord("B", 2, AVRO_SCHEMA),
+            createRecord("C", 3, AVRO_SCHEMA));
+
+    List<ReadRowsResponse> responses =
+        Lists.newArrayList(
+            createResponse(AVRO_SCHEMA, records.subList(0, 2)),
+            createResponse(AVRO_SCHEMA, records.subList(2, 3)));
+
+    StorageClient fakeStorageClient = mock(StorageClient.class);
+    when(fakeStorageClient.readRows(expectedRequest)).thenReturn(responses);
+
+    BigQueryStorageStreamSource<TableRow> streamSource =
+        BigQueryStorageStreamSource.create(
+            readSession,
+            stream,
+            TABLE_SCHEMA,
+            new TableRowParser(),
+            TableRowJsonCoder.of(),
+            new FakeBigQueryServices().withStorageClient(fakeStorageClient));
+
+    BoundedReader<TableRow> reader = streamSource.createReader(options);
+    reader.start();
+    assertNull(reader.splitAtFraction(0.5));
+  }
+
+  private static final class ParseKeyValue
+      implements SerializableFunction<SchemaAndRecord, KV<String, Long>> {
+    @Override
+    public KV<String, Long> apply(SchemaAndRecord input) {
+      return KV.of(
+          input.getRecord().get("name").toString(), (Long) input.getRecord().get("number"));
+    }
+  }
+
+  @Test
+  public void testReadFromBigQueryIO() throws Exception {
+    fakeDatasetService.createDataset("foo.com:project", "dataset", "", "", null);
+    TableReference tableRef = BigQueryHelpers.parseTableSpec("foo.com:project:dataset.table");
+
+    Table table =
+        new Table().setTableReference(tableRef).setNumBytes(10L).setSchema(new TableSchema());
+
+    fakeDatasetService.createTable(table);
+
+    CreateReadSessionRequest expectedCreateReadSessionRequest =
+        CreateReadSessionRequest.newBuilder()
+            .setParent("projects/project-id")
+            .setTableReference(BigQueryHelpers.toTableRefProto(tableRef))
+            .setRequestedStreams(10)
+            .build();
+
+    ReadSession readSession =
+        ReadSession.newBuilder()
+            .setName("readSessionName")
+            .setAvroSchema(AvroSchema.newBuilder().setSchema(AVRO_SCHEMA_STRING))
+            .addStreams(Stream.newBuilder().setName("streamName"))
+            .build();
+
+    ReadRowsRequest expectedReadRowsRequest =
+        ReadRowsRequest.newBuilder()
+            .setReadPosition(
+                StreamPosition.newBuilder().setStream(Stream.newBuilder().setName("streamName")))
+            .build();
+
+    List<GenericRecord> records =
+        Lists.newArrayList(
+            createRecord("A", 1, AVRO_SCHEMA),
+            createRecord("B", 2, AVRO_SCHEMA),
+            createRecord("C", 3, AVRO_SCHEMA),
+            createRecord("D", 4, AVRO_SCHEMA));
+
+    List<ReadRowsResponse> readRowsResponses =
+        Lists.newArrayList(
+            createResponse(AVRO_SCHEMA, records.subList(0, 2)),
+            createResponse(AVRO_SCHEMA, records.subList(2, 4)));
+
+    StorageClient fakeStorageClient = mock(StorageClient.class, withSettings().serializable());
+    when(fakeStorageClient.createReadSession(expectedCreateReadSessionRequest))
+        .thenReturn(readSession);
+    when(fakeStorageClient.readRows(expectedReadRowsRequest)).thenReturn(readRowsResponses);
+
+    PCollection<KV<String, Long>> output =
+        p.apply(
+            BigQueryIO.read(new ParseKeyValue())
+                .from("foo.com:project:dataset.table")
+                .withMethod(Method.DIRECT_READ)
+                .withTestServices(
+                    new FakeBigQueryServices()
+                        .withDatasetService(fakeDatasetService)
+                        .withStorageClient(fakeStorageClient)));
+
+    PAssert.that(output)
+        .containsInAnyOrder(
+            ImmutableList.of(KV.of("A", 1L), KV.of("B", 2L), KV.of("C", 3L), KV.of("D", 4L)));
+
+    p.run();
+  }
+}
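
As testBuildTableBasedSourceWithReadOptions and testTableSourceInitialSplit_WithTableReadOptions above show, TableReadOptions lets a direct read push column projection and row filtering into the read session itself. A hedged usage sketch follows; the table spec and field names are placeholders, and the imports are those of the test above.

    // Placeholders throughout; not part of the patch.
    TableReadOptions readOptions =
        TableReadOptions.newBuilder()
            .addSelectedFields("name")
            .addSelectedFields("number")
            .setRowRestriction("number > 5")
            .build();

    PCollection<TableRow> filteredRows =
        p.apply(
            "FilteredDirectRead",
            BigQueryIO.read(new TableRowParser())
                .withCoder(TableRowJsonCoder.of())
                .withMethod(Method.DIRECT_READ)
                .from("my-project:my_dataset.my_table")
                .withReadOptions(readOptions));
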
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java
index d09745d..ec36475 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java
@@ -339,7 +339,7 @@ public class BigQueryServicesImplTest {
     BigQueryServicesImpl.DatasetServiceImpl datasetService =
         new BigQueryServicesImpl.DatasetServiceImpl(bigquery, PipelineOptionsFactory.create());
 
-    Table table = datasetService.getTable(tableRef, BackOff.ZERO_BACKOFF, Sleeper.DEFAULT);
+    Table table = datasetService.getTable(tableRef, null, BackOff.ZERO_BACKOFF, Sleeper.DEFAULT);
 
     assertEquals(testTable, table);
     verify(response, times(2)).getStatusCode();
@@ -360,7 +360,7 @@ public class BigQueryServicesImplTest {
             .setProjectId("projectId")
             .setDatasetId("datasetId")
             .setTableId("tableId");
-    Table table = datasetService.getTable(tableRef, BackOff.ZERO_BACKOFF, Sleeper.DEFAULT);
+    Table table = datasetService.getTable(tableRef, null, BackOff.ZERO_BACKOFF, Sleeper.DEFAULT);
 
     assertNull(table);
     verify(response, times(1)).getStatusCode();
@@ -384,7 +384,7 @@ public class BigQueryServicesImplTest {
 
     BigQueryServicesImpl.DatasetServiceImpl datasetService =
         new BigQueryServicesImpl.DatasetServiceImpl(bigquery, PipelineOptionsFactory.create());
-    datasetService.getTable(tableRef, BackOff.STOP_BACKOFF, Sleeper.DEFAULT);
+    datasetService.getTable(tableRef, null, BackOff.STOP_BACKOFF, Sleeper.DEFAULT);
   }
 
   @Test
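
These hunks only track the new selectedFields parameter on DatasetService.getTable: passing null preserves the previous behavior, while a non-null list presumably narrows the returned table schema to the named columns, mirroring the FakeDatasetService overload further below. A hedged sketch of the new call shape, reusing the setup of the tests above (the field names are illustrative):

    // tableRef and datasetService as constructed in the tests above; field names are illustrative.
    Table fullTable =
        datasetService.getTable(tableRef, null, BackOff.ZERO_BACKOFF, Sleeper.DEFAULT);
    Table projectedTable =
        datasetService.getTable(
            tableRef, ImmutableList.of("name", "number"), BackOff.ZERO_BACKOFF, Sleeper.DEFAULT);
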
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeBigQueryServices.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeBigQueryServices.java
index 77dc8c7..1e60c1e 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeBigQueryServices.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeBigQueryServices.java
@@ -31,7 +31,8 @@ import org.apache.beam.sdk.coders.ListCoder;
 @Experimental(Experimental.Kind.SOURCE_SINK)
 public class FakeBigQueryServices implements BigQueryServices {
   private JobService jobService;
-  private FakeDatasetService datasetService;
+  private DatasetService datasetService;
+  private StorageClient storageClient;
 
   public FakeBigQueryServices withJobService(JobService jobService) {
     this.jobService = jobService;
@@ -43,6 +44,11 @@ public class FakeBigQueryServices implements BigQueryServices {
     return this;
   }
 
+  public FakeBigQueryServices withStorageClient(StorageClient storageClient) {
+    this.storageClient = storageClient;
+    return this;
+  }
+
   @Override
   public JobService getJobService(BigQueryOptions bqOptions) {
     return jobService;
@@ -53,6 +59,11 @@ public class FakeBigQueryServices implements BigQueryServices {
     return datasetService;
   }
 
+  @Override
+  public StorageClient getStorageClient(BigQueryOptions bqOptions) {
+    return storageClient;
+  }
+
   static List<TableRow> rowsFromEncodedQuery(String query) throws IOException {
     ListCoder<TableRow> listCoder = ListCoder.of(TableRowJsonCoder.of());
     ByteArrayInputStream input = new ByteArrayInputStream(Base64.decodeBase64(query));
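
With the new withStorageClient hook, a unit test can stub the storage read API end to end without touching the real service. A condensed sketch of the wiring used by testReadFromBigQueryIO earlier in this change (request and response stubbing on the mock is elided):

    // Condensed from BigQueryIOStorageReadTest; stubbing of createReadSession/readRows is omitted.
    StorageClient fakeStorageClient = mock(StorageClient.class, withSettings().serializable());

    FakeBigQueryServices testServices =
        new FakeBigQueryServices()
            .withDatasetService(fakeDatasetService)
            .withStorageClient(fakeStorageClient);

    p.apply(
        "StubbedDirectRead",
        BigQueryIO.read(new TableRowParser())
            .from("foo.com:project:dataset.table")
            .withMethod(Method.DIRECT_READ)
            .withTestServices(testServices));
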
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeDatasetService.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeDatasetService.java
index 78254b7..3edb288 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeDatasetService.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeDatasetService.java
@@ -61,6 +61,12 @@ public class FakeDatasetService implements DatasetService, Serializable {
 
   @Override
   public Table getTable(TableReference tableRef) throws InterruptedException, IOException {
+    return getTable(tableRef, null);
+  }
+
+  @Override
+  public Table getTable(TableReference tableRef, @Nullable List<String> selectedFields)
+      throws InterruptedException, IOException {
     synchronized (tables) {
       Map<String, TableContainer> dataset =
           tables.get(tableRef.getProjectId(), tableRef.getDatasetId());
