Support BigQuery BYTES type
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/c7e0010b Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/c7e0010b Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/c7e0010b Branch: refs/heads/master Commit: c7e0010b0d4a3c45148d05f5101f5310bb84c40c Parents: 31a35e5 Author: Pei He <pe...@google.com> Authored: Tue Sep 13 11:08:34 2016 -0700 Committer: Dan Halperin <dhalp...@google.com> Committed: Wed Sep 21 17:29:03 2016 -0700 ---------------------------------------------------------------------- .../beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java | 10 ++++++++++ .../sdk/io/gcp/bigquery/BigQueryTableRowIterator.java | 1 + .../sdk/io/gcp/bigquery/BigQueryAvroUtilsTest.java | 14 ++++++++++++-- .../io/gcp/bigquery/BigQueryTableRowIteratorTest.java | 13 ++++++++++--- 4 files changed, 33 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c7e0010b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java index 7826559..d9b5423 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java @@ -26,6 +26,9 @@ import com.google.api.services.bigquery.model.TableRow; import com.google.api.services.bigquery.model.TableSchema; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import com.google.common.io.BaseEncoding; + +import java.nio.ByteBuffer; import java.util.List; import javax.annotation.Nullable; import org.apache.avro.Schema; @@ -153,6 +156,7 @@ class BigQueryAvroUtils { ImmutableMap<String, Type> fieldMap = ImmutableMap.<String, Type>builder() .put("STRING", Type.STRING) + .put("BYTES", Type.BYTES) .put("INTEGER", Type.LONG) .put("FLOAT", Type.DOUBLE) .put("BOOLEAN", Type.BOOLEAN) @@ -195,6 +199,12 @@ class BigQueryAvroUtils { case "RECORD": verify(v instanceof GenericRecord, "Expected GenericRecord, got %s", v.getClass()); return convertGenericRecordToTableRow((GenericRecord) v, fieldSchema.getFields()); + case "BYTES": + verify(v instanceof ByteBuffer, "Expected ByteBuffer, got %s", v.getClass()); + ByteBuffer byteBuffer = (ByteBuffer) v; + byte[] bytes = new byte[byteBuffer.limit()]; + byteBuffer.get(bytes); + return BaseEncoding.base64().encode(bytes); default: throw new UnsupportedOperationException( String.format( http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c7e0010b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableRowIterator.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableRowIterator.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableRowIterator.java index 677c661..420f30c 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableRowIterator.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableRowIterator.java @@ -253,6 +253,7 @@ class BigQueryTableRowIterator implements AutoCloseable { return BigQueryAvroUtils.formatTimestamp((String) v); } + // Returns the original value for String and base64 encoded BYTES return v; } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c7e0010b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtilsTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtilsTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtilsTest.java index 59cf1f7..b9199b0 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtilsTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtilsTest.java @@ -23,6 +23,9 @@ import com.google.api.services.bigquery.model.TableFieldSchema; import com.google.api.services.bigquery.model.TableRow; import com.google.api.services.bigquery.model.TableSchema; import com.google.common.collect.Lists; +import com.google.common.io.BaseEncoding; + +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; import org.apache.avro.Schema; @@ -61,6 +64,7 @@ public class BigQueryAvroUtilsTest { new TableFieldSchema().setName("quantity").setType("INTEGER") /* default to NULLABLE */, new TableFieldSchema().setName("birthday").setType("TIMESTAMP").setMode("NULLABLE"), new TableFieldSchema().setName("flighted").setType("BOOLEAN").setMode("NULLABLE"), + new TableFieldSchema().setName("sound").setType("BYTES").setMode("NULLABLE"), new TableFieldSchema().setName("scion").setType("RECORD").setMode("NULLABLE") .setFields(subFields), new TableFieldSchema().setName("associates").setType("RECORD").setMode("REPEATED") @@ -79,19 +83,24 @@ public class BigQueryAvroUtilsTest { assertEquals(row, convertedRow); } { - // Test type conversion for TIMESTAMP, INTEGER, BOOLEAN, and FLOAT. + // Test type conversion for TIMESTAMP, INTEGER, BOOLEAN, BYTES, and FLOAT. GenericRecord record = new GenericData.Record(avroSchema); + byte[] soundBytes = "chirp,chirp".getBytes(); + ByteBuffer soundByteBuffer = ByteBuffer.allocate(soundBytes.length).put(soundBytes); + soundByteBuffer.rewind(); record.put("number", 5L); record.put("quality", 5.0); record.put("birthday", 5L); record.put("flighted", Boolean.TRUE); + record.put("sound", soundByteBuffer); TableRow convertedRow = BigQueryAvroUtils.convertGenericRecordToTableRow(record, tableSchema); TableRow row = new TableRow() .set("number", "5") .set("birthday", "1970-01-01 00:00:00.000005 UTC") .set("quality", 5.0) .set("associates", new ArrayList<TableRow>()) - .set("flighted", Boolean.TRUE); + .set("flighted", Boolean.TRUE) + .set("sound", BaseEncoding.base64().encode(soundBytes)); assertEquals(row, convertedRow); } { @@ -123,6 +132,7 @@ public class BigQueryAvroUtilsTest { @Nullable Long quantity; @Nullable Long birthday; // Exercises TIMESTAMP. @Nullable Boolean flighted; + @Nullable ByteBuffer sound; @Nullable SubBird scion; SubBird[] associates; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c7e0010b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableRowIteratorTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableRowIteratorTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableRowIteratorTest.java index ab848f5..4f0cac9 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableRowIteratorTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableRowIteratorTest.java @@ -45,6 +45,7 @@ import com.google.api.services.bigquery.model.TableFieldSchema; import com.google.api.services.bigquery.model.TableReference; import com.google.api.services.bigquery.model.TableRow; import com.google.api.services.bigquery.model.TableSchema; +import com.google.common.io.BaseEncoding; import java.io.IOException; import java.util.Arrays; import java.util.LinkedList; @@ -122,7 +123,8 @@ public class BigQueryTableRowIteratorTest { .setFields( Arrays.asList( new TableFieldSchema().setName("name").setType("STRING"), - new TableFieldSchema().setName("answer").setType("INTEGER")))); + new TableFieldSchema().setName("answer").setType("INTEGER"), + new TableFieldSchema().setName("photo").setType("BYTES")))); } private TableRow rawRow(Object... args) { @@ -162,11 +164,14 @@ public class BigQueryTableRowIteratorTest { // Mock table schema fetch. when(mockTablesGet.execute()).thenReturn(tableWithBasicSchema()); + byte[] photoBytes = "photograph".getBytes(); + String photoBytesEncoded = BaseEncoding.base64().encode(photoBytes); // Mock table data fetch. - when(mockTabledataList.execute()).thenReturn(rawDataList(rawRow("Arthur", 42))); + when(mockTabledataList.execute()) + .thenReturn(rawDataList(rawRow("Arthur", 42, photoBytesEncoded))); // Run query and verify - String query = "SELECT name, count from table"; + String query = "SELECT name, count, photoBytes from table"; try (BigQueryTableRowIterator iterator = BigQueryTableRowIterator.fromQuery(query, "project", mockClient, null)) { iterator.open(); @@ -175,8 +180,10 @@ public class BigQueryTableRowIteratorTest { assertTrue(row.containsKey("name")); assertTrue(row.containsKey("answer")); + assertTrue(row.containsKey("photo")); assertEquals("Arthur", row.get("name")); assertEquals(42, row.get("answer")); + assertEquals(photoBytesEncoded, row.get("photo")); assertFalse(iterator.advance()); }