[jira] [Work logged] (BEAM-6392) Add support for new BigQuery streaming read API to BigQueryIO
[ https://issues.apache.org/jira/browse/BEAM-6392?focusedWorklogId=198439=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-198439 ]

ASF GitHub Bot logged work on BEAM-6392:
Author: ASF GitHub Bot
Created on: 14/Feb/19 03:04
Start Date: 14/Feb/19 03:04
Worklog Time Spent: 10m
Work Description: chamikaramj commented on pull request #7441: [BEAM-6392] Add support for the BigQuery read API to BigQueryIO.
URL: https://github.com/apache/beam/pull/7441

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org

Issue Time Tracking
---
Worklog Id: (was: 198439)
Time Spent: 4h 10m (was: 4h)

> Add support for new BigQuery streaming read API to BigQueryIO
> -
>
> Key: BEAM-6392
> URL: https://issues.apache.org/jira/browse/BEAM-6392
> Project: Beam
> Issue Type: New Feature
> Components: io-java-gcp
> Reporter: Kenneth Jung
> Assignee: Kenneth Jung
> Priority: Major
> Labels: triaged
> Time Spent: 4h 10m
> Remaining Estimate: 0h
>
> BigQuery has developed a new streaming egress API which will soon reach
> public availability. Add support for the new API in BigQueryIO.

--
This message was sent by Atlassian JIRA (v7.6.3#76005)

[jira] [Work logged] (BEAM-6392) Add support for new BigQuery streaming read API to BigQueryIO
[ https://issues.apache.org/jira/browse/BEAM-6392?focusedWorklogId=198344=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-198344 ]

ASF GitHub Bot logged work on BEAM-6392:
Author: ASF GitHub Bot
Created on: 13/Feb/19 21:55
Start Date: 13/Feb/19 21:55
Worklog Time Spent: 10m
Work Description: chamikaramj commented on issue #7441: [BEAM-6392] Add support for the BigQuery read API to BigQueryIO.
URL: https://github.com/apache/beam/pull/7441#issuecomment-463390681

Run Java PostCommit

Issue Time Tracking
---
Worklog Id: (was: 198344)
Time Spent: 4h (was: 3h 50m)

[jira] [Work logged] (BEAM-6392) Add support for new BigQuery streaming read API to BigQueryIO
[ https://issues.apache.org/jira/browse/BEAM-6392?focusedWorklogId=198267=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-198267 ]

ASF GitHub Bot logged work on BEAM-6392:
Author: ASF GitHub Bot
Created on: 13/Feb/19 18:38
Start Date: 13/Feb/19 18:38
Worklog Time Spent: 10m
Work Description: kmjung commented on issue #7441: [BEAM-6392] Add support for the BigQuery read API to BigQueryIO.
URL: https://github.com/apache/beam/pull/7441#issuecomment-463315433

@chamikaramj we should be good to go here.

Issue Time Tracking
---
Worklog Id: (was: 198267)
Time Spent: 3h 50m (was: 3h 40m)

[jira] [Work logged] (BEAM-6392) Add support for new BigQuery streaming read API to BigQueryIO
[ https://issues.apache.org/jira/browse/BEAM-6392?focusedWorklogId=197857=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-197857 ]

ASF GitHub Bot logged work on BEAM-6392:
Author: ASF GitHub Bot
Created on: 12/Feb/19 23:34
Start Date: 12/Feb/19 23:34
Worklog Time Spent: 10m
Work Description: chamikaramj commented on issue #7441: [BEAM-6392] Add support for the BigQuery read API to BigQueryIO.
URL: https://github.com/apache/beam/pull/7441#issuecomment-462986360

LGTM. Will merge after tests pass. Thanks.

Issue Time Tracking
---
Worklog Id: (was: 197857)
Time Spent: 3h 40m (was: 3.5h)

[jira] [Work logged] (BEAM-6392) Add support for new BigQuery streaming read API to BigQueryIO
[ https://issues.apache.org/jira/browse/BEAM-6392?focusedWorklogId=197821=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-197821 ]

ASF GitHub Bot logged work on BEAM-6392:
Author: ASF GitHub Bot
Created on: 12/Feb/19 22:01
Start Date: 12/Feb/19 22:01
Worklog Time Spent: 10m
Work Description: kmjung commented on issue #7441: [BEAM-6392] Add support for the BigQuery read API to BigQueryIO.
URL: https://github.com/apache/beam/pull/7441#issuecomment-462953979

Run Java PreCommit

Issue Time Tracking
---
Worklog Id: (was: 197821)
Time Spent: 3.5h (was: 3h 20m)

[jira] [Work logged] (BEAM-6392) Add support for new BigQuery streaming read API to BigQueryIO
[ https://issues.apache.org/jira/browse/BEAM-6392?focusedWorklogId=197287=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-197287 ]

ASF GitHub Bot logged work on BEAM-6392:
Author: ASF GitHub Bot
Created on: 12/Feb/19 00:20
Start Date: 12/Feb/19 00:20
Worklog Time Spent: 10m
Work Description: kmjung commented on issue #7441: [BEAM-6392] Add support for the BigQuery read API to BigQueryIO.
URL: https://github.com/apache/beam/pull/7441#issuecomment-462551469

@chamikaramj the change to update the GCP connector versions (#7783) has been merged, and I've resolved the conflict here.

Issue Time Tracking
---
Worklog Id: (was: 197287)
Time Spent: 3h 20m (was: 3h 10m)

[jira] [Work logged] (BEAM-6392) Add support for new BigQuery streaming read API to BigQueryIO
[ https://issues.apache.org/jira/browse/BEAM-6392?focusedWorklogId=195461=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-195461 ]

ASF GitHub Bot logged work on BEAM-6392:
Author: ASF GitHub Bot
Created on: 07/Feb/19 01:15
Start Date: 07/Feb/19 01:15
Worklog Time Spent: 10m
Work Description: chamikaramj commented on issue #7441: [BEAM-6392] Add support for the BigQuery read API to BigQueryIO.
URL: https://github.com/apache/beam/pull/7441#issuecomment-461253273

Please try to upgrade $generated_grpc_beta_version to 0.39.0 and use it for google_cloud_bigquery_storage_proto instead of using a separate version there. Also, it seems like there's a conflict now. LGTM for the rest.

Issue Time Tracking
---
Worklog Id: (was: 195461)
Time Spent: 3h 10m (was: 3h)

[jira] [Work logged] (BEAM-6392) Add support for new BigQuery streaming read API to BigQueryIO
[ https://issues.apache.org/jira/browse/BEAM-6392?focusedWorklogId=195460=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-195460 ]

ASF GitHub Bot logged work on BEAM-6392:
Author: ASF GitHub Bot
Created on: 07/Feb/19 01:14
Start Date: 07/Feb/19 01:14
Worklog Time Spent: 10m
Work Description: chamikaramj commented on issue #7441: [BEAM-6392] Add support for the BigQuery read API to BigQueryIO.
URL: https://github.com/apache/beam/pull/7441#issuecomment-461253273

Please try to upgrade $generated_grpc_beta_version to 0.39.0 and use it here instead of a separate version. Also, it seems like there's a conflict now. LGTM for the rest.

Issue Time Tracking
---
Worklog Id: (was: 195460)
Time Spent: 3h (was: 2h 50m)

[jira] [Work logged] (BEAM-6392) Add support for new BigQuery streaming read API to BigQueryIO
[ https://issues.apache.org/jira/browse/BEAM-6392?focusedWorklogId=195445=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-195445 ]

ASF GitHub Bot logged work on BEAM-6392:
Author: ASF GitHub Bot
Created on: 07/Feb/19 00:18
Start Date: 07/Feb/19 00:18
Worklog Time Spent: 10m
Work Description: chamikaramj commented on pull request #7441: [BEAM-6392] Add support for the BigQuery read API to BigQueryIO.
URL: https://github.com/apache/beam/pull/7441#discussion_r254505832

## File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java
##
@@ -177,27 +250,47 @@ private static Object convertRequiredField(
     // Per https://cloud.google.com/bigquery/docs/reference/v2/tables#schema, the type field
     // is required, so it may not be null.
     String bqType = fieldSchema.getType();
-    Type expectedAvroType = BIG_QUERY_TO_AVRO_TYPES.get(bqType);
-    verifyNotNull(expectedAvroType, "Unsupported BigQuery type: %s", bqType);
+    ImmutableCollection<Type> expectedAvroTypes = BIG_QUERY_TO_AVRO_TYPES.get(bqType);
+    verifyNotNull(expectedAvroTypes, "Unsupported BigQuery type: %s", bqType);
     verify(
-        avroType == expectedAvroType,
-        "Expected Avro schema type %s, not %s, for BigQuery %s field %s",
-        expectedAvroType,
-        avroType,
+        expectedAvroTypes.contains(avroType),
+        "Expected Avro schema types %s for BigQuery %s field %s, but received %s",
+        expectedAvroTypes,
         bqType,
-        fieldSchema.getName());
+        fieldSchema.getName(),
+        avroType);
     // For historical reasons, don't validate avroLogicalType except for with NUMERIC.
     // BigQuery represents NUMERIC in Avro format as BYTES with a DECIMAL logical type.
-    switch (fieldSchema.getType()) {
+    switch (bqType) {
       case "STRING":
-      case "DATE":
       case "DATETIME":
-      case "TIME":
       case "GEOGRAPHY":
         // Avro will use a CharSequence to represent String objects, but it may not always use
         // java.lang.String; for example, it may prefer org.apache.avro.util.Utf8.
         verify(v instanceof CharSequence, "Expected CharSequence (String), got %s", v.getClass());
         return v.toString();
+      case "DATE":
+        if (avroType == Type.INT) {
+          verify(v instanceof Integer, "Expected Integer, got %s", v.getClass());

Review comment:
For the record (based on offline chat), this change just generalizes the current behavior and does not break backwards compatibility.

Issue Time Tracking
---
Worklog Id: (was: 195445)
Time Spent: 2h 50m (was: 2h 40m)

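The generalized check in this hunk replaces "exactly one expected Avro type per BigQuery type" with "one of a collection of accepted Avro types", which is why inputs that passed before still pass. A minimal, dependency-free sketch of that idea follows. It is an illustration under stated assumptions, not the PR's code: `AvroPrimitive` is a hypothetical stand-in for Avro's `Schema.Type`, a `HashMap` of `EnumSet`s stands in for Guava's `ImmutableMultimap`, `IllegalStateException` stands in for Guava's `VerifyException`, and the table lists only mappings inferred from the branches in the quoted diff.

```java
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

/** Hypothetical stand-in for org.apache.avro.Schema.Type, so the sketch runs without Avro. */
enum AvroPrimitive { STRING, INT, LONG, BYTES }

/** Sketch of a "one BigQuery type, several accepted Avro types" check. */
final class AcceptedAvroTypes {
  // Plain JDK map of sets in place of Guava's ImmutableMultimap.
  private static final Map<String, Set<AvroPrimitive>> BQ_TO_AVRO = new HashMap<>();

  static {
    BQ_TO_AVRO.put("STRING", EnumSet.of(AvroPrimitive.STRING));
    // DATE may arrive as a string or as an INT carrying the Avro "date" logical type.
    BQ_TO_AVRO.put("DATE", EnumSet.of(AvroPrimitive.STRING, AvroPrimitive.INT));
    // TIME may arrive as a string or as a LONG carrying the Avro "time-micros" logical type.
    BQ_TO_AVRO.put("TIME", EnumSet.of(AvroPrimitive.STRING, AvroPrimitive.LONG));
    // NUMERIC is represented as BYTES with a DECIMAL logical type.
    BQ_TO_AVRO.put("NUMERIC", EnumSet.of(AvroPrimitive.BYTES));
  }

  private AcceptedAvroTypes() {}

  /** Throws if bqType is unknown or avroType is not one of its accepted representations. */
  static void verifyAvroType(String bqType, String fieldName, AvroPrimitive avroType) {
    Set<AvroPrimitive> expected = BQ_TO_AVRO.get(bqType);
    if (expected == null) {
      throw new IllegalStateException("Unsupported BigQuery type: " + bqType);
    }
    if (!expected.contains(avroType)) {
      throw new IllegalStateException(
          String.format(
              "Expected Avro schema types %s for BigQuery %s field %s, but received %s",
              expected, bqType, fieldName, avroType));
    }
  }
}
```

Keeping STRING in the DATE and TIME sets is what lets the previously accepted string form keep passing while the INT and LONG forms are also admitted.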
[jira] [Work logged] (BEAM-6392) Add support for new BigQuery streaming read API to BigQueryIO
[ https://issues.apache.org/jira/browse/BEAM-6392?focusedWorklogId=195413=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-195413 ]

ASF GitHub Bot logged work on BEAM-6392:
Author: ASF GitHub Bot
Created on: 06/Feb/19 22:54
Start Date: 06/Feb/19 22:54
Worklog Time Spent: 10m
Work Description: chamikaramj commented on pull request #7441: [BEAM-6392] Add support for the BigQuery read API to BigQueryIO.
URL: https://github.com/apache/beam/pull/7441#discussion_r254485914

## File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java
##
@@ -177,27 +250,47 @@ private static Object convertRequiredField(
...
         verify(v instanceof CharSequence, "Expected CharSequence (String), got %s", v.getClass());
         return v.toString();
+      case "DATE":
+        if (avroType == Type.INT) {
+          verify(v instanceof Integer, "Expected Integer, got %s", v.getClass());

Review comment:
So is this change (and the ones below) backwards incompatible for the export-based read path? That'll break our existing users.

Issue Time Tracking
---
Worklog Id: (was: 195413)
Time Spent: 2h 40m (was: 2.5h)

[jira] [Work logged] (BEAM-6392) Add support for new BigQuery streaming read API to BigQueryIO
[ https://issues.apache.org/jira/browse/BEAM-6392?focusedWorklogId=194855=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-194855 ]

ASF GitHub Bot logged work on BEAM-6392:
Author: ASF GitHub Bot
Created on: 05/Feb/19 22:53
Start Date: 05/Feb/19 22:53
Worklog Time Spent: 10m
Work Description: kmjung commented on pull request #7441: [BEAM-6392] Add support for the BigQuery read API to BigQueryIO.
URL: https://github.com/apache/beam/pull/7441#discussion_r254079874

## File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageStreamSource.java
##
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.fromJsonString;
+import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.toJsonString;
+import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.api.services.bigquery.model.TableSchema;
+import com.google.cloud.bigquery.storage.v1beta1.Storage.ReadRowsRequest;
+import com.google.cloud.bigquery.storage.v1beta1.Storage.ReadRowsResponse;
+import com.google.cloud.bigquery.storage.v1beta1.Storage.ReadSession;
+import com.google.cloud.bigquery.storage.v1beta1.Storage.Stream;
+import com.google.cloud.bigquery.storage.v1beta1.Storage.StreamPosition;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.BinaryDecoder;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.OffsetBasedSource;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.StorageClient;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableList;
+
+/** A {@link org.apache.beam.sdk.io.Source} representing a single stream in a read session. */
+@Experimental(Experimental.Kind.SOURCE_SINK)
+public class BigQueryStorageStreamSource<T> extends OffsetBasedSource<T> {
+
+  public static <T> BigQueryStorageStreamSource<T> create(
+      ReadSession readSession,
+      Stream stream,
+      TableSchema tableSchema,
+      SerializableFunction<SchemaAndRecord, T> parseFn,
+      Coder<T> outputCoder,
+      BigQueryServices bqServices) {
+    return new BigQueryStorageStreamSource<>(
+        readSession,
+        stream,
+        0L,
+        Long.MAX_VALUE,
+        1L,
+        toJsonString(checkNotNull(tableSchema, "tableSchema")),
+        parseFn,
+        outputCoder,
+        bqServices);
+  }
+
+  private final ReadSession readSession;
+  private final Stream stream;
+  private final String jsonTableSchema;
+  private final SerializableFunction<SchemaAndRecord, T> parseFn;
+  private final Coder<T> outputCoder;
+  private final BigQueryServices bqServices;
+
+  private BigQueryStorageStreamSource(
+      ReadSession readSession,
+      Stream stream,
+      long startOffset,
+      long stopOffset,
+      long minBundleSize,
+      String jsonTableSchema,
+      SerializableFunction<SchemaAndRecord, T> parseFn,
+      Coder<T> outputCoder,
+      BigQueryServices bqServices) {
+    super(startOffset, stopOffset, minBundleSize);
+    this.readSession = checkNotNull(readSession, "readSession");
+    this.stream = checkNotNull(stream, "stream");
+    this.jsonTableSchema = checkNotNull(jsonTableSchema, "jsonTableSchema");
+    this.parseFn = checkNotNull(parseFn, "parseFn");
+    this.outputCoder = checkNotNull(outputCoder, "outputCoder");
+    this.bqServices = checkNotNull(bqServices, "bqServices");
+  }
+
+  @Override
+  public Coder<T> getOutputCoder() {
+    return outputCoder;
+  }
+

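The `create` factory in this file seeds each stream source with the offset range [0, Long.MAX_VALUE) and a minimum bundle size of 1, which reads as "start at the first row; the end is not known up front." Assuming that interpretation, the sketch below shows a reader that walks such an open-ended range in batches, stops at end of stream or at the stop offset, and exposes the next unread offset so a retry could resume there (the storage API's `StreamPosition` message carries a stream offset for this kind of positioning). `RowFetcher` and `OffsetTrackingReader` are hypothetical names; they are not the `StorageClient` interface used in the PR, and rows are plain strings to keep the sketch self-contained.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical source of rows in one stream, addressed by row offset. */
interface RowFetcher {
  /** Returns up to maxRows rows starting at offset; an empty list signals end of stream. */
  List<String> fetch(long offset, int maxRows);
}

/** Reads [startOffset, stopOffset) in batches; stopOffset may be Long.MAX_VALUE (unknown end). */
final class OffsetTrackingReader {
  private final RowFetcher fetcher;
  private final long stopOffset;
  private long nextOffset;

  OffsetTrackingReader(RowFetcher fetcher, long startOffset, long stopOffset) {
    this.fetcher = fetcher;
    this.nextOffset = startOffset;
    this.stopOffset = stopOffset;
  }

  /** Reads until the fetcher reports end of stream or stopOffset is reached. */
  List<String> readAll(int batchSize) {
    List<String> out = new ArrayList<>();
    while (nextOffset < stopOffset) {
      // Never request past stopOffset; for an open-ended range this is just batchSize.
      int n = (int) Math.min(batchSize, stopOffset - nextOffset);
      List<String> batch = fetcher.fetch(nextOffset, n);
      if (batch.isEmpty()) {
        break; // the stream ended before stopOffset
      }
      out.addAll(batch);
      nextOffset += batch.size(); // a retry could resume from this offset
    }
    return out;
  }

  /** Offset of the next unread row. */
  long getNextOffset() {
    return nextOffset;
  }
}
```

With an unknown end, termination comes from the stream itself (an empty batch) rather than from the range, which is the practical consequence of seeding the source with Long.MAX_VALUE.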
[jira] [Work logged] (BEAM-6392) Add support for new BigQuery streaming read API to BigQueryIO
[ https://issues.apache.org/jira/browse/BEAM-6392?focusedWorklogId=194835=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-194835 ]

ASF GitHub Bot logged work on BEAM-6392:
Author: ASF GitHub Bot
Created on: 05/Feb/19 22:08
Start Date: 05/Feb/19 22:08
Worklog Time Spent: 10m
Work Description: kmjung commented on pull request #7441: [BEAM-6392] Add support for the BigQuery read API to BigQueryIO.
URL: https://github.com/apache/beam/pull/7441#discussion_r254066703

## File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java
##
@@ -177,27 +250,47 @@ private static Object convertRequiredField(
...
         verify(v instanceof CharSequence, "Expected CharSequence (String), got %s", v.getClass());
         return v.toString();
+      case "DATE":
+        if (avroType == Type.INT) {
+          verify(v instanceof Integer, "Expected Integer, got %s", v.getClass());

Review comment:
No -- this code path (an Avro logical 'date' type for a BigQuery DATE record) can be reached in both the export and read API cases.

Issue Time Tracking
---
Worklog Id: (was: 194835)
Time Spent: 2h (was: 1h 50m)

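The reply in this message turns on the Avro `date` logical type, which stores a DATE as an INT counting days since 1970-01-01. The diff converts that value with a `formatDate` helper whose body is not shown, so the sketch below is one hypothetical way to write such a helper with `java.time`, plus a dispatcher that normalizes either representation (INT or string) to the same `YYYY-MM-DD` text. `BigQueryDateSketch` and its methods are illustrative names, and unlike the diff, which branches on the Avro schema type, this sketch branches on the runtime class of the value.

```java
import java.time.LocalDate;

/** Hypothetical helpers for the two DATE representations handled in the diff. */
final class BigQueryDateSketch {
  private BigQueryDateSketch() {}

  /** Formats an Avro date value (days since 1970-01-01) as YYYY-MM-DD. */
  static String formatDate(int epochDays) {
    // LocalDate.toString() prints ISO-8601 yyyy-MM-dd for four-digit years,
    // which covers BigQuery's DATE range (0001-01-01 to 9999-12-31).
    return LocalDate.ofEpochDay(epochDays).toString();
  }

  /** Normalizes either an INT (date logical type) or a string DATE value to text. */
  static String convertDate(Object v) {
    if (v instanceof Integer) {
      return formatDate((Integer) v);
    }
    if (v instanceof CharSequence) {
      // Avro may hand back org.apache.avro.util.Utf8 rather than java.lang.String.
      return v.toString();
    }
    throw new IllegalArgumentException("Expected Integer or CharSequence, got " + v.getClass());
  }
}
```

For example, `convertDate(17897)` and `convertDate("2019-01-01")` both yield `2019-01-01`, which is the property that lets downstream code ignore which read path produced the record.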
[jira] [Work logged] (BEAM-6392) Add support for new BigQuery streaming read API to BigQueryIO
[ https://issues.apache.org/jira/browse/BEAM-6392?focusedWorklogId=194836=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-194836 ]

ASF GitHub Bot logged work on BEAM-6392:
Author: ASF GitHub Bot
Created on: 05/Feb/19 22:09
Start Date: 05/Feb/19 22:09
Worklog Time Spent: 10m
Work Description: kmjung commented on pull request #7441: [BEAM-6392] Add support for the BigQuery read API to BigQueryIO.
URL: https://github.com/apache/beam/pull/7441#discussion_r254066814

## File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java
##
@@ -177,27 +250,47 @@ private static Object convertRequiredField(
...
         verify(v instanceof CharSequence, "Expected CharSequence (String), got %s", v.getClass());
         return v.toString();
+      case "DATE":
+        if (avroType == Type.INT) {
+          verify(v instanceof Integer, "Expected Integer, got %s", v.getClass());
+          verifyNotNull(avroLogicalType, "Expected Date logical type");
+          verify(avroLogicalType instanceof LogicalTypes.Date, "Expected Date logical type");
+          return formatDate((Integer) v);
+        } else {
+          verify(v instanceof CharSequence, "Expected CharSequence (String), got %s", v.getClass());
+          return v.toString();
+        }
+      case "TIME":
+        if (avroType == Type.LONG) {
+          verify(v instanceof Long, "Expected Long, got %s", v.getClass());
+          verifyNotNull(avroLogicalType, "Expected TimeMicros logical type");

Review comment:
No -- this code path (an Avro logical 'time-micros' type for a BigQuery TIME record) can be reached in both the export and read API cases.

Issue Time Tracking
---
Worklog Id: (was: 194836)
Time Spent: 2h 10m (was: 2h)

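The TIME branch follows the same pattern with the Avro `time-micros` logical type, a LONG counting microseconds after midnight. The conversion helper for that branch is cut off in the quoted diff, so the sketch below is a hypothetical `java.time`-based version. It assumes a fixed `HH:mm:ss.SSSSSS` output with six fractional digits; the helper in the actual PR may trim or format the fraction differently.

```java
import java.time.LocalTime;
import java.time.format.DateTimeFormatter;

/** Hypothetical helpers for the two TIME representations handled in the diff. */
final class BigQueryTimeSketch {
  // Assumed output format: always six fractional (microsecond) digits.
  private static final DateTimeFormatter TIME_MICROS =
      DateTimeFormatter.ofPattern("HH:mm:ss.SSSSSS");

  private BigQueryTimeSketch() {}

  /** Formats an Avro time-micros value (microseconds after midnight). */
  static String formatTimeMicros(long micros) {
    // multiplyExact throws ArithmeticException on overflow; ofNanoOfDay throws
    // DateTimeException for values outside a single day.
    return LocalTime.ofNanoOfDay(Math.multiplyExact(micros, 1_000L)).format(TIME_MICROS);
  }

  /** Normalizes either a LONG (time-micros) or a string TIME value to text. */
  static String convertTime(Object v) {
    if (v instanceof Long) {
      return formatTimeMicros((Long) v);
    }
    if (v instanceof CharSequence) {
      return v.toString();
    }
    throw new IllegalArgumentException("Expected Long or CharSequence, got " + v.getClass());
  }
}
```

For example, `formatTimeMicros(45296789012L)` yields `12:34:56.789012`.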
[jira] [Work logged] (BEAM-6392) Add support for new BigQuery streaming read API to BigQueryIO
[ https://issues.apache.org/jira/browse/BEAM-6392?focusedWorklogId=194834=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-194834 ] ASF GitHub Bot logged work on BEAM-6392: Author: ASF GitHub Bot Created on: 05/Feb/19 22:08 Start Date: 05/Feb/19 22:08 Worklog Time Spent: 10m Work Description: kmjung commented on pull request #7441: [BEAM-6392] Add support for the BigQuery read API to BigQueryIO. URL: https://github.com/apache/beam/pull/7441#discussion_r254066465 ## File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java ## @@ -177,27 +250,47 @@ private static Object convertRequiredField( // Per https://cloud.google.com/bigquery/docs/reference/v2/tables#schema, the type field // is required, so it may not be null. String bqType = fieldSchema.getType(); -Type expectedAvroType = BIG_QUERY_TO_AVRO_TYPES.get(bqType); -verifyNotNull(expectedAvroType, "Unsupported BigQuery type: %s", bqType); +ImmutableCollection expectedAvroTypes = BIG_QUERY_TO_AVRO_TYPES.get(bqType); +verifyNotNull(expectedAvroTypes, "Unsupported BigQuery type: %s", bqType); verify( -avroType == expectedAvroType, -"Expected Avro schema type %s, not %s, for BigQuery %s field %s", -expectedAvroType, -avroType, +expectedAvroTypes.contains(avroType), +"Expected Avro schema types %s for BigQuery %s field %s, but received %s", +expectedAvroTypes, bqType, -fieldSchema.getName()); +fieldSchema.getName(), +avroType); // For historical reasons, don't validate avroLogicalType except for with NUMERIC. // BigQuery represents NUMERIC in Avro format as BYTES with a DECIMAL logical type. -switch (fieldSchema.getType()) { +switch (bqType) { case "STRING": - case "DATE": case "DATETIME": - case "TIME": case "GEOGRAPHY": // Avro will use a CharSequence to represent String objects, but it may not always use // java.lang.String; for example, it may prefer org.apache.avro.util.Utf8. 
verify(v instanceof CharSequence, "Expected CharSequence (String), got %s", v.getClass()); return v.toString(); + case "DATE": Review comment: Depending on the caller's project context, export may or may not produce Avro records identical to the read API today. Going forward, we will standardize on the format used by the read API. Issue Time Tracking --- Worklog Id: (was: 194834) Time Spent: 1h 50m (was: 1h 40m) > Add support for new BigQuery streaming read API to BigQueryIO > - > > Key: BEAM-6392 > URL: https://issues.apache.org/jira/browse/BEAM-6392 > Project: Beam > Issue Type: New Feature > Components: io-java-gcp >Reporter: Kenneth Jung >Assignee: Kenneth Jung >Priority: Major > Time Spent: 1h 50m > Remaining Estimate: 0h > > BigQuery has developed a new streaming egress API which will soon reach > public availability. Add support for the new API in BigQueryIO.
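kmjung's reply notes that export and the read API do not always emit DATE values the same way. The DATE branch added in this PR accepts either a string (export) or an Avro INT with the date logical type, counted in days since the epoch (read API, via formatDate). The sketch below normalizes either form to the export-style `yyyy-MM-dd` string. It assumes that java.time's proleptic ISO calendar matches BigQuery's. The class and method names are hypothetical, and this is not the PR's convertRequiredField.

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;

/** Hypothetical helper, not the PR's code: normalizes a BigQuery DATE value to "yyyy-MM-dd". */
final class DateNormalizer {
  private DateNormalizer() {}

  static String normalizeDate(Object v) {
    if (v instanceof Integer) {
      // Read API path (as in the diff's formatDate): Avro INT with the "date" logical type,
      // counted in days since 1970-01-01 on the proleptic Gregorian calendar.
      return LocalDate.ofEpochDay((Integer) v).format(DateTimeFormatter.ISO_LOCAL_DATE);
    }
    if (v instanceof CharSequence) {
      // Export path: already a string, possibly an org.apache.avro.util.Utf8 rather than String.
      return v.toString();
    }
    throw new IllegalArgumentException(
        "Unexpected DATE representation: " + (v == null ? "null" : v.getClass().getName()));
  }
}
```

For example, `normalizeDate(17932)` and `normalizeDate("2019-02-05")` both yield `2019-02-05`. Under this scheme both read paths hand callers the same string.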
[jira] [Work logged] (BEAM-6392) Add support for new BigQuery streaming read API to BigQueryIO
[ https://issues.apache.org/jira/browse/BEAM-6392?focusedWorklogId=194821=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-194821 ] ASF GitHub Bot logged work on BEAM-6392: Author: ASF GitHub Bot Created on: 05/Feb/19 21:42 Start Date: 05/Feb/19 21:42 Worklog Time Spent: 10m Work Description: chamikaramj commented on pull request #7441: [BEAM-6392] Add support for the BigQuery read API to BigQueryIO. URL: https://github.com/apache/beam/pull/7441#discussion_r254052066 ## File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java ## @@ -528,6 +531,21 @@ public Read withTemplateCompatibility() { /** Implementation of {@link BigQueryIO#read(SerializableFunction)}. */ @AutoValue public abstract static class TypedRead extends PTransform> { +/** Determines the method used to read data from BigQuery. */ +@Experimental(Experimental.Kind.SOURCE_SINK) +public enum Method { + /** The default behavior if no method is explicitly set. Currently {@link #EXPORT}. */ + DEFAULT, + + /** + * Export data to Google Cloud Storage in Avro format and read data files from that location. + */ + EXPORT, + + /** Read the contents of a table directly using the BigQuery storage API. */ Review comment: Storage API or read API? Can you also add a link, since this is pretty new?
Issue Time Tracking --- Worklog Id: (was: 194821) Time Spent: 1h 20m (was: 1h 10m) > Add support for new BigQuery streaming read API to BigQueryIO > - > > Key: BEAM-6392 > URL: https://issues.apache.org/jira/browse/BEAM-6392 > Project: Beam > Issue Type: New Feature > Components: io-java-gcp >Reporter: Kenneth Jung >Assignee: Kenneth Jung >Priority: Major > Time Spent: 1h 20m > Remaining Estimate: 0h > > BigQuery has developed a new streaming egress API which will soon reach > public availability. Add support for the new API in BigQueryIO.
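The TypedRead.Method enum in this diff has a DEFAULT constant that currently means EXPORT. The sketch below shows that pattern with a resolve() helper, so that code downstream of the transform only ever sees a concrete method. Several names are assumptions: the enum name ReadMethod, the resolve() method, and the third constant's name DIRECT_READ (the diff is cut off before that constant is declared).

```java
/**
 * Hypothetical sketch of a read-method enum whose DEFAULT constant is resolved to a concrete
 * method. The name DIRECT_READ is assumed; the diff is cut off before that constant.
 */
enum ReadMethod {
  /** No explicit choice; currently resolves to EXPORT. */
  DEFAULT,
  /** Export the table to Cloud Storage as Avro files, then read those files. */
  EXPORT,
  /** Read the table directly through the BigQuery Storage (read) API. */
  DIRECT_READ;

  /** Maps DEFAULT to the method actually used, so later code never has to branch on DEFAULT. */
  ReadMethod resolve() {
    return this == DEFAULT ? EXPORT : this;
  }
}
```

One plausible reason to keep DEFAULT distinct from EXPORT: the default could later change for users who never chose a method, while users who explicitly selected EXPORT keep that behavior.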
[jira] [Work logged] (BEAM-6392) Add support for new BigQuery streaming read API to BigQueryIO
[ https://issues.apache.org/jira/browse/BEAM-6392?focusedWorklogId=194829=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-194829 ] ASF GitHub Bot logged work on BEAM-6392: Author: ASF GitHub Bot Created on: 05/Feb/19 21:48 Start Date: 05/Feb/19 21:48 Worklog Time Spent: 10m Work Description: kmjung commented on pull request #7441: [BEAM-6392] Add support for the BigQuery read API to BigQueryIO. URL: https://github.com/apache/beam/pull/7441#discussion_r254060105 ## File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java ## @@ -100,6 +116,62 @@ static String formatTimestamp(String timestamp) { return String.format("%s.%s UTC", dayAndTime, fractionalSeconds); } + /** + * This method formats a BigQuery DATE value into a String matching the format used by JSON + * export. Date records are stored in "days since epoch" format, and BigQuery uses the proleptic + * Gregorian calendar. + */ + private static String formatDate(int date) { +return LocalDate.ofEpochDay(date).format(java.time.format.DateTimeFormatter.ISO_LOCAL_DATE); + } + + private static final java.time.format.DateTimeFormatter ISO_LOCAL_TIME_FORMATTER_MICROS = + new DateTimeFormatterBuilder() + .appendValue(HOUR_OF_DAY, 2) + .appendLiteral(':') + .appendValue(MINUTE_OF_HOUR, 2) + .appendLiteral(':') + .appendValue(SECOND_OF_MINUTE, 2) + .appendLiteral('.') + .appendFraction(NANO_OF_SECOND, 6, 6, false) + .toFormatter(); + + private static final java.time.format.DateTimeFormatter ISO_LOCAL_TIME_FORMATTER_MILLIS = + new DateTimeFormatterBuilder() + .appendValue(HOUR_OF_DAY, 2) + .appendLiteral(':') + .appendValue(MINUTE_OF_HOUR, 2) + .appendLiteral(':') + .appendValue(SECOND_OF_MINUTE, 2) + .appendLiteral('.') + .appendFraction(NANO_OF_SECOND, 3, 3, false) + .toFormatter(); + + private static final java.time.format.DateTimeFormatter ISO_LOCAL_TIME_FORMATTER_SECONDS = + new DateTimeFormatterBuilder() + .appendValue(HOUR_OF_DAY, 
2) + .appendLiteral(':') + .appendValue(MINUTE_OF_HOUR, 2) + .appendLiteral(':') + .appendValue(SECOND_OF_MINUTE, 2) + .toFormatter(); + + /** + * This method formats a BigQuery TIME value into a String matching the format used by JSON + * export. Time records are stored in "microseconds since midnight" format. + */ + private static String formatTime(long timeMicros) { Review comment: This is necessary in order to match exactly the string format used by BigQuery export, which always uses the least-precise format if a value can be converted without loss of precision. It's likely that the approach you propose would not lead to correctness issues for pipelines; the integration test I've added wouldn't pass, though. :-) Issue Time Tracking --- Worklog Id: (was: 194829) Time Spent: 1h 40m (was: 1.5h) > Add support for new BigQuery streaming read API to BigQueryIO > - > > Key: BEAM-6392 > URL: https://issues.apache.org/jira/browse/BEAM-6392 > Project: Beam > Issue Type: New Feature > Components: io-java-gcp >Reporter: Kenneth Jung >Assignee: Kenneth Jung >Priority: Major > Time Spent: 1h 40m > Remaining Estimate: 0h > > BigQuery has developed a new streaming egress API which will soon reach > public availability. Add support for the new API in BigQueryIO.
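kmjung's reply describes the rule formatTime has to reproduce. Export prints a TIME value with the least precise of three formats (seconds, milliseconds, microseconds) that still represents the value without loss. The sketch below implements that rule and builds the same three DateTimeFormatterBuilder patterns as the diff. The class and helper names are hypothetical, and the body is an illustration of the described rule rather than the PR's formatTime.

```java
import static java.time.temporal.ChronoField.HOUR_OF_DAY;
import static java.time.temporal.ChronoField.MINUTE_OF_HOUR;
import static java.time.temporal.ChronoField.NANO_OF_SECOND;
import static java.time.temporal.ChronoField.SECOND_OF_MINUTE;

import java.time.LocalTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeFormatterBuilder;

/** Hypothetical sketch: formats microseconds since midnight the way the reply describes export. */
final class TimeFormatting {
  private TimeFormatting() {}

  /** HH:mm:ss, followed by exactly {@code digits} fractional digits when digits > 0. */
  private static DateTimeFormatter timeFormatter(int digits) {
    DateTimeFormatterBuilder builder =
        new DateTimeFormatterBuilder()
            .appendValue(HOUR_OF_DAY, 2)
            .appendLiteral(':')
            .appendValue(MINUTE_OF_HOUR, 2)
            .appendLiteral(':')
            .appendValue(SECOND_OF_MINUTE, 2);
    if (digits > 0) {
      builder.appendLiteral('.').appendFraction(NANO_OF_SECOND, digits, digits, false);
    }
    return builder.toFormatter();
  }

  private static final DateTimeFormatter SECONDS = timeFormatter(0);
  private static final DateTimeFormatter MILLIS = timeFormatter(3);
  private static final DateTimeFormatter MICROS = timeFormatter(6);

  /** Picks the least precise format that still represents timeMicros exactly. */
  static String formatTime(long timeMicros) {
    LocalTime time = LocalTime.ofNanoOfDay(timeMicros * 1_000L);
    if (timeMicros % 1_000_000L == 0) {
      return time.format(SECONDS); // whole seconds: no fractional part
    }
    if (timeMicros % 1_000L == 0) {
      return time.format(MILLIS); // whole milliseconds: three fractional digits
    }
    return time.format(MICROS); // otherwise six fractional digits
  }
}
```

For example, 12:34:56.5 prints as `12:34:56.500` rather than `12:34:56.500000`. Always using the microsecond formatter would produce the second form, which is still correct but differs textually from export.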
[jira] [Work logged] (BEAM-6392) Add support for new BigQuery streaming read API to BigQueryIO
[ https://issues.apache.org/jira/browse/BEAM-6392?focusedWorklogId=194823=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-194823 ] ASF GitHub Bot logged work on BEAM-6392: Author: ASF GitHub Bot Created on: 05/Feb/19 21:42 Start Date: 05/Feb/19 21:42 Worklog Time Spent: 10m Work Description: chamikaramj commented on pull request #7441: [BEAM-6392] Add support for the BigQuery read API to BigQueryIO. URL: https://github.com/apache/beam/pull/7441#discussion_r254052761 ## File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageStreamSource.java ## @@ -0,0 +1,235 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.io.gcp.bigquery; + +import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.fromJsonString; +import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.toJsonString; +import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkNotNull; + +import com.google.api.services.bigquery.model.TableSchema; +import com.google.cloud.bigquery.storage.v1beta1.Storage.ReadRowsRequest; +import com.google.cloud.bigquery.storage.v1beta1.Storage.ReadRowsResponse; +import com.google.cloud.bigquery.storage.v1beta1.Storage.ReadSession; +import com.google.cloud.bigquery.storage.v1beta1.Storage.Stream; +import com.google.cloud.bigquery.storage.v1beta1.Storage.StreamPosition; +import java.io.IOException; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.BinaryDecoder; +import org.apache.avro.io.DatumReader; +import org.apache.avro.io.DecoderFactory; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.io.OffsetBasedSource; +import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.StorageClient; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableList; + +/** A {@link org.apache.beam.sdk.io.Source} representing a single stream in a read session. 
*/ +@Experimental(Experimental.Kind.SOURCE_SINK) +public class BigQueryStorageStreamSource extends OffsetBasedSource { + + public static BigQueryStorageStreamSource create( + ReadSession readSession, + Stream stream, + TableSchema tableSchema, + SerializableFunction parseFn, + Coder outputCoder, + BigQueryServices bqServices) { +return new BigQueryStorageStreamSource<>( +readSession, +stream, +0L, +Long.MAX_VALUE, +1L, +toJsonString(checkNotNull(tableSchema, "tableSchema")), +parseFn, +outputCoder, +bqServices); + } + + private final ReadSession readSession; + private final Stream stream; + private final String jsonTableSchema; + private final SerializableFunction parseFn; + private final Coder outputCoder; + private final BigQueryServices bqServices; + + private BigQueryStorageStreamSource( + ReadSession readSession, + Stream stream, + long startOffset, + long stopOffset, + long minBundleSize, + String jsonTableSchema, + SerializableFunction parseFn, + Coder outputCoder, + BigQueryServices bqServices) { +super(startOffset, stopOffset, minBundleSize); +this.readSession = checkNotNull(readSession, "readSession"); +this.stream = checkNotNull(stream, "stream"); +this.jsonTableSchema = checkNotNull(jsonTableSchema, "jsonTableSchema"); +this.parseFn = checkNotNull(parseFn, "parseFn"); +this.outputCoder = checkNotNull(outputCoder, "outputCoder"); +this.bqServices = checkNotNull(bqServices, "bqServices"); + } + + @Override + public Coder getOutputCoder() { +return outputCoder; + } + +
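Two details of this new source suggest that a stream is consumed as a sequence of row batches addressed by a row offset: the imports (ReadRowsResponse, StreamPosition, Iterator, NoSuchElementException) and the create() factory's offset range from 0 to Long.MAX_VALUE. The sketch below is a hypothetical, dependency-free illustration of that pattern, not the PR's reader. Plain lists stand in for ReadRowsResponse batches, and the class name and nextOffset() method are invented for illustration.

```java
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

/**
 * Hypothetical sketch, not the PR's reader: flattens batches of rows (standing in for
 * ReadRowsResponse messages) into single rows and tracks the offset of the next unread row.
 */
final class BatchFlatteningReader<T> implements Iterator<T> {
  private final Iterator<List<T>> batches;
  private Iterator<T> currentBatch = Collections.emptyIterator();
  private long offset;

  BatchFlatteningReader(Iterator<List<T>> batches, long startOffset) {
    this.batches = batches;
    this.offset = startOffset;
  }

  @Override
  public boolean hasNext() {
    // Skip over empty batches until a row is available or the stream is exhausted.
    while (!currentBatch.hasNext() && batches.hasNext()) {
      currentBatch = batches.next().iterator();
    }
    return currentBatch.hasNext();
  }

  @Override
  public T next() {
    if (!hasNext()) {
      throw new NoSuchElementException();
    }
    offset++;
    return currentBatch.next();
  }

  /** Offset from which a resumed read would continue, e.g. after a failure. */
  long nextOffset() {
    return offset;
  }
}
```

Tracking the offset per row, rather than per batch, means a resumed read could in principle restart mid-batch without re-emitting rows. Whether the PR resumes this way is not shown in the excerpt.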
[jira] [Work logged] (BEAM-6392) Add support for new BigQuery streaming read API to BigQueryIO
[ https://issues.apache.org/jira/browse/BEAM-6392?focusedWorklogId=194819=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-194819 ] ASF GitHub Bot logged work on BEAM-6392: Author: ASF GitHub Bot Created on: 05/Feb/19 21:42 Start Date: 05/Feb/19 21:42 Worklog Time Spent: 10m Work Description: chamikaramj commented on pull request #7441: [BEAM-6392] Add support for the BigQuery read API to BigQueryIO. URL: https://github.com/apache/beam/pull/7441#discussion_r253175366 ## File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java ## @@ -100,6 +116,62 @@ static String formatTimestamp(String timestamp) { return String.format("%s.%s UTC", dayAndTime, fractionalSeconds); } + /** + * This method formats a BigQuery DATE value into a String matching the format used by JSON + * export. Date records are stored in "days since epoch" format, and BigQuery uses the proleptic + * Gregorian calendar. + */ + private static String formatDate(int date) { +return LocalDate.ofEpochDay(date).format(java.time.format.DateTimeFormatter.ISO_LOCAL_DATE); + } + + private static final java.time.format.DateTimeFormatter ISO_LOCAL_TIME_FORMATTER_MICROS = + new DateTimeFormatterBuilder() + .appendValue(HOUR_OF_DAY, 2) + .appendLiteral(':') + .appendValue(MINUTE_OF_HOUR, 2) + .appendLiteral(':') + .appendValue(SECOND_OF_MINUTE, 2) + .appendLiteral('.') + .appendFraction(NANO_OF_SECOND, 6, 6, false) + .toFormatter(); + + private static final java.time.format.DateTimeFormatter ISO_LOCAL_TIME_FORMATTER_MILLIS = + new DateTimeFormatterBuilder() + .appendValue(HOUR_OF_DAY, 2) + .appendLiteral(':') + .appendValue(MINUTE_OF_HOUR, 2) + .appendLiteral(':') + .appendValue(SECOND_OF_MINUTE, 2) + .appendLiteral('.') + .appendFraction(NANO_OF_SECOND, 3, 3, false) + .toFormatter(); + + private static final java.time.format.DateTimeFormatter ISO_LOCAL_TIME_FORMATTER_SECONDS = + new DateTimeFormatterBuilder() + 
 .appendValue(HOUR_OF_DAY, 2) + .appendLiteral(':') + .appendValue(MINUTE_OF_HOUR, 2) + .appendLiteral(':') + .appendValue(SECOND_OF_MINUTE, 2) + .toFormatter(); + + /** + * This method formats a BigQuery TIME value into a String matching the format used by JSON + * export. Time records are stored in "microseconds since midnight" format. + */ + private static String formatTime(long timeMicros) { Review comment: Just to clarify, why don't we always use the most precise version (ISO_LOCAL_TIME_FORMATTER_MICROS) here? Issue Time Tracking --- Worklog Id: (was: 194819) Time Spent: 1h 10m (was: 1h) > Add support for new BigQuery streaming read API to BigQueryIO > - > > Key: BEAM-6392 > URL: https://issues.apache.org/jira/browse/BEAM-6392 > Project: Beam > Issue Type: New Feature > Components: io-java-gcp >Reporter: Kenneth Jung >Assignee: Kenneth Jung >Priority: Major > Time Spent: 1h 10m > Remaining Estimate: 0h > > BigQuery has developed a new streaming egress API which will soon reach > public availability. Add support for the new API in BigQueryIO.
[jira] [Work logged] (BEAM-6392) Add support for new BigQuery streaming read API to BigQueryIO
[ https://issues.apache.org/jira/browse/BEAM-6392?focusedWorklogId=194820=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-194820 ] ASF GitHub Bot logged work on BEAM-6392: Author: ASF GitHub Bot Created on: 05/Feb/19 21:42 Start Date: 05/Feb/19 21:42 Worklog Time Spent: 10m Work Description: chamikaramj commented on pull request #7441: [BEAM-6392] Add support for the BigQuery read API to BigQueryIO. URL: https://github.com/apache/beam/pull/7441#discussion_r254051736 ## File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java ## @@ -177,27 +250,47 @@ private static Object convertRequiredField( // Per https://cloud.google.com/bigquery/docs/reference/v2/tables#schema, the type field // is required, so it may not be null. String bqType = fieldSchema.getType(); -Type expectedAvroType = BIG_QUERY_TO_AVRO_TYPES.get(bqType); -verifyNotNull(expectedAvroType, "Unsupported BigQuery type: %s", bqType); +ImmutableCollection expectedAvroTypes = BIG_QUERY_TO_AVRO_TYPES.get(bqType); +verifyNotNull(expectedAvroTypes, "Unsupported BigQuery type: %s", bqType); verify( -avroType == expectedAvroType, -"Expected Avro schema type %s, not %s, for BigQuery %s field %s", -expectedAvroType, -avroType, +expectedAvroTypes.contains(avroType), +"Expected Avro schema types %s for BigQuery %s field %s, but received %s", +expectedAvroTypes, bqType, -fieldSchema.getName()); +fieldSchema.getName(), +avroType); // For historical reasons, don't validate avroLogicalType except for with NUMERIC. // BigQuery represents NUMERIC in Avro format as BYTES with a DECIMAL logical type. -switch (fieldSchema.getType()) { +switch (bqType) { case "STRING": - case "DATE": case "DATETIME": - case "TIME": case "GEOGRAPHY": // Avro will use a CharSequence to represent String objects, but it may not always use // java.lang.String; for example, it may prefer org.apache.avro.util.Utf8. 
verify(v instanceof CharSequence, "Expected CharSequence (String), got %s", v.getClass()); return v.toString(); + case "DATE": +if (avroType == Type.INT) { + verify(v instanceof Integer, "Expected Integer, got %s", v.getClass()); Review comment: Is this path only hit for the read API? If so, please add a comment so that we don't lose the mapping. Issue Time Tracking --- Worklog Id: (was: 194820) > Add support for new BigQuery streaming read API to BigQueryIO > - > > Key: BEAM-6392 > URL: https://issues.apache.org/jira/browse/BEAM-6392 > Project: Beam > Issue Type: New Feature > Components: io-java-gcp >Reporter: Kenneth Jung >Assignee: Kenneth Jung >Priority: Major > Time Spent: 1h 20m > Remaining Estimate: 0h > > BigQuery has developed a new streaming egress API which will soon reach > public availability. Add support for the new API in BigQueryIO.
[jira] [Work logged] (BEAM-6392) Add support for new BigQuery streaming read API to BigQueryIO
[ https://issues.apache.org/jira/browse/BEAM-6392?focusedWorklogId=194822=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-194822 ] ASF GitHub Bot logged work on BEAM-6392: Author: ASF GitHub Bot Created on: 05/Feb/19 21:42 Start Date: 05/Feb/19 21:42 Worklog Time Spent: 10m Work Description: chamikaramj commented on pull request #7441: [BEAM-6392] Add support for the BigQuery read API to BigQueryIO. URL: https://github.com/apache/beam/pull/7441#discussion_r254051306 ## File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java ## @@ -177,27 +250,47 @@ private static Object convertRequiredField( // Per https://cloud.google.com/bigquery/docs/reference/v2/tables#schema, the type field // is required, so it may not be null. String bqType = fieldSchema.getType(); -Type expectedAvroType = BIG_QUERY_TO_AVRO_TYPES.get(bqType); -verifyNotNull(expectedAvroType, "Unsupported BigQuery type: %s", bqType); +ImmutableCollection expectedAvroTypes = BIG_QUERY_TO_AVRO_TYPES.get(bqType); +verifyNotNull(expectedAvroTypes, "Unsupported BigQuery type: %s", bqType); verify( -avroType == expectedAvroType, -"Expected Avro schema type %s, not %s, for BigQuery %s field %s", -expectedAvroType, -avroType, +expectedAvroTypes.contains(avroType), +"Expected Avro schema types %s for BigQuery %s field %s, but received %s", +expectedAvroTypes, bqType, -fieldSchema.getName()); +fieldSchema.getName(), +avroType); // For historical reasons, don't validate avroLogicalType except for with NUMERIC. // BigQuery represents NUMERIC in Avro format as BYTES with a DECIMAL logical type. -switch (fieldSchema.getType()) { +switch (bqType) { case "STRING": - case "DATE": case "DATETIME": - case "TIME": case "GEOGRAPHY": // Avro will use a CharSequence to represent String objects, but it may not always use // java.lang.String; for example, it may prefer org.apache.avro.util.Utf8. 
verify(v instanceof CharSequence, "Expected CharSequence (String), got %s", v.getClass()); return v.toString(); + case "DATE": Review comment: Do we have any backwards-incompatible data type conversions for people who migrate pipelines from the export-based read transform to the read-API-based read transform? If so, we should try to minimize them, and any incompatibilities that are unavoidable should be clearly documented. Issue Time Tracking --- Worklog Id: (was: 194822) > Add support for new BigQuery streaming read API to BigQueryIO > - > > Key: BEAM-6392 > URL: https://issues.apache.org/jira/browse/BEAM-6392 > Project: Beam > Issue Type: New Feature > Components: io-java-gcp >Reporter: Kenneth Jung >Assignee: Kenneth Jung >Priority: Major > Time Spent: 1h 20m > Remaining Estimate: 0h > > BigQuery has developed a new streaming egress API which will soon reach > public availability. Add support for the new API in BigQueryIO.
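The compatibility question ties back to the change at the top of this hunk. BIG_QUERY_TO_AVRO_TYPES.get(bqType) now yields a collection, so a single BigQuery type such as DATE can accept both the export representation and the read API representation. The sketch below is a hypothetical, dependency-free version of that check. It uses a stand-in enum for Avro's Schema.Type and an illustrative subset of the mapping, since the real table is not shown in the diff. If the real table is a Guava ImmutableMultimap (an assumption, because its declaration is not in the excerpt), get returns an empty collection rather than null for an unknown key, so verifyNotNull alone would not reject unsupported types. The sketch therefore checks for both.

```java
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

/** Hypothetical sketch of validating an Avro type against several acceptable ones. */
final class AvroTypeCheck {
  private AvroTypeCheck() {}

  /** Stand-in for org.apache.avro.Schema.Type, limited to the members used here. */
  enum AvroType { STRING, INT, LONG, BYTES }

  // Illustrative subset only: DATE and TIME accept a string (export) or a numeric
  // logical type (read API); NUMERIC arrives as BYTES with a DECIMAL logical type.
  private static final Map<String, Set<AvroType>> BIG_QUERY_TO_AVRO_TYPES;

  static {
    Map<String, Set<AvroType>> m = new HashMap<>();
    m.put("STRING", EnumSet.of(AvroType.STRING));
    m.put("DATETIME", EnumSet.of(AvroType.STRING));
    m.put("DATE", EnumSet.of(AvroType.STRING, AvroType.INT));
    m.put("TIME", EnumSet.of(AvroType.STRING, AvroType.LONG));
    m.put("NUMERIC", EnumSet.of(AvroType.BYTES));
    BIG_QUERY_TO_AVRO_TYPES = Collections.unmodifiableMap(m);
  }

  static void verifyAvroType(String bqType, String fieldName, AvroType avroType) {
    Set<AvroType> expected = BIG_QUERY_TO_AVRO_TYPES.get(bqType);
    // Reject both null (Map) and empty (what a Guava Multimap returns for an unknown key).
    if (expected == null || expected.isEmpty()) {
      throw new IllegalStateException("Unsupported BigQuery type: " + bqType);
    }
    if (!expected.contains(avroType)) {
      throw new IllegalStateException(String.format(
          "Expected Avro schema types %s for BigQuery %s field %s, but received %s",
          expected, bqType, fieldName, avroType));
    }
  }
}
```

With this shape, moving a pipeline from export to the read API changes only which member of the accepted set arrives. Any user-visible difference then depends on whether the later conversion step (for example, DATE to string) produces identical output for both.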
[jira] [Work logged] (BEAM-6392) Add support for new BigQuery streaming read API to BigQueryIO
[ https://issues.apache.org/jira/browse/BEAM-6392?focusedWorklogId=194824=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-194824 ] ASF GitHub Bot logged work on BEAM-6392: Author: ASF GitHub Bot Created on: 05/Feb/19 21:42 Start Date: 05/Feb/19 21:42 Worklog Time Spent: 10m Work Description: chamikaramj commented on pull request #7441: [BEAM-6392] Add support for the BigQuery read API to BigQueryIO. URL: https://github.com/apache/beam/pull/7441#discussion_r254051771 ## File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java ## @@ -177,27 +250,47 @@ private static Object convertRequiredField( // Per https://cloud.google.com/bigquery/docs/reference/v2/tables#schema, the type field // is required, so it may not be null. String bqType = fieldSchema.getType(); -Type expectedAvroType = BIG_QUERY_TO_AVRO_TYPES.get(bqType); -verifyNotNull(expectedAvroType, "Unsupported BigQuery type: %s", bqType); +ImmutableCollection expectedAvroTypes = BIG_QUERY_TO_AVRO_TYPES.get(bqType); +verifyNotNull(expectedAvroTypes, "Unsupported BigQuery type: %s", bqType); verify( -avroType == expectedAvroType, -"Expected Avro schema type %s, not %s, for BigQuery %s field %s", -expectedAvroType, -avroType, +expectedAvroTypes.contains(avroType), +"Expected Avro schema types %s for BigQuery %s field %s, but received %s", +expectedAvroTypes, bqType, -fieldSchema.getName()); +fieldSchema.getName(), +avroType); // For historical reasons, don't validate avroLogicalType except for with NUMERIC. // BigQuery represents NUMERIC in Avro format as BYTES with a DECIMAL logical type. -switch (fieldSchema.getType()) { +switch (bqType) { case "STRING": - case "DATE": case "DATETIME": - case "TIME": case "GEOGRAPHY": // Avro will use a CharSequence to represent String objects, but it may not always use // java.lang.String; for example, it may prefer org.apache.avro.util.Utf8. 
verify(v instanceof CharSequence, "Expected CharSequence (String), got %s", v.getClass()); return v.toString(); + case "DATE": +if (avroType == Type.INT) { + verify(v instanceof Integer, "Expected Integer, got %s", v.getClass()); + verifyNotNull(avroLogicalType, "Expected Date logical type"); + verify(avroLogicalType instanceof LogicalTypes.Date, "Expected Date logical type"); + return formatDate((Integer) v); +} else { + verify(v instanceof CharSequence, "Expected CharSequence (String), got %s", v.getClass()); + return v.toString(); +} + case "TIME": +if (avroType == Type.LONG) { + verify(v instanceof Long, "Expected Long, got %s", v.getClass()); + verifyNotNull(avroLogicalType, "Expected TimeMicros logical type"); Review comment: Ditto. Issue Time Tracking --- Worklog Id: (was: 194824) Time Spent: 1.5h (was: 1h 20m) > Add support for new BigQuery streaming read API to BigQueryIO > - > > Key: BEAM-6392 > URL: https://issues.apache.org/jira/browse/BEAM-6392 > Project: Beam > Issue Type: New Feature > Components: io-java-gcp >Reporter: Kenneth Jung >Assignee: Kenneth Jung >Priority: Major > Time Spent: 1.5h > Remaining Estimate: 0h > > BigQuery has developed a new streaming egress API which will soon reach > public availability. Add support for the new API in BigQueryIO.
[jira] [Work logged] (BEAM-6392) Add support for new BigQuery streaming read API to BigQueryIO
[ https://issues.apache.org/jira/browse/BEAM-6392?focusedWorklogId=189276=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-189276 ] ASF GitHub Bot logged work on BEAM-6392: Author: ASF GitHub Bot Created on: 24/Jan/19 03:07 Start Date: 24/Jan/19 03:07 Worklog Time Spent: 10m Work Description: chamikaramj commented on issue #7441: [BEAM-6392] Add support for the BigQuery read API to BigQueryIO. URL: https://github.com/apache/beam/pull/7441#issuecomment-457049970 cc: @reuvenlax @pedapudi Issue Time Tracking --- Worklog Id: (was: 189276) Time Spent: 1h (was: 50m) > Add support for new BigQuery streaming read API to BigQueryIO > - > > Key: BEAM-6392 > URL: https://issues.apache.org/jira/browse/BEAM-6392 > Project: Beam > Issue Type: New Feature > Components: io-java-gcp >Reporter: Kenneth Jung >Assignee: Kenneth Jung >Priority: Major > Time Spent: 1h > Remaining Estimate: 0h > > BigQuery has developed a new streaming egress API which will soon reach > public availability. Add support for the new API in BigQueryIO.
[jira] [Work logged] (BEAM-6392) Add support for new BigQuery streaming read API to BigQueryIO
[ https://issues.apache.org/jira/browse/BEAM-6392?focusedWorklogId=183884=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-183884 ] ASF GitHub Bot logged work on BEAM-6392: Author: ASF GitHub Bot Created on: 10/Jan/19 19:36 Start Date: 10/Jan/19 19:36 Worklog Time Spent: 10m Work Description: kanterov commented on issue #7441: [BEAM-6392] Add support for the BigQuery read API to BigQueryIO. URL: https://github.com/apache/beam/pull/7441#issuecomment-453176331 @kmjung this is awesome, I've been waiting for this feature. Is there any documentation for the BigQuery API for direct reads? I couldn't find any. Issue Time Tracking --- Worklog Id: (was: 183884) Time Spent: 50m (was: 40m) > Add support for new BigQuery streaming read API to BigQueryIO > - > > Key: BEAM-6392 > URL: https://issues.apache.org/jira/browse/BEAM-6392 > Project: Beam > Issue Type: New Feature > Components: io-java-gcp >Reporter: Kenneth Jung >Assignee: Kenneth Jung >Priority: Major > Time Spent: 50m > Remaining Estimate: 0h > > BigQuery has developed a new streaming egress API which will soon reach > public availability. Add support for the new API in BigQueryIO.
[jira] [Work logged] (BEAM-6392) Add support for new BigQuery streaming read API to BigQueryIO
[ https://issues.apache.org/jira/browse/BEAM-6392?focusedWorklogId=183816=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-183816 ] ASF GitHub Bot logged work on BEAM-6392: Author: ASF GitHub Bot Created on: 10/Jan/19 17:14 Start Date: 10/Jan/19 17:14 Worklog Time Spent: 10m Work Description: kanterov commented on issue #7441: [BEAM-6392] Add support for the BigQuery read API to BigQueryIO. URL: https://github.com/apache/beam/pull/7441#issuecomment-453176331 @kmjung this is awesome, I've been waiting for this feature. Is there any documentation for the BigQuery API for direct reads? I couldn't find any. Issue Time Tracking --- Worklog Id: (was: 183816) Time Spent: 40m (was: 0.5h) > Add support for new BigQuery streaming read API to BigQueryIO > - > > Key: BEAM-6392 > URL: https://issues.apache.org/jira/browse/BEAM-6392 > Project: Beam > Issue Type: New Feature > Components: io-java-gcp >Reporter: Kenneth Jung >Assignee: Kenneth Jung >Priority: Major > Time Spent: 40m > Remaining Estimate: 0h > > BigQuery has developed a new streaming egress API which will soon reach > public availability. Add support for the new API in BigQueryIO.
[jira] [Work logged] (BEAM-6392) Add support for new BigQuery streaming read API to BigQueryIO
[ https://issues.apache.org/jira/browse/BEAM-6392?focusedWorklogId=183815=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-183815 ] ASF GitHub Bot logged work on BEAM-6392: Author: ASF GitHub Bot Created on: 10/Jan/19 17:12 Start Date: 10/Jan/19 17:12 Worklog Time Spent: 10m Work Description: kanterov commented on issue #7441: [BEAM-6392] Add support for the BigQuery read API to BigQueryIO. URL: https://github.com/apache/beam/pull/7441#issuecomment-453176331 @kmjung this is awesome, I've been waiting for this feature. Is there any documentation for the BigQuery API for direct reads? I couldn't find one. Issue Time Tracking --- Worklog Id: (was: 183815) Time Spent: 0.5h (was: 20m) > Add support for new BigQuery streaming read API to BigQueryIO > - > > Key: BEAM-6392 > URL: https://issues.apache.org/jira/browse/BEAM-6392 > Project: Beam > Issue Type: New Feature > Components: io-java-gcp >Reporter: Kenneth Jung >Assignee: Kenneth Jung >Priority: Major > Time Spent: 0.5h > Remaining Estimate: 0h > > BigQuery has developed a new streaming egress API which will soon reach > public availability. Add support for the new API in BigQueryIO.
[jira] [Work logged] (BEAM-6392) Add support for new BigQuery streaming read API to BigQueryIO
[ https://issues.apache.org/jira/browse/BEAM-6392?focusedWorklogId=182742=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-182742 ] ASF GitHub Bot logged work on BEAM-6392: Author: ASF GitHub Bot Created on: 08/Jan/19 23:15 Start Date: 08/Jan/19 23:15 Worklog Time Spent: 10m Work Description: kmjung commented on issue #7441: [BEAM-6392] Add support for the BigQuery read API to BigQueryIO. URL: https://github.com/apache/beam/pull/7441#issuecomment-452486630 cc: @chamikaramj Issue Time Tracking --- Worklog Id: (was: 182742) Time Spent: 20m (was: 10m) > Add support for new BigQuery streaming read API to BigQueryIO > - > > Key: BEAM-6392 > URL: https://issues.apache.org/jira/browse/BEAM-6392 > Project: Beam > Issue Type: New Feature > Components: io-java-gcp >Reporter: Kenneth Jung >Assignee: Chamikara Jayalath >Priority: Major > Time Spent: 20m > Remaining Estimate: 0h > > BigQuery has developed a new streaming egress API which will soon reach > public availability. Add support for the new API in BigQueryIO.
[jira] [Work logged] (BEAM-6392) Add support for new BigQuery streaming read API to BigQueryIO
[ https://issues.apache.org/jira/browse/BEAM-6392?focusedWorklogId=182741=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-182741 ] ASF GitHub Bot logged work on BEAM-6392: Author: ASF GitHub Bot Created on: 08/Jan/19 23:15 Start Date: 08/Jan/19 23:15 Worklog Time Spent: 10m Work Description: kmjung commented on pull request #7441: [BEAM-6392] Add support for the BigQuery read API to BigQueryIO. URL: https://github.com/apache/beam/pull/7441 This change adds support for the new BigQuery high-throughput read API to BigQueryIO. The initial commit supports only reading from existing tables.
Issue Time Tracking --- Worklog Id: (was: