This is an automated email from the ASF dual-hosted git repository. reuvenlax pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new 58b4d762eec Merge pull request #17404: [BEAM-13990] support date and timestamp fields 58b4d762eec is described below commit 58b4d762eece66774a5df6ca54e6f91c49057c9b Author: Reuven Lax <re...@google.com> AuthorDate: Fri Apr 29 22:02:56 2022 -0700 Merge pull request #17404: [BEAM-13990] support date and timestamp fields --- .../org/apache/beam/gradle/BeamModulePlugin.groovy | 1 + sdks/java/io/google-cloud-platform/build.gradle | 2 + .../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 2 + .../StorageApiDynamicDestinationsTableRow.java | 13 +- .../bigquery/StorageApiWriteUnshardedRecords.java | 4 + .../bigquery/StorageApiWritesShardedRecords.java | 4 + .../sdk/io/gcp/bigquery/TableRowJsonCoder.java | 8 +- .../io/gcp/bigquery/TableRowToStorageApiProto.java | 339 ++++++++++++----- .../sdk/io/gcp/bigquery/BigQueryIOWriteTest.java | 8 - .../gcp/bigquery/TableRowToStorageApiProtoIT.java | 407 +++++++++++++++++++++ .../bigquery/TableRowToStorageApiProtoTest.java | 140 ++++--- .../beam/sdk/io/gcp/spanner/StructUtilsTest.java | 3 +- 12 files changed, 777 insertions(+), 154 deletions(-) diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy index 5d5b1e9377f..0f0c11dc2cb 100644 --- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy +++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy @@ -631,6 +631,7 @@ class BeamModulePlugin implements Plugin<Project> { jackson_dataformat_xml : "com.fasterxml.jackson.dataformat:jackson-dataformat-xml:$jackson_version", jackson_dataformat_yaml : "com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:$jackson_version", jackson_datatype_joda : "com.fasterxml.jackson.datatype:jackson-datatype-joda:$jackson_version", + jackson_datatype_jsr310 : "com.fasterxml.jackson.datatype:jackson-datatype-jsr310:$jackson_version", jackson_module_scala_2_11 : "com.fasterxml.jackson.module:jackson-module-scala_2.11:$jackson_version", jackson_module_scala_2_12 : "com.fasterxml.jackson.module:jackson-module-scala_2.12:$jackson_version", // Swap to use the officially published version of 0.4.x once available diff --git a/sdks/java/io/google-cloud-platform/build.gradle b/sdks/java/io/google-cloud-platform/build.gradle index 8d1d0a1776d..cd71fc35e21 100644 --- a/sdks/java/io/google-cloud-platform/build.gradle +++ b/sdks/java/io/google-cloud-platform/build.gradle @@ -116,6 +116,8 @@ dependencies { implementation library.java.http_core implementation library.java.jackson_core implementation library.java.jackson_databind + implementation library.java.jackson_datatype_joda + implementation library.java.jackson_datatype_jsr310 implementation library.java.joda_time implementation library.java.junit implementation library.java.netty_handler diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java index 54f485a9d71..2a3e595adc1 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java @@ -3085,6 +3085,8 @@ public class BigQueryIO { CreateTables.clearCreatedTables(); TwoLevelMessageConverterCache.clear(); StorageApiDynamicDestinationsTableRow.clearSchemaCache(); + StorageApiWriteUnshardedRecords.clearCache(); + StorageApiWritesShardedRecords.clearCache(); } ///////////////////////////////////////////////////////////////////////////// diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsTableRow.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsTableRow.java index c48dbe0dedb..3c9f676cfad 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsTableRow.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsTableRow.java @@ -70,6 +70,7 @@ public class StorageApiDynamicDestinationsTableRow<T, DestinationT> DestinationT destination, DatasetService datasetService) throws Exception { return new MessageConverter<T>() { TableSchema tableSchema; + TableRowToStorageApiProto.SchemaInformation schemaInformation; Descriptor descriptor; long descriptorHash; @@ -103,7 +104,8 @@ public class StorageApiDynamicDestinationsTableRow<T, DestinationT> MoreObjects.firstNonNull( SCHEMA_CACHE.putSchemaIfAbsent(tableReference, tableSchema), tableSchema); } - + schemaInformation = + TableRowToStorageApiProto.SchemaInformation.fromTableSchema(tableSchema); descriptor = TableRowToStorageApiProto.getDescriptorFromTableSchema(tableSchema); descriptorHash = BigQueryUtils.hashSchemaDescriptorDeterministic(descriptor); } @@ -138,6 +140,8 @@ public class StorageApiDynamicDestinationsTableRow<T, DestinationT> } synchronized (this) { tableSchema = newSchema; + schemaInformation = + TableRowToStorageApiProto.SchemaInformation.fromTableSchema(tableSchema); descriptor = TableRowToStorageApiProto.getDescriptorFromTableSchema(tableSchema); long newHash = BigQueryUtils.hashSchemaDescriptorDeterministic(descriptor); if (descriptorHash != newHash) { @@ -154,16 +158,21 @@ public class StorageApiDynamicDestinationsTableRow<T, DestinationT> public StorageApiWritePayload toMessage(T element) throws Exception { int attempt = 0; do { + TableRowToStorageApiProto.SchemaInformation localSchemaInformation; Descriptor localDescriptor; long localDescriptorHash; synchronized (this) { + localSchemaInformation = schemaInformation; localDescriptor = descriptor; localDescriptorHash = descriptorHash; } try { Message msg = TableRowToStorageApiProto.messageFromTableRow( - localDescriptor, formatFunction.apply(element), ignoreUnknownValues); + localSchemaInformation, + localDescriptor, + formatFunction.apply(element), + ignoreUnknownValues); return new AutoValue_StorageApiWritePayload(msg.toByteArray(), localDescriptorHash); } catch (SchemaTooNarrowException e) { if (attempt > schemaUpdateRetries) { diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java index f8619d5fd62..1751799dd51 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java @@ -87,6 +87,10 @@ public class StorageApiWriteUnshardedRecords<DestinationT, ElementT> }) .build(); + static void clearCache() { + APPEND_CLIENTS.invalidateAll(); + } + // Run a closure asynchronously, ignoring failures. private interface ThrowingRunnable { void run() throws Exception; diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java index 787cb0c2d62..c0f60f34c61 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java @@ -119,6 +119,10 @@ public class StorageApiWritesShardedRecords<DestinationT, ElementT> }) .build(); + static void clearCache() { + APPEND_CLIENTS.invalidateAll(); + } + // Run a closure asynchronously, ignoring failures. private interface ThrowingRunnable { void run() throws Exception; diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowJsonCoder.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowJsonCoder.java index 7e82eec212d..9b80c0b552b 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowJsonCoder.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowJsonCoder.java @@ -19,6 +19,8 @@ package org.apache.beam.sdk.io.gcp.bigquery; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.SerializationFeature; +import com.fasterxml.jackson.datatype.joda.JodaModule; +import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; import com.google.api.services.bigquery.model.TableRow; import java.io.IOException; import java.io.InputStream; @@ -68,7 +70,11 @@ public class TableRowJsonCoder extends AtomicCoder<TableRow> { // FAIL_ON_EMPTY_BEANS is disabled in order to handle null values in // TableRow. private static final ObjectMapper MAPPER = - new ObjectMapper().disable(SerializationFeature.FAIL_ON_EMPTY_BEANS); + new ObjectMapper() + .registerModule(new JavaTimeModule()) + .registerModule(new JodaModule()) + .disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS) + .disable(SerializationFeature.FAIL_ON_EMPTY_BEANS); private static final TableRowJsonCoder INSTANCE = new TableRowJsonCoder(); private static final TypeDescriptor<TableRow> TYPE_DESCRIPTOR = new TypeDescriptor<TableRow>() {}; diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java index 2f241a434a8..750542b7121 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java @@ -22,6 +22,7 @@ import static java.util.stream.Collectors.toList; import com.google.api.services.bigquery.model.TableFieldSchema; import com.google.api.services.bigquery.model.TableRow; import com.google.api.services.bigquery.model.TableSchema; +import com.google.cloud.bigquery.storage.v1.BigDecimalByteStringEncoder; import com.google.protobuf.ByteString; import com.google.protobuf.DescriptorProtos.DescriptorProto; import com.google.protobuf.DescriptorProtos.FieldDescriptorProto; @@ -34,19 +35,28 @@ import com.google.protobuf.Descriptors.FieldDescriptor; import com.google.protobuf.Descriptors.FileDescriptor; import com.google.protobuf.DynamicMessage; import com.google.protobuf.Message; +import java.math.BigDecimal; +import java.math.RoundingMode; +import java.time.Instant; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.LocalTime; +import java.time.format.DateTimeParseException; +import java.time.temporal.ChronoUnit; import java.util.AbstractMap; import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.UUID; -import java.util.function.Function; import javax.annotation.Nullable; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.BaseEncoding; +import org.joda.time.Days; /** * Utility methods for converting JSON {@link TableRow} objects to dynamic protocol message, for use @@ -71,6 +81,58 @@ public class TableRowToStorageApiProto { } } + static class SchemaInformation { + private final TableFieldSchema tableFieldSchema; + private final List<SchemaInformation> subFields; + private final Map<String, SchemaInformation> subFieldsByName; + + private SchemaInformation(TableFieldSchema tableFieldSchema) { + this.tableFieldSchema = tableFieldSchema; + this.subFields = Lists.newArrayList(); + this.subFieldsByName = Maps.newHashMap(); + if (tableFieldSchema.getFields() != null) { + for (TableFieldSchema field : tableFieldSchema.getFields()) { + SchemaInformation schemaInformation = new SchemaInformation(field); + subFields.add(schemaInformation); + subFieldsByName.put(field.getName(), schemaInformation); + } + } + } + + public String getName() { + return tableFieldSchema.getName(); + } + + public String getType() { + return tableFieldSchema.getType(); + } + + public SchemaInformation getSchemaForField(String name) { + SchemaInformation schemaInformation = subFieldsByName.get(name); + if (schemaInformation == null) { + throw new RuntimeException("Schema field not found: " + name); + } + return schemaInformation; + } + + public SchemaInformation getSchemaForField(int i) { + SchemaInformation schemaInformation = subFields.get(i); + if (schemaInformation == null) { + throw new RuntimeException("Schema field not found: " + i); + } + return schemaInformation; + } + + static SchemaInformation fromTableSchema(TableSchema tableSchema) { + TableFieldSchema rootSchema = + new TableFieldSchema() + .setName("__root__") + .setType("RECORD") + .setFields(tableSchema.getFields()); + return new SchemaInformation(rootSchema); + } + } + static final Map<String, Type> PRIMITIVE_TYPES = ImmutableMap.<String, Type>builder() .put("INT64", Type.TYPE_INT64) @@ -81,13 +143,13 @@ public class TableRowToStorageApiProto { .put("BOOL", Type.TYPE_BOOL) .put("BOOLEAN", Type.TYPE_BOOL) .put("BYTES", Type.TYPE_BYTES) - .put("NUMERIC", Type.TYPE_STRING) // Pass through the JSON encoding. - .put("BIGNUMERIC", Type.TYPE_STRING) // Pass through the JSON encoding. + .put("NUMERIC", Type.TYPE_BYTES) // Pass through the JSON encoding. + .put("BIGNUMERIC", Type.TYPE_BYTES) // Pass through the JSON encoding. .put("GEOGRAPHY", Type.TYPE_STRING) // Pass through the JSON encoding. - .put("DATE", Type.TYPE_STRING) // Pass through the JSON encoding. - .put("TIME", Type.TYPE_STRING) // Pass through the JSON encoding. - .put("DATETIME", Type.TYPE_STRING) // Pass through the JSON encoding. - .put("TIMESTAMP", Type.TYPE_STRING) // Pass through the JSON encoding. + .put("DATE", Type.TYPE_INT32) + .put("TIME", Type.TYPE_INT64) + .put("DATETIME", Type.TYPE_INT64) + .put("TIMESTAMP", Type.TYPE_INT64) .put("JSON", Type.TYPE_STRING) .build(); @@ -107,7 +169,10 @@ public class TableRowToStorageApiProto { } public static DynamicMessage messageFromMap( - Descriptor descriptor, AbstractMap<String, Object> map, boolean ignoreUnknownValues) + SchemaInformation schemaInformation, + Descriptor descriptor, + AbstractMap<String, Object> map, + boolean ignoreUnknownValues) throws SchemaConversionException { DynamicMessage.Builder builder = DynamicMessage.newBuilder(descriptor); for (Map.Entry<String, Object> entry : map.entrySet()) { @@ -121,9 +186,12 @@ public class TableRowToStorageApiProto { "TableRow contained unexpected field with name " + entry.getKey()); } } + SchemaInformation fieldSchemaInformation = + schemaInformation.getSchemaForField(entry.getKey()); @Nullable Object value = - messageValueFromFieldValue(fieldDescriptor, entry.getValue(), ignoreUnknownValues); + messageValueFromFieldValue( + fieldSchemaInformation, fieldDescriptor, entry.getValue(), ignoreUnknownValues); if (value != null) { builder.setField(fieldDescriptor, value); } @@ -136,7 +204,10 @@ public class TableRowToStorageApiProto { * using the BigQuery Storage API. */ public static DynamicMessage messageFromTableRow( - Descriptor descriptor, TableRow tableRow, boolean ignoreUnkownValues) + SchemaInformation schemaInformation, + Descriptor descriptor, + TableRow tableRow, + boolean ignoreUnkownValues) throws SchemaConversionException { @Nullable Object fValue = tableRow.get("f"); if (fValue instanceof List) { @@ -153,9 +224,11 @@ public class TableRowToStorageApiProto { for (int i = 0; i < cellsToProcess; ++i) { AbstractMap<String, Object> cell = cells.get(i); FieldDescriptor fieldDescriptor = descriptor.getFields().get(i); + SchemaInformation fieldSchemaInformation = schemaInformation.getSchemaForField(i); @Nullable Object value = - messageValueFromFieldValue(fieldDescriptor, cell.get("v"), ignoreUnkownValues); + messageValueFromFieldValue( + fieldSchemaInformation, fieldDescriptor, cell.get("v"), ignoreUnkownValues); if (value != null) { builder.setField(fieldDescriptor, value); } @@ -163,7 +236,7 @@ public class TableRowToStorageApiProto { return builder.build(); } else { - return messageFromMap(descriptor, tableRow, ignoreUnkownValues); + return messageFromMap(schemaInformation, descriptor, tableRow, ignoreUnkownValues); } } @@ -218,125 +291,191 @@ public class TableRowToStorageApiProto { } @Nullable + @SuppressWarnings({"nullness"}) private static Object messageValueFromFieldValue( - FieldDescriptor fieldDescriptor, @Nullable Object bqValue, boolean ignoreUnknownValues) + SchemaInformation schemaInformation, + FieldDescriptor fieldDescriptor, + @Nullable Object bqValue, + boolean ignoreUnknownValues) throws SchemaConversionException { if (bqValue == null) { if (fieldDescriptor.isOptional()) { return null; } else if (fieldDescriptor.isRepeated()) { return Collections.emptyList(); - } - { + } else { throw new IllegalArgumentException( "Received null value for non-nullable field " + fieldDescriptor.getName()); } } - return toProtoValue( - fieldDescriptor, bqValue, fieldDescriptor.isRepeated(), ignoreUnknownValues); - } - - private static final Map<FieldDescriptor.Type, Function<String, Object>> - JSON_PROTO_STRING_PARSERS = - ImmutableMap.<FieldDescriptor.Type, Function<String, Object>>builder() - .put(FieldDescriptor.Type.INT32, Integer::valueOf) - .put(FieldDescriptor.Type.INT64, Long::valueOf) - .put(FieldDescriptor.Type.FLOAT, Float::valueOf) - .put(FieldDescriptor.Type.DOUBLE, Double::valueOf) - .put(FieldDescriptor.Type.BOOL, Boolean::valueOf) - .put(FieldDescriptor.Type.STRING, str -> str) - .put( - FieldDescriptor.Type.BYTES, - b64 -> ByteString.copyFrom(BaseEncoding.base64().decode(b64))) - .build(); - @Nullable - @SuppressWarnings({"nullness"}) - @VisibleForTesting - static Object toProtoValue( - FieldDescriptor fieldDescriptor, - Object jsonBQValue, - boolean isRepeated, - boolean ignoreUnknownValues) - throws SchemaConversionException { - if (isRepeated) { - List<Object> listValue = (List<Object>) jsonBQValue; + if (fieldDescriptor.isRepeated()) { + List<Object> listValue = (List<Object>) bqValue; List<Object> protoList = Lists.newArrayListWithCapacity(listValue.size()); - for (Object o : listValue) { - protoList.add(toProtoValue(fieldDescriptor, o, false, ignoreUnknownValues)); + for (@Nullable Object o : listValue) { + if (o != null) { // repeated field cannot contain null. + protoList.add( + singularFieldToProtoValue( + schemaInformation, fieldDescriptor, o, ignoreUnknownValues)); + } } return protoList; } - - if (fieldDescriptor.getType() == FieldDescriptor.Type.MESSAGE) { - if (jsonBQValue instanceof TableRow) { - TableRow tableRow = (TableRow) jsonBQValue; - return messageFromTableRow(fieldDescriptor.getMessageType(), tableRow, ignoreUnknownValues); - } else if (jsonBQValue instanceof AbstractMap) { - // This will handle nested rows. - AbstractMap<String, Object> map = ((AbstractMap<String, Object>) jsonBQValue); - return messageFromMap(fieldDescriptor.getMessageType(), map, ignoreUnknownValues); - } else { - throw new RuntimeException("Unexpected value " + jsonBQValue + " Expected a JSON map."); - } - } - @Nullable Object scalarValue = scalarToProtoValue(fieldDescriptor, jsonBQValue); - if (scalarValue == null) { - return toProtoValue(fieldDescriptor, jsonBQValue.toString(), isRepeated, ignoreUnknownValues); - } else { - return scalarValue; - } + return singularFieldToProtoValue( + schemaInformation, fieldDescriptor, bqValue, ignoreUnknownValues); } @VisibleForTesting @Nullable - static Object scalarToProtoValue(FieldDescriptor fieldDescriptor, Object jsonBQValue) { - if (jsonBQValue instanceof String) { - Function<String, Object> mapper = JSON_PROTO_STRING_PARSERS.get(fieldDescriptor.getType()); - if (mapper == null) { - throw new UnsupportedOperationException( - "Converting BigQuery type '" - + jsonBQValue.getClass() - + "' to '" - + fieldDescriptor - + "' is not supported"); - } - return mapper.apply((String) jsonBQValue); - } - - switch (fieldDescriptor.getType()) { - case BOOL: - if (jsonBQValue instanceof Boolean) { - return jsonBQValue; + static Object singularFieldToProtoValue( + SchemaInformation schemaInformation, + FieldDescriptor fieldDescriptor, + Object value, + boolean ignoreUnknownValues) + throws SchemaConversionException { + switch (schemaInformation.getType()) { + case "INT64": + case "INTEGER": + if (value instanceof String) { + return Long.valueOf((String) value); + } else if (value instanceof Integer || value instanceof Long) { + return ((Number) value).longValue(); } break; - case BYTES: + case "FLOAT64": + case "FLOAT": + if (value instanceof String) { + return Double.valueOf((String) value); + } else if (value instanceof Double || value instanceof Float) { + return ((Number) value).doubleValue(); + } break; - case INT64: - if (jsonBQValue instanceof Integer) { - return Long.valueOf((Integer) jsonBQValue); - } else if (jsonBQValue instanceof Long) { - return jsonBQValue; + case "BOOLEAN": + case "BOOL": + if (value instanceof String) { + return Boolean.valueOf((String) value); + } else if (value instanceof Boolean) { + return value; } break; - case INT32: - if (jsonBQValue instanceof Integer) { - return jsonBQValue; + case "BYTES": + if (value instanceof String) { + return ByteString.copyFrom(BaseEncoding.base64().decode((String) value)); + } else if (value instanceof byte[]) { + return ByteString.copyFrom((byte[]) value); + } else if (value instanceof ByteString) { + return value; } break; - case STRING: + case "TIMESTAMP": + if (value instanceof String) { + try { + return ChronoUnit.MICROS.between(Instant.EPOCH, Instant.parse((String) value)); + } catch (DateTimeParseException e) { + return ChronoUnit.MICROS.between( + Instant.EPOCH, Instant.ofEpochMilli(Long.parseLong((String) value))); + } + } else if (value instanceof Instant) { + return ChronoUnit.MICROS.between(Instant.EPOCH, (Instant) value); + } else if (value instanceof org.joda.time.Instant) { + // joda instant precision is millisecond + return ((org.joda.time.Instant) value).getMillis() * 1000L; + } else if (value instanceof Integer || value instanceof Long) { + return ((Number) value).longValue(); + } else if (value instanceof Double || value instanceof Float) { + // assume value represents number of seconds since epoch + return BigDecimal.valueOf(((Number) value).doubleValue()) + .scaleByPowerOfTen(6) + .setScale(0, RoundingMode.HALF_UP) + .longValue(); + } break; - case DOUBLE: - if (jsonBQValue instanceof Double) { - return jsonBQValue; - } else if (jsonBQValue instanceof Float) { - return Double.valueOf((Float) jsonBQValue); + case "DATE": + if (value instanceof String) { + return ((Long) LocalDate.parse((String) value).toEpochDay()).intValue(); + } else if (value instanceof LocalDate) { + return ((Long) ((LocalDate) value).toEpochDay()).intValue(); + } else if (value instanceof org.joda.time.LocalDate) { + return Days.daysBetween( + org.joda.time.Instant.EPOCH.toDateTime().toLocalDate(), + (org.joda.time.LocalDate) value) + .getDays(); + } else if (value instanceof Integer || value instanceof Long) { + return ((Number) value).intValue(); + } + break; + case "NUMERIC": + if (value instanceof String) { + return BigDecimalByteStringEncoder.encodeToNumericByteString( + new BigDecimal((String) value)); + } else if (value instanceof BigDecimal) { + return BigDecimalByteStringEncoder.encodeToNumericByteString(((BigDecimal) value)); + } else if (value instanceof Double || value instanceof Float) { + return BigDecimalByteStringEncoder.encodeToNumericByteString( + BigDecimal.valueOf(((Number) value).doubleValue())); + } + break; + case "BIGNUMERIC": + if (value instanceof String) { + return BigDecimalByteStringEncoder.encodeToBigNumericByteString( + new BigDecimal((String) value)); + } else if (value instanceof BigDecimal) { + return BigDecimalByteStringEncoder.encodeToBigNumericByteString(((BigDecimal) value)); + } else if (value instanceof Double || value instanceof Float) { + return BigDecimalByteStringEncoder.encodeToBigNumericByteString( + BigDecimal.valueOf(((Number) value).doubleValue())); + } + break; + case "DATETIME": + if (value instanceof String) { + return CivilTimeEncoder.encodePacked64DatetimeMicros(LocalDateTime.parse((String) value)); + } else if (value instanceof Number) { + return ((Number) value).longValue(); + } else if (value instanceof LocalDateTime) { + return CivilTimeEncoder.encodePacked64DatetimeMicros((LocalDateTime) value); + } else if (value instanceof org.joda.time.LocalDateTime) { + return CivilTimeEncoder.encodePacked64DatetimeMicros((org.joda.time.LocalDateTime) value); + } + break; + case "TIME": + if (value instanceof String) { + return CivilTimeEncoder.encodePacked64TimeMicros(LocalTime.parse((String) value)); + } else if (value instanceof Number) { + return ((Number) value).longValue(); + } else if (value instanceof LocalTime) { + return CivilTimeEncoder.encodePacked64TimeMicros((LocalTime) value); + } else if (value instanceof org.joda.time.LocalTime) { + return CivilTimeEncoder.encodePacked64TimeMicros((org.joda.time.LocalTime) value); + } + break; + case "STRING": + case "JSON": + case "GEOGRAPHY": + return value.toString(); + case "STRUCT": + case "RECORD": + if (value instanceof TableRow) { + TableRow tableRow = (TableRow) value; + return messageFromTableRow( + schemaInformation, fieldDescriptor.getMessageType(), tableRow, ignoreUnknownValues); + } else if (value instanceof AbstractMap) { + // This will handle nested rows. + AbstractMap<String, Object> map = ((AbstractMap<String, Object>) value); + return messageFromMap( + schemaInformation, fieldDescriptor.getMessageType(), map, ignoreUnknownValues); } break; - default: - throw new RuntimeException("Unsupported proto type " + fieldDescriptor.getType()); } - return null; + + throw new RuntimeException( + "Unexpected value :" + + value + + ", type: " + + value.getClass() + + ". Table field name: " + + schemaInformation.getName() + + ", type: " + + schemaInformation.getType()); } @VisibleForTesting diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java index 35dc57218f1..50f1bde09b9 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java @@ -523,14 +523,6 @@ public class BigQueryIOWriteTest implements Serializable { testTimePartitioning(method); } - @Test - public void testTimePartitioningStorageApi() throws Exception { - if (!useStorageApi) { - return; - } - testTimePartitioning(Method.STORAGE_WRITE_API); - } - @Test public void testClusteringStorageApi() throws Exception { if (useStorageApi) { diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProtoIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProtoIT.java new file mode 100644 index 00000000000..34962a90a57 --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProtoIT.java @@ -0,0 +1,407 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.gcp.bigquery; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; + +import com.google.api.services.bigquery.model.Table; +import com.google.api.services.bigquery.model.TableFieldSchema; +import com.google.api.services.bigquery.model.TableReference; +import com.google.api.services.bigquery.model.TableRow; +import com.google.api.services.bigquery.model.TableSchema; +import java.io.IOException; +import java.math.BigDecimal; +import java.nio.charset.StandardCharsets; +import java.time.Instant; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.LocalTime; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.extensions.gcp.options.GcpOptions; +import org.apache.beam.sdk.io.gcp.testing.BigqueryClient; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.SerializableFunctions; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.BaseEncoding; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@RunWith(JUnit4.class) +@SuppressWarnings({ + "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402) +}) +/** Unit tests for {@link TableRowToStorageApiProto}. */ +public class TableRowToStorageApiProtoIT { + + private static final Logger LOG = LoggerFactory.getLogger(TableRowToStorageApiProtoIT.class); + private static final BigqueryClient BQ_CLIENT = new BigqueryClient("TableRowToStorageApiProtoIT"); + private static final String PROJECT = + TestPipeline.testingPipelineOptions().as(GcpOptions.class).getProject(); + private static final String BIG_QUERY_DATASET_ID = + "table_row_to_storage_api_proto_" + System.nanoTime(); + + private static final TableSchema BASE_TABLE_SCHEMA = + new TableSchema() + .setFields( + ImmutableList.<TableFieldSchema>builder() + .add(new TableFieldSchema().setType("STRING").setName("stringValue")) + .add(new TableFieldSchema().setType("BYTES").setName("bytesValue")) + .add(new TableFieldSchema().setType("INT64").setName("int64Value")) + .add(new TableFieldSchema().setType("INTEGER").setName("intValue")) + .add(new TableFieldSchema().setType("FLOAT64").setName("float64Value")) + .add(new TableFieldSchema().setType("FLOAT").setName("floatValue")) + .add(new TableFieldSchema().setType("BOOL").setName("boolValue")) + .add(new TableFieldSchema().setType("BOOLEAN").setName("booleanValue")) + .add(new TableFieldSchema().setType("TIMESTAMP").setName("timestampValue")) + .add(new TableFieldSchema().setType("TIME").setName("timeValue")) + .add(new TableFieldSchema().setType("DATETIME").setName("datetimeValue")) + .add(new TableFieldSchema().setType("DATE").setName("dateValue")) + .add(new TableFieldSchema().setType("NUMERIC").setName("numericValue")) + .add(new TableFieldSchema().setType("BIGNUMERIC").setName("bigNumericValue")) + .add( + new TableFieldSchema() + .setType("BYTES") + .setMode("REPEATED") + .setName("arrayValue")) + .build()); + + private static final List<Object> REPEATED_BYTES = + ImmutableList.of( + BaseEncoding.base64().encode("hello".getBytes(StandardCharsets.UTF_8)), + "goodbye".getBytes(StandardCharsets.UTF_8), + "solong".getBytes(StandardCharsets.UTF_8)); + + private static final TableRow BASE_TABLE_ROW = + new TableRow() + .set("stringValue", "string") + .set( + "bytesValue", BaseEncoding.base64().encode("string".getBytes(StandardCharsets.UTF_8))) + .set("int64Value", "42") + .set("intValue", "43") + .set("float64Value", "2.8168") + .set("floatValue", "2.817") + .set("boolValue", "true") + .set("booleanValue", "true") + .set("timestampValue", "1970-01-01T00:00:00.000043Z") + .set("timeValue", "00:52:07.123456") + .set("datetimeValue", "2019-08-16T00:52:07.123456") + .set("dateValue", "2019-08-16") + .set("numericValue", "23.4") + .set("bigNumericValue", "23334.4") + .set("arrayValue", REPEATED_BYTES); + + private static final TableRow BASE_TABLE_ROW_JODA_TIME = + new TableRow() + .set("stringValue", "string") + .set("bytesValue", "string".getBytes(StandardCharsets.UTF_8)) + .set("int64Value", 42) + .set("intValue", 43) + .set("float64Value", 2.8168f) + .set("floatValue", 2.817f) + .set("boolValue", true) + .set("booleanValue", true) + .set("timestampValue", org.joda.time.Instant.parse("1970-01-01T00:00:00.0043Z")) + .set("timeValue", org.joda.time.LocalTime.parse("00:52:07.123456")) + .set("datetimeValue", org.joda.time.LocalDateTime.parse("2019-08-16T00:52:07.123456")) + .set("dateValue", org.joda.time.LocalDate.parse("2019-08-16")) + .set("numericValue", new BigDecimal("23.4")) + .set("bigNumericValue", "23334.4") + .set("arrayValue", REPEATED_BYTES); + + private static final TableRow BASE_TABLE_ROW_JAVA_TIME = + new TableRow() + .set("stringValue", "string") + .set("bytesValue", "string".getBytes(StandardCharsets.UTF_8)) + .set("int64Value", 42) + .set("intValue", 43) + .set("float64Value", 2.8168f) + .set("floatValue", 2.817f) + .set("boolValue", true) + .set("booleanValue", true) + .set("timestampValue", Instant.parse("1970-01-01T00:00:00.000043Z")) + .set("timeValue", LocalTime.parse("00:52:07.123456")) + .set("datetimeValue", LocalDateTime.parse("2019-08-16T00:52:07.123456")) + .set("dateValue", LocalDate.parse("2019-08-16")) + .set("numericValue", new BigDecimal("23.4")) + .set("bigNumericValue", "23334.4") + .set("arrayValue", REPEATED_BYTES); + + private static final TableRow BASE_TABLE_ROW_NUM_TIME = + new TableRow() + .set("stringValue", "string") + .set("bytesValue", "string".getBytes(StandardCharsets.UTF_8)) + .set("int64Value", 42) + .set("intValue", 43) + .set("float64Value", 2.8168f) + .set("floatValue", 2.817f) + .set("boolValue", true) + .set("booleanValue", true) + .set("timestampValue", 43) + .set("timeValue", 3497124416L) + .set("datetimeValue", 142111881387172416L) + .set("dateValue", 18124) + .set("numericValue", new BigDecimal("23.4")) + .set("bigNumericValue", "23334.4") + .set("arrayValue", REPEATED_BYTES); + + private static final TableRow BASE_TABLE_ROW_FLOATS = + new TableRow() + .set("stringValue", "string") + .set("bytesValue", "string".getBytes(StandardCharsets.UTF_8)) + .set("int64Value", 42) + .set("intValue", 43) + .set("float64Value", 2.8168f) + .set("floatValue", 2.817f) + .set("boolValue", true) + .set("booleanValue", true) + .set("timestampValue", 43) + .set("timeValue", 3497124416L) + .set("datetimeValue", 142111881387172416D) + .set("dateValue", 18124) + .set("numericValue", 23.4) + .set("bigNumericValue", "23334.4") + .set("arrayValue", REPEATED_BYTES); + + private static final TableRow BASE_TABLE_ROW_NULL = + new TableRow() + // .set("stringValue", null) // do not set stringValue, this should work + .set("bytesValue", null) + .set("int64Value", null) + .set("intValue", null) + .set("float64Value", null) + .set("floatValue", null) + .set("boolValue", null) + .set("booleanValue", null) + .set("timestampValue", null) + .set("timeValue", null) + .set("datetimeValue", null) + .set("dateValue", null) + .set("numericValue", null) + .set("arrayValue", null); + + private static final List<Object> REPEATED_BYTES_EXPECTED = + ImmutableList.of( + BaseEncoding.base64().encode("hello".getBytes(StandardCharsets.UTF_8)), + BaseEncoding.base64().encode("goodbye".getBytes(StandardCharsets.UTF_8)), + BaseEncoding.base64().encode("solong".getBytes(StandardCharsets.UTF_8))); + + private static final TableRow BASE_TABLE_ROW_EXPECTED = + new TableRow() + .set("stringValue", "string") + .set( + "bytesValue", BaseEncoding.base64().encode("string".getBytes(StandardCharsets.UTF_8))) + .set("int64Value", "42") + .set("intValue", "43") + .set("float64Value", 2.8168) + .set("floatValue", 2.817) + .set("boolValue", true) + .set("booleanValue", true) + .set("timestampValue", "4.3E-5") + .set("timeValue", "00:52:07.123456") + .set("datetimeValue", "2019-08-16T00:52:07.123456") + .set("dateValue", "2019-08-16") + .set("numericValue", "23.4") + .set("bigNumericValue", "23334.4") + .set("arrayValue", REPEATED_BYTES_EXPECTED); + + // joda is up to millisecond precision, expect truncation + private static final TableRow BASE_TABLE_ROW_JODA_EXPECTED = + new TableRow() + .set("stringValue", "string") + .set( + "bytesValue", BaseEncoding.base64().encode("string".getBytes(StandardCharsets.UTF_8))) + .set("int64Value", "42") + .set("intValue", "43") + .set("float64Value", 2.8168) + .set("floatValue", 2.817) + .set("boolValue", true) + .set("booleanValue", true) + .set("timestampValue", "0.004") + .set("timeValue", "00:52:07.123000") + .set("datetimeValue", "2019-08-16T00:52:07.123000") + .set("dateValue", "2019-08-16") + .set("numericValue", "23.4") + .set("bigNumericValue", "23334.4") + .set("arrayValue", REPEATED_BYTES_EXPECTED); + + private static final TableRow BASE_TABLE_ROW_NUM_EXPECTED = + new TableRow() + .set("stringValue", "string") + .set( + "bytesValue", BaseEncoding.base64().encode("string".getBytes(StandardCharsets.UTF_8))) + .set("int64Value", "42") + .set("intValue", "43") + .set("float64Value", 2.8168) + .set("floatValue", 2.817) + .set("boolValue", true) + .set("booleanValue", true) + .set("timestampValue", "4.3E-5") + .set("timeValue", "00:52:07.123456") + .set("datetimeValue", "2019-08-16T00:52:07.123456") + .set("dateValue", "2019-08-16") + .set("numericValue", "23.4") + .set("bigNumericValue", "23334.4") + .set("arrayValue", REPEATED_BYTES_EXPECTED); + + private static final TableRow BASE_TABLE_ROW_FLOATS_EXPECTED = + new TableRow() + .set("stringValue", "string") + .set( + "bytesValue", BaseEncoding.base64().encode("string".getBytes(StandardCharsets.UTF_8))) + .set("int64Value", "42") + .set("intValue", "43") + .set("float64Value", 2.8168) + .set("floatValue", 2.817) + .set("boolValue", true) + .set("booleanValue", true) + .set("timestampValue", "4.3E-5") + .set("timeValue", "00:52:07.123456") + .set("datetimeValue", "2019-08-16T00:52:07.123456") + .set("dateValue", "2019-08-16") + .set("numericValue", "23.4") + .set("bigNumericValue", "23334.4") + .set("arrayValue", REPEATED_BYTES_EXPECTED); + + // only nonnull values are returned, null in arrayValue should be converted to empty list + private static final TableRow BASE_TABLE_ROW_NULL_EXPECTED = + new TableRow().set("arrayValue", ImmutableList.of()); + + private static final TableSchema NESTED_TABLE_SCHEMA = + new TableSchema() + .setFields( + ImmutableList.<TableFieldSchema>builder() + .add( + new TableFieldSchema() + .setType("STRUCT") + .setName("nestedValue1") + .setMode("REQUIRED") + .setFields(BASE_TABLE_SCHEMA.getFields())) + .add( + new TableFieldSchema() + .setType("RECORD") + .setName("nestedValue2") + .setMode("REPEATED") + .setFields(BASE_TABLE_SCHEMA.getFields())) + .add( + new TableFieldSchema() + .setType("RECORD") + .setName("nestedValue3") + .setMode("NULLABLE") + .setFields(BASE_TABLE_SCHEMA.getFields())) + .build()); + + @BeforeClass + public static void setUpTestEnvironment() throws IOException, InterruptedException { + // Create one BQ dataset for all test cases. + BQ_CLIENT.createNewDataset(PROJECT, BIG_QUERY_DATASET_ID); + } + + @AfterClass + public static void cleanup() { + LOG.info("Start to clean up tables and datasets."); + BQ_CLIENT.deleteDataset(PROJECT, BIG_QUERY_DATASET_ID); + } + + @Test + public void testBaseTableRow() throws IOException, InterruptedException { + String tableSpec = createTable(BASE_TABLE_SCHEMA); + + runPipeline(tableSpec, Collections.singleton(BASE_TABLE_ROW)); + + List<TableRow> actualTableRows = + BQ_CLIENT.queryUnflattened(String.format("SELECT * FROM [%s]", tableSpec), PROJECT, true); + + assertEquals(1, actualTableRows.size()); + assertEquals(BASE_TABLE_ROW_EXPECTED, actualTableRows.get(0)); + } + + @Test + public void testNestedRichTypesAndNull() throws IOException, InterruptedException { + String tableSpec = createTable(NESTED_TABLE_SCHEMA); + TableRow tableRow = + new TableRow() + .set("nestedValue1", BASE_TABLE_ROW) + .set( + "nestedValue2", + Arrays.asList( + BASE_TABLE_ROW_JAVA_TIME, + BASE_TABLE_ROW_JODA_TIME, + BASE_TABLE_ROW_NUM_TIME, + BASE_TABLE_ROW_FLOATS, + BASE_TABLE_ROW_NULL)) + .set("nestedValue3", null); + + runPipeline(tableSpec, Collections.singleton(tableRow)); + + List<TableRow> actualTableRows = + BQ_CLIENT.queryUnflattened(String.format("SELECT * FROM [%s]", tableSpec), PROJECT, true); + + assertEquals(1, actualTableRows.size()); + assertEquals(BASE_TABLE_ROW_EXPECTED, actualTableRows.get(0).get("nestedValue1")); + LOG.info("ACTUAL " + actualTableRows.get(0).get("nestedValue2")); + assertEquals( + ImmutableList.of( + BASE_TABLE_ROW_EXPECTED, + BASE_TABLE_ROW_JODA_EXPECTED, + BASE_TABLE_ROW_NUM_EXPECTED, + BASE_TABLE_ROW_FLOATS_EXPECTED, + BASE_TABLE_ROW_NULL_EXPECTED), + actualTableRows.get(0).get("nestedValue2")); + assertNull(actualTableRows.get(0).get("nestedValue3")); + } + + private static String createTable(TableSchema tableSchema) + throws IOException, InterruptedException { + String table = "table" + System.nanoTime(); + BQ_CLIENT.deleteTable(PROJECT, BIG_QUERY_DATASET_ID, table); + BQ_CLIENT.createNewTable( + PROJECT, + BIG_QUERY_DATASET_ID, + new Table() + .setSchema(tableSchema) + .setTableReference( + new TableReference() + .setTableId(table) + .setDatasetId(BIG_QUERY_DATASET_ID) + .setProjectId(PROJECT))); + return PROJECT + ":" + BIG_QUERY_DATASET_ID + "." + table; + } + + private static void runPipeline(String tableSpec, Iterable<TableRow> tableRows) { + Pipeline p = Pipeline.create(); + p.apply("Create test cases", Create.of(tableRows)) + .apply( + "Write using Storage Write API", + BigQueryIO.<TableRow>write() + .to(tableSpec) + .withFormatFunction(SerializableFunctions.identity()) + .withMethod(BigQueryIO.Write.Method.STORAGE_WRITE_API) + .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)); + p.run().waitUntilFinish(); + } +} diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProtoTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProtoTest.java index 483cddc08c5..459fed3a168 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProtoTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProtoTest.java @@ -24,6 +24,7 @@ import com.google.api.services.bigquery.model.TableCell; import com.google.api.services.bigquery.model.TableFieldSchema; import com.google.api.services.bigquery.model.TableRow; import com.google.api.services.bigquery.model.TableSchema; +import com.google.cloud.bigquery.storage.v1.BigDecimalByteStringEncoder; import com.google.protobuf.ByteString; import com.google.protobuf.DescriptorProtos.DescriptorProto; import com.google.protobuf.DescriptorProtos.FieldDescriptorProto; @@ -32,7 +33,9 @@ import com.google.protobuf.DescriptorProtos.FieldDescriptorProto.Type; import com.google.protobuf.Descriptors.Descriptor; import com.google.protobuf.Descriptors.FieldDescriptor; import com.google.protobuf.DynamicMessage; +import java.math.BigDecimal; import java.nio.charset.StandardCharsets; +import java.time.LocalDate; import java.util.List; import java.util.Map; import java.util.stream.Collectors; @@ -73,9 +76,10 @@ public class TableRowToStorageApiProtoTest { .add(new TableFieldSchema().setType("DATETIME").setName("datetimeValue")) .add(new TableFieldSchema().setType("DATE").setName("dateValue")) .add(new TableFieldSchema().setType("NUMERIC").setName("numericValue")) + .add(new TableFieldSchema().setType("BIGNUMERIC").setName("bigNumericValue")) .add( new TableFieldSchema() - .setType("STRING") + .setType("BYTES") .setMode("REPEATED") .setName("arrayValue")) .build()); @@ -97,9 +101,10 @@ public class TableRowToStorageApiProtoTest { .add(new TableFieldSchema().setType("DATETIME").setName("datetimeValue")) .add(new TableFieldSchema().setType("DATE").setName("dateValue")) .add(new TableFieldSchema().setType("NUMERIC").setName("numericValue")) + .add(new TableFieldSchema().setType("BIGNUMERIC").setName("bigNumericValue")) .add( new TableFieldSchema() - .setType("STRING") + .setType("BYTES") .setMode("REPEATED") .setName("arrayValue")) .build()); @@ -173,42 +178,49 @@ public class TableRowToStorageApiProtoTest { FieldDescriptorProto.newBuilder() .setName("timestampvalue") .setNumber(10) - .setType(Type.TYPE_STRING) + .setType(Type.TYPE_INT64) .setLabel(Label.LABEL_OPTIONAL) .build()) .addField( FieldDescriptorProto.newBuilder() .setName("timevalue") .setNumber(11) - .setType(Type.TYPE_STRING) + .setType(Type.TYPE_INT64) .setLabel(Label.LABEL_OPTIONAL) .build()) .addField( FieldDescriptorProto.newBuilder() .setName("datetimevalue") .setNumber(12) - .setType(Type.TYPE_STRING) + .setType(Type.TYPE_INT64) .setLabel(Label.LABEL_OPTIONAL) .build()) .addField( FieldDescriptorProto.newBuilder() .setName("datevalue") .setNumber(13) - .setType(Type.TYPE_STRING) + .setType(Type.TYPE_INT32) .setLabel(Label.LABEL_OPTIONAL) .build()) .addField( FieldDescriptorProto.newBuilder() .setName("numericvalue") .setNumber(14) - .setType(Type.TYPE_STRING) + .setType(Type.TYPE_BYTES) .setLabel(Label.LABEL_OPTIONAL) .build()) .addField( FieldDescriptorProto.newBuilder() - .setName("arrayvalue") + .setName("bignumericvalue") .setNumber(15) - .setType(Type.TYPE_STRING) + .setType(Type.TYPE_BYTES) + .setLabel(Label.LABEL_OPTIONAL) + .build()) + .addField( + FieldDescriptorProto.newBuilder() + .setName("arrayvalue") + .setNumber(16) + .setType(Type.TYPE_BYTES) .setLabel(Label.LABEL_REPEATED) .build()) .build(); @@ -275,42 +287,49 @@ public class TableRowToStorageApiProtoTest { FieldDescriptorProto.newBuilder() .setName("timestampvalue") .setNumber(9) - .setType(Type.TYPE_STRING) + .setType(Type.TYPE_INT64) .setLabel(Label.LABEL_OPTIONAL) .build()) .addField( FieldDescriptorProto.newBuilder() .setName("timevalue") .setNumber(10) - .setType(Type.TYPE_STRING) + .setType(Type.TYPE_INT64) .setLabel(Label.LABEL_OPTIONAL) .build()) .addField( FieldDescriptorProto.newBuilder() .setName("datetimevalue") .setNumber(11) - .setType(Type.TYPE_STRING) + .setType(Type.TYPE_INT64) .setLabel(Label.LABEL_OPTIONAL) .build()) .addField( FieldDescriptorProto.newBuilder() .setName("datevalue") .setNumber(2) - .setType(Type.TYPE_STRING) + .setType(Type.TYPE_INT32) .setLabel(Label.LABEL_OPTIONAL) .build()) .addField( FieldDescriptorProto.newBuilder() .setName("numericvalue") .setNumber(13) - .setType(Type.TYPE_STRING) + .setType(Type.TYPE_BYTES) .setLabel(Label.LABEL_OPTIONAL) .build()) .addField( FieldDescriptorProto.newBuilder() - .setName("arrayvalue") + .setName("bignumericvalue") .setNumber(14) - .setType(Type.TYPE_STRING) + .setType(Type.TYPE_BYTES) + .setLabel(Label.LABEL_OPTIONAL) + .build()) + .addField( + FieldDescriptorProto.newBuilder() + .setName("arrayvalue") + .setNumber(15) + .setType(Type.TYPE_BYTES) .setLabel(Label.LABEL_REPEATED) .build()) .build(); @@ -414,6 +433,18 @@ public class TableRowToStorageApiProtoTest { assertEquals(expectedBaseTypesNoF, nestedTypesNoF2); } + private static final List<Object> REPEATED_BYTES = + ImmutableList.of( + BaseEncoding.base64().encode("hello".getBytes(StandardCharsets.UTF_8)), + "goodbye".getBytes(StandardCharsets.UTF_8), + ByteString.copyFrom("solong".getBytes(StandardCharsets.UTF_8))); + + private static final List<Object> EXPECTED_PROTO_REPEATED_BYTES = + ImmutableList.of( + ByteString.copyFrom("hello".getBytes(StandardCharsets.UTF_8)), + ByteString.copyFrom("goodbye".getBytes(StandardCharsets.UTF_8)), + ByteString.copyFrom("solong".getBytes(StandardCharsets.UTF_8))); + private static final TableRow BASE_TABLE_ROW = new TableRow() .setF( @@ -429,12 +460,13 @@ public class TableRowToStorageApiProtoTest { new TableCell().setV("2.817"), new TableCell().setV("true"), new TableCell().setV("true"), - new TableCell().setV("43"), - new TableCell().setV("00:52:07[.123]|[.123456] UTC"), - new TableCell().setV("2019-08-16 00:52:07[.123]|[.123456] UTC"), + new TableCell().setV("1970-01-01T00:00:00.000043Z"), + new TableCell().setV("00:52:07.123456"), + new TableCell().setV("2019-08-16T00:52:07.123456"), new TableCell().setV("2019-08-16"), new TableCell().setV("23.4"), - new TableCell().setV(ImmutableList.of("hello", "goodbye")))); + new TableCell().setV("2312345.4"), + new TableCell().setV(REPEATED_BYTES))); private static final TableRow BASE_TABLE_ROW_NO_F = new TableRow() @@ -447,12 +479,13 @@ public class TableRowToStorageApiProtoTest { .set("floatValue", "2.817") .set("boolValue", "true") .set("booleanValue", "true") - .set("timestampValue", "43") - .set("timeValue", "00:52:07[.123]|[.123456] UTC") - .set("datetimeValue", "2019-08-16 00:52:07[.123]|[.123456] UTC") + .set("timestampValue", "1970-01-01T00:00:00.000043Z") + .set("timeValue", "00:52:07.123456") + .set("datetimeValue", "2019-08-16T00:52:07.123456") .set("dateValue", "2019-08-16") .set("numericValue", "23.4") - .set("arrayValue", ImmutableList.of("hello", "goodbye")); + .set("bigNumericValue", "2312345.4") + .set("arrayValue", REPEATED_BYTES); private static final Map<String, Object> BASE_ROW_EXPECTED_PROTO_VALUES = ImmutableMap.<String, Object>builder() @@ -465,12 +498,17 @@ public class TableRowToStorageApiProtoTest { .put("floatvalue", (double) 2.817) .put("boolvalue", true) .put("booleanvalue", true) - .put("timestampvalue", "43") - .put("timevalue", "00:52:07[.123]|[.123456] UTC") - .put("datetimevalue", "2019-08-16 00:52:07[.123]|[.123456] UTC") - .put("datevalue", "2019-08-16") - .put("numericvalue", "23.4") - .put("arrayvalue", ImmutableList.of("hello", "goodbye")) + .put("timestampvalue", 43L) + .put("timevalue", 3497124416L) + .put("datetimevalue", 142111881387172416L) + .put("datevalue", (int) LocalDate.of(2019, 8, 16).toEpochDay()) + .put( + "numericvalue", + BigDecimalByteStringEncoder.encodeToNumericByteString(new BigDecimal("23.4"))) + .put( + "bignumericvalue", + BigDecimalByteStringEncoder.encodeToBigNumericByteString(new BigDecimal("2312345.4"))) + .put("arrayvalue", EXPECTED_PROTO_REPEATED_BYTES) .build(); private static final Map<String, Object> BASE_ROW_NO_F_EXPECTED_PROTO_VALUES = @@ -483,12 +521,17 @@ public class TableRowToStorageApiProtoTest { .put("floatvalue", (double) 2.817) .put("boolvalue", true) .put("booleanvalue", true) - .put("timestampvalue", "43") - .put("timevalue", "00:52:07[.123]|[.123456] UTC") - .put("datetimevalue", "2019-08-16 00:52:07[.123]|[.123456] UTC") - .put("datevalue", "2019-08-16") - .put("numericvalue", "23.4") - .put("arrayvalue", ImmutableList.of("hello", "goodbye")) + .put("timestampvalue", 43L) + .put("timevalue", 3497124416L) + .put("datetimevalue", 142111881387172416L) + .put("datevalue", (int) LocalDate.parse("2019-08-16").toEpochDay()) + .put( + "numericvalue", + BigDecimalByteStringEncoder.encodeToNumericByteString(new BigDecimal("23.4"))) + .put( + "bignumericvalue", + BigDecimalByteStringEncoder.encodeToBigNumericByteString(new BigDecimal("2312345.4"))) + .put("arrayvalue", EXPECTED_PROTO_REPEATED_BYTES) .build(); private void assertBaseRecord(DynamicMessage msg, boolean withF) { @@ -506,12 +549,16 @@ public class TableRowToStorageApiProtoTest { new TableRow() .set("nestedValue1", BASE_TABLE_ROW) .set("nestedValue2", BASE_TABLE_ROW) - .set("nestedvalueNoF1", BASE_TABLE_ROW_NO_F) - .set("nestedvalueNoF2", BASE_TABLE_ROW_NO_F); + .set("nestedValueNoF1", BASE_TABLE_ROW_NO_F) + .set("nestedValueNoF2", BASE_TABLE_ROW_NO_F); Descriptor descriptor = TableRowToStorageApiProto.getDescriptorFromTableSchema(NESTED_TABLE_SCHEMA); - DynamicMessage msg = TableRowToStorageApiProto.messageFromTableRow(descriptor, tableRow, false); + TableRowToStorageApiProto.SchemaInformation schemaInformation = + TableRowToStorageApiProto.SchemaInformation.fromTableSchema(NESTED_TABLE_SCHEMA); + DynamicMessage msg = + TableRowToStorageApiProto.messageFromTableRow( + schemaInformation, descriptor, tableRow, false); assertEquals(4, msg.getAllFields().size()); Map<String, FieldDescriptor> fieldDescriptors = @@ -527,8 +574,11 @@ public class TableRowToStorageApiProtoTest { public void testMessageWithFFromTableRow() throws Exception { Descriptor descriptor = TableRowToStorageApiProto.getDescriptorFromTableSchema(BASE_TABLE_SCHEMA); + TableRowToStorageApiProto.SchemaInformation schemaInformation = + TableRowToStorageApiProto.SchemaInformation.fromTableSchema(BASE_TABLE_SCHEMA); DynamicMessage msg = - TableRowToStorageApiProto.messageFromTableRow(descriptor, BASE_TABLE_ROW, false); + TableRowToStorageApiProto.messageFromTableRow( + schemaInformation, descriptor, BASE_TABLE_ROW, false); assertBaseRecord(msg, true); } @@ -567,8 +617,11 @@ public class TableRowToStorageApiProtoTest { .set("repeatednof2", ImmutableList.of(BASE_TABLE_ROW_NO_F, BASE_TABLE_ROW_NO_F)); Descriptor descriptor = TableRowToStorageApiProto.getDescriptorFromTableSchema(REPEATED_MESSAGE_SCHEMA); + TableRowToStorageApiProto.SchemaInformation schemaInformation = + TableRowToStorageApiProto.SchemaInformation.fromTableSchema(REPEATED_MESSAGE_SCHEMA); DynamicMessage msg = - TableRowToStorageApiProto.messageFromTableRow(descriptor, repeatedRow, false); + TableRowToStorageApiProto.messageFromTableRow( + schemaInformation, descriptor, repeatedRow, false); assertEquals(4, msg.getAllFields().size()); Map<String, FieldDescriptor> fieldDescriptors = @@ -609,8 +662,11 @@ public class TableRowToStorageApiProtoTest { .set("repeatednof2", null); Descriptor descriptor = TableRowToStorageApiProto.getDescriptorFromTableSchema(REPEATED_MESSAGE_SCHEMA); + TableRowToStorageApiProto.SchemaInformation schemaInformation = + TableRowToStorageApiProto.SchemaInformation.fromTableSchema(REPEATED_MESSAGE_SCHEMA); DynamicMessage msg = - TableRowToStorageApiProto.messageFromTableRow(descriptor, repeatedRow, false); + TableRowToStorageApiProto.messageFromTableRow( + schemaInformation, descriptor, repeatedRow, false); Map<String, FieldDescriptor> fieldDescriptors = descriptor.getFields().stream() diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/StructUtilsTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/StructUtilsTest.java index 577a470cdcd..d58ad0b6b53 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/StructUtilsTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/StructUtilsTest.java @@ -69,7 +69,8 @@ public class StructUtilsTest { Struct struct = Struct.newBuilder().set("f_int64").to("string_value").build(); Exception exception = assertThrows(ClassCastException.class, () -> StructUtils.structToBeamRow(struct, schema)); - checkMessage("java.lang.String cannot be cast to java.lang.Long", exception.getMessage()); + checkMessage("java.lang.String cannot be cast to", exception.getMessage()); + checkMessage("java.lang.Long", exception.getMessage()); } @Test