Support BigQuery BYTES type

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/c7e0010b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/c7e0010b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/c7e0010b

Branch: refs/heads/master
Commit: c7e0010b0d4a3c45148d05f5101f5310bb84c40c
Parents: 31a35e5
Author: Pei He <pe...@google.com>
Authored: Tue Sep 13 11:08:34 2016 -0700
Committer: Dan Halperin <dhalp...@google.com>
Committed: Wed Sep 21 17:29:03 2016 -0700

----------------------------------------------------------------------
 .../beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java   | 10 ++++++++++
 .../sdk/io/gcp/bigquery/BigQueryTableRowIterator.java |  1 +
 .../sdk/io/gcp/bigquery/BigQueryAvroUtilsTest.java    | 14 ++++++++++++--
 .../io/gcp/bigquery/BigQueryTableRowIteratorTest.java | 13 ++++++++++---
 4 files changed, 33 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c7e0010b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java
index 7826559..d9b5423 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java
@@ -26,6 +26,9 @@ import com.google.api.services.bigquery.model.TableRow;
 import com.google.api.services.bigquery.model.TableSchema;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.io.BaseEncoding;
+
+import java.nio.ByteBuffer;
 import java.util.List;
 import javax.annotation.Nullable;
 import org.apache.avro.Schema;
@@ -153,6 +156,7 @@ class BigQueryAvroUtils {
     ImmutableMap<String, Type> fieldMap =
         ImmutableMap.<String, Type>builder()
             .put("STRING", Type.STRING)
+            .put("BYTES", Type.BYTES)
             .put("INTEGER", Type.LONG)
             .put("FLOAT", Type.DOUBLE)
             .put("BOOLEAN", Type.BOOLEAN)
@@ -195,6 +199,12 @@ class BigQueryAvroUtils {
       case "RECORD":
         verify(v instanceof GenericRecord, "Expected GenericRecord, got %s", 
v.getClass());
         return convertGenericRecordToTableRow((GenericRecord) v, 
fieldSchema.getFields());
+      case "BYTES":
+        verify(v instanceof ByteBuffer, "Expected ByteBuffer, got %s", 
v.getClass());
+        ByteBuffer byteBuffer = (ByteBuffer) v;
+        byte[] bytes = new byte[byteBuffer.limit()];
+        byteBuffer.get(bytes);
+        return BaseEncoding.base64().encode(bytes);
       default:
         throw new UnsupportedOperationException(
             String.format(

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c7e0010b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableRowIterator.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableRowIterator.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableRowIterator.java
index 677c661..420f30c 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableRowIterator.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableRowIterator.java
@@ -253,6 +253,7 @@ class BigQueryTableRowIterator implements AutoCloseable {
       return BigQueryAvroUtils.formatTimestamp((String) v);
     }
 
+    // Returns the original value unchanged for STRING and for base64-encoded BYTES.
     return v;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c7e0010b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtilsTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtilsTest.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtilsTest.java
index 59cf1f7..b9199b0 100644
--- 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtilsTest.java
+++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtilsTest.java
@@ -23,6 +23,9 @@ import 
com.google.api.services.bigquery.model.TableFieldSchema;
 import com.google.api.services.bigquery.model.TableRow;
 import com.google.api.services.bigquery.model.TableSchema;
 import com.google.common.collect.Lists;
+import com.google.common.io.BaseEncoding;
+
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 import org.apache.avro.Schema;
@@ -61,6 +64,7 @@ public class BigQueryAvroUtilsTest {
             new TableFieldSchema().setName("quantity").setType("INTEGER") /* 
default to NULLABLE */,
             new 
TableFieldSchema().setName("birthday").setType("TIMESTAMP").setMode("NULLABLE"),
             new 
TableFieldSchema().setName("flighted").setType("BOOLEAN").setMode("NULLABLE"),
+            new 
TableFieldSchema().setName("sound").setType("BYTES").setMode("NULLABLE"),
             new 
TableFieldSchema().setName("scion").setType("RECORD").setMode("NULLABLE")
                 .setFields(subFields),
             new 
TableFieldSchema().setName("associates").setType("RECORD").setMode("REPEATED")
@@ -79,19 +83,24 @@ public class BigQueryAvroUtilsTest {
       assertEquals(row, convertedRow);
     }
     {
-      // Test type conversion for TIMESTAMP, INTEGER, BOOLEAN, and FLOAT.
+      // Test type conversion for TIMESTAMP, INTEGER, BOOLEAN, BYTES, and FLOAT.
       GenericRecord record = new GenericData.Record(avroSchema);
+      byte[] soundBytes = "chirp,chirp".getBytes();
+      ByteBuffer soundByteBuffer = 
ByteBuffer.allocate(soundBytes.length).put(soundBytes);
+      soundByteBuffer.rewind();
       record.put("number", 5L);
       record.put("quality", 5.0);
       record.put("birthday", 5L);
       record.put("flighted", Boolean.TRUE);
+      record.put("sound", soundByteBuffer);
       TableRow convertedRow = 
BigQueryAvroUtils.convertGenericRecordToTableRow(record, tableSchema);
       TableRow row = new TableRow()
           .set("number", "5")
           .set("birthday", "1970-01-01 00:00:00.000005 UTC")
           .set("quality", 5.0)
           .set("associates", new ArrayList<TableRow>())
-          .set("flighted", Boolean.TRUE);
+          .set("flighted", Boolean.TRUE)
+          .set("sound", BaseEncoding.base64().encode(soundBytes));
       assertEquals(row, convertedRow);
     }
     {
@@ -123,6 +132,7 @@ public class BigQueryAvroUtilsTest {
     @Nullable Long quantity;
     @Nullable Long birthday;  // Exercises TIMESTAMP.
     @Nullable Boolean flighted;
+    @Nullable ByteBuffer sound;
     @Nullable SubBird scion;
     SubBird[] associates;
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c7e0010b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableRowIteratorTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableRowIteratorTest.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableRowIteratorTest.java
index ab848f5..4f0cac9 100644
--- 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableRowIteratorTest.java
+++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableRowIteratorTest.java
@@ -45,6 +45,7 @@ import 
com.google.api.services.bigquery.model.TableFieldSchema;
 import com.google.api.services.bigquery.model.TableReference;
 import com.google.api.services.bigquery.model.TableRow;
 import com.google.api.services.bigquery.model.TableSchema;
+import com.google.common.io.BaseEncoding;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.LinkedList;
@@ -122,7 +123,8 @@ public class BigQueryTableRowIteratorTest {
                 .setFields(
                     Arrays.asList(
                         new 
TableFieldSchema().setName("name").setType("STRING"),
-                        new 
TableFieldSchema().setName("answer").setType("INTEGER"))));
+                        new 
TableFieldSchema().setName("answer").setType("INTEGER"),
+                        new 
TableFieldSchema().setName("photo").setType("BYTES"))));
   }
 
   private TableRow rawRow(Object... args) {
@@ -162,11 +164,14 @@ public class BigQueryTableRowIteratorTest {
     // Mock table schema fetch.
     when(mockTablesGet.execute()).thenReturn(tableWithBasicSchema());
 
+    byte[] photoBytes = "photograph".getBytes();
+    String photoBytesEncoded = BaseEncoding.base64().encode(photoBytes);
     // Mock table data fetch.
-    when(mockTabledataList.execute()).thenReturn(rawDataList(rawRow("Arthur", 
42)));
+    when(mockTabledataList.execute())
+        .thenReturn(rawDataList(rawRow("Arthur", 42, photoBytesEncoded)));
 
     // Run query and verify
-    String query = "SELECT name, count from table";
+    String query = "SELECT name, count, photoBytes from table";
     try (BigQueryTableRowIterator iterator =
             BigQueryTableRowIterator.fromQuery(query, "project", mockClient, 
null)) {
       iterator.open();
@@ -175,8 +180,10 @@ public class BigQueryTableRowIteratorTest {
 
       assertTrue(row.containsKey("name"));
       assertTrue(row.containsKey("answer"));
+      assertTrue(row.containsKey("photo"));
       assertEquals("Arthur", row.get("name"));
       assertEquals(42, row.get("answer"));
+      assertEquals(photoBytesEncoded, row.get("photo"));
 
       assertFalse(iterator.advance());
     }

Reply via email to