Actually, the number of retries is 1000 by default, so it might not have reached that number yet. I will have to test that again:
https://github.com/apache/beam/blob/38daf8c45b14a94665939b562ab947dc72ad8f8f/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java#L1901

On Thu, Feb 14, 2019 at 1:05 PM Kaymak, Tobias wrote:
> Thank you Jeff,
>
> I think 1 is a bug and I am planning to report it in the bug tracker.
> Regarding 2, I now have a fixed TableSchema supplied to this pipeline with
> the expected fields, and I am ignoring unknown fields.
>
> Best,
> Tobi
>
> On Wed, Feb 13, 2019 at 5:03 PM Jeff Klukas wrote:
>
>> To question 1, I also would have expected the pipeline to fail in the
>> case of files failing to load; I'm not sure why it doesn't. I thought the
>> BigQuery API returns a 400-level response code when files fail to load,
>> and that this would bubble up to a pipeline execution error, but I haven't
>> dug through the code to verify that.
>>
>> As to question 2, the lack of a native map type in BigQuery means you're
>> in a bit of a tough spot. BigQuery loads from Avro handle this by modeling
>> the map type as an array of (key, value) structs [0]. You could modify your
>> payload to match that same sort of structure, which would transparently
>> handle future additions of new keys, but may not be as convenient for
>> querying.
>>
>> Otherwise, if you want to model the "log" structure as a struct in
>> BigQuery, I think you'd need to provide the schemas as a side input to
>> BigQueryIO. Perhaps you could have a branch of your pipeline look for new
>> keys in the payload and output a view of the schema.
>>
>> Another option would be to set ignoreUnknownValues(), which would drop
>> values for any fields not existing in the BQ table. Dropping those values
>> may or may not be acceptable for your use case.
>>
>> [0]
>> https://cloud.google.com/bigquery/docs/loading-data-cloud-storage-avro#complex_types
>>
>> On Wed, Feb 13, 2019 at 9:27 AM Kaymak, Tobias wrote:
>>
>>> Hello,
>>>
>>> I have a BigQuery table which ingests a Protobuf stream from Kafka with
>>> a Beam pipeline. The Protobuf has a `log Map<String,String>` column which
>>> translates to a field "log" of type RECORD with unknown fields in BigQuery.
>>>
>>> So I scanned my whole stream to know which schema fields to expect and
>>> created an empty daily-partitioned table in BigQuery with the correct
>>> fields for this RECORD type.
>>>
>>> Now someone is pushing a new field into this RECORD and the import fails
>>> because the schema no longer matches. My pipeline is configured to use
>>> FILE_LOADS, and so these loads fail, but the Flink pipeline just continues
>>> and does not throw any error.
>>>
>>> My Beam version is 2.10-SNAPSHOT running on Flink 1.6. My two questions
>>> are:
>>>
>>> 1. How can I make the pipeline fail hard in this situation?
>>> 2. How can I prevent this from happening? If I create the table with the
>>> correct schema in the beginning, the table schema seems to be overwritten
>>> and autodetected again (without the added field). This is causing the load
>>> to fail. If I alter the schema and add the field manually while the
>>> pipeline is running, it fixes it, but by then I already have a dozen
>>> failed imports.
>>> The main issue seems to be that I have a Protobuf schema which only
>>> defines a Map<String,String> type, which is not specific enough to create
>>> the full matching schema from it.
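Below is a minimal sketch of Jeff's first suggestion. It models the "log" map the way BigQuery's Avro loader does: as a REPEATED RECORD of (key, value) entries. The sketch uses only java.util types, and the class and method names are hypothetical. It assumes the returned list would be set on a TableRow (which behaves as a Map<String, Object>) under the "log" field, with the column schema shown in the comment. Because a new key becomes a new entry rather than a new column, the table schema would stay fixed.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical helper: reshape a Map<String, String> into key/value entries. */
class LogMapToRepeatedRecord {

  // Assumed BigQuery column schema (JSON form), mirroring how Avro maps load:
  // {"name": "log", "type": "RECORD", "mode": "REPEATED",
  //  "fields": [{"name": "key", "type": "STRING"},
  //             {"name": "value", "type": "STRING"}]}

  /** Returns one {key, value} entry per map entry, sorted by key for stable output. */
  static List<Map<String, Object>> toKeyValueEntries(Map<String, String> log) {
    List<Map<String, Object>> entries = new ArrayList<>();
    for (Map.Entry<String, String> e : new TreeMap<>(log).entrySet()) {
      Map<String, Object> entry = new LinkedHashMap<>();
      entry.put("key", e.getKey());
      entry.put("value", e.getValue());
      entries.add(entry);
    }
    return entries;
  }

  public static void main(String[] args) {
    Map<String, String> log = new LinkedHashMap<>();
    log.put("user", "tobi");
    log.put("level", "INFO");
    // In a real pipeline (assumption) this list would go into the row,
    // e.g. tableRow.set("log", toKeyValueEntries(log)).
    System.out.println(toKeyValueEntries(log));
    // prints [{key=level, value=INFO}, {key=user, value=tobi}]
  }
}
```

The trade-off is the one Jeff mentions. New keys are absorbed without schema changes, but queries have to unnest the array, for example to filter on key = 'level'.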
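Below is a rough sketch of the other idea: a pipeline branch that watches for new keys and emits a schema. The helper names are hypothetical, and only the "log" column is rendered. A real schema would also need the table's other columns. The output is a JSON TableSchema string. As far as I understand, this is the value type BigQueryIO.Write.withSchemaFromView expects in its Map<String, String> side input, keyed by table spec. Whether a load job may then add the new field to an existing table is an assumption I have not checked. unknownKeys could also be used to log or fail loudly when a payload carries a field the fixed schema does not know about.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical helpers for detecting new "log" keys and rendering a schema. */
class LogSchemaFromKeys {

  /** Keys present in this payload but missing from the known schema fields. */
  static Set<String> unknownKeys(Set<String> knownFields, Map<String, String> log) {
    Set<String> unknown = new TreeSet<>(log.keySet());
    unknown.removeAll(knownFields);
    return unknown;
  }

  /** Renders a TableSchema JSON where "log" is a RECORD of NULLABLE STRING fields. */
  static String logSchemaJson(Set<String> keys) {
    StringBuilder sb = new StringBuilder(
        "{\"fields\":[{\"name\":\"log\",\"type\":\"RECORD\",\"mode\":\"NULLABLE\",\"fields\":[");
    boolean first = true;
    for (String key : new TreeSet<>(keys)) {
      if (!first) {
        sb.append(',');
      }
      first = false;
      // Assumes keys are already valid BigQuery column names (letters, digits,
      // underscores), so no JSON escaping is performed here.
      sb.append("{\"name\":\"").append(key)
          .append("\",\"type\":\"STRING\",\"mode\":\"NULLABLE\"}");
    }
    // Close the inner fields array, the log object, the outer array and the root.
    return sb.append("]}]}").toString();
  }

  public static void main(String[] args) {
    Set<String> known = new TreeSet<>(Arrays.asList("level", "user"));
    Map<String, String> log = new LinkedHashMap<>();
    log.put("level", "INFO");
    log.put("user", "tobi");
    log.put("request_id", "42");

    Set<String> unknown = unknownKeys(known, log);
    System.out.println("unknown keys: " + unknown); // prints unknown keys: [request_id]

    Set<String> all = new TreeSet<>(known);
    all.addAll(unknown);
    System.out.println(logSchemaJson(all));
  }
}
```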
