Thank you Jeff,

I think 1 is a bug, and I am planning to report it in the bug tracker. Regarding 2, I now supply a fixed TableSchema with the expected fields to this pipeline, and I am ignoring unknown fields.
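To illustrate what ignoring unknown fields amounts to, here is a minimal sketch in plain Java with no Beam dependency. It is not the pipeline code: the class name, the method name, and the field names "user_agent" and "referrer" are made up for the example, and the real pipeline would presumably rely on the ignoreUnknownValues() option Jeff mentioned rather than filtering by hand.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

final class KnownFieldFilter {
    // Hypothetical field names; the real ones would come from the fixed TableSchema.
    static final Set<String> EXPECTED_LOG_FIELDS = Set.of("user_agent", "referrer");

    /** Returns a copy of {@code log} that keeps only the keys listed in {@code expected}. */
    static Map<String, String> keepKnownFields(Map<String, String> log, Set<String> expected) {
        Map<String, String> kept = new LinkedHashMap<>();
        for (Map.Entry<String, String> entry : log.entrySet()) {
            if (expected.contains(entry.getKey())) {
                kept.put(entry.getKey(), entry.getValue());
            }
            // Keys outside the expected set are silently dropped.
        }
        return kept;
    }

    public static void main(String[] args) {
        Map<String, String> log = new LinkedHashMap<>();
        log.put("user_agent", "curl/7.61");
        log.put("new_field", "surprise"); // not part of the fixed schema
        System.out.println(keepKnownFields(log, EXPECTED_LOG_FIELDS));
    }
}
```

The trade-off is the one Jeff pointed out: values for new keys are lost until the schema is extended.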
Best,
Tobi

On Wed, Feb 13, 2019 at 5:03 PM Jeff Klukas <[email protected]> wrote:

> To question 1, I also would have expected the pipeline to fail in the case
> of files failing to load; I'm not sure why it's not. I thought the BigQuery
> API returns a 400 level response code in the case of files failing and that
> would bubble up to a pipeline execution error, but I haven't dug through
> the code to verify that.
>
> As to question 2, the lack of a native map type in BigQuery means you're
> in a bit of a tough spot. BigQuery loads from Avro handle this by modeling
> the map type as an array of (key, value) structs [0]. You could modify your
> payload to match that same sort of structure, which would transparently
> handle future additions of new keys, but may not be as convenient for
> querying.
>
> Otherwise, if you want to model the "log" structure as a struct in
> BigQuery, I think you'd need to provide the schemas as a side input to
> BigQueryIO. Perhaps you could have a branch of your pipeline look for new
> keys in the payload and output a view of the schema.
>
> Another option would be to set ignoreUnknownValues() which would drop
> values for any fields not existing in the BQ table. Dropping those values
> may or may not be acceptable for your use case.
>
> [0]
> https://cloud.google.com/bigquery/docs/loading-data-cloud-storage-avro#complex_types
>
> On Wed, Feb 13, 2019 at 9:27 AM Kaymak, Tobias <[email protected]>
> wrote:
>
>> Hello,
>>
>> I have a BigQuery table which ingests a Protobuf stream from Kafka with a
>> Beam pipeline. The Protobuf has a `log Map<String,String>` column which
>> translates to a field "log" of type RECORD with unknown fields in BigQuery.
>>
>> So I scanned my whole stream to know which schema fields to expect and
>> created an empty daily-partitioned table in BigQuery with correct fields
>> for this RECORD type.
>>
>> Now someone is pushing a new field into this RECORD and the import fails
>> as the schema is not matching anymore. My pipeline is configured to use
>> FILE_LOADS and so these do fail - but the Flink pipeline just continues and
>> does not throw any error.
>>
>> My Beam version is 2.10-SNAPSHOT running on Flink 1.6, my two questions
>> are:
>>
>> 1. How can I make the pipeline fail hard in this situation?
>> 2. How can I prevent this from happening? If I create the table with the
>> correct schema in the beginning the table schema seems to be overwritten
>> and autodetected again (without the added field). This is causing the load
>> to fail. If I alter the schema and add the field manually when the pipeline
>> is running it fixes it, but then I already have a dozen of failed imports.
>> The main issue seems to be that I have a Protobuf schema which only
>> defines a Map<String,String> type, that is not specific enough to create
>> the full matching schema from it.
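
Jeff's first suggestion for question 2, modeling the map as an array of (key, value) structs, can be sketched in plain Java as follows. This is only an illustration under assumptions: the class and method names are hypothetical, the record field names "key" and "value" are chosen to mirror the (key, value) wording above, and the conversion to actual BigQuery rows inside the Beam pipeline is left out.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class MapAsKeyValueArray {
    /**
     * Turns every map entry into a two-field record {"key": k, "value": v}.
     * The resulting list has a fixed shape no matter which keys appear,
     * so a new key in the payload does not require a schema change.
     */
    static List<Map<String, String>> toKeyValueRecords(Map<String, String> log) {
        List<Map<String, String>> records = new ArrayList<>();
        for (Map.Entry<String, String> entry : log.entrySet()) {
            Map<String, String> record = new LinkedHashMap<>();
            record.put("key", entry.getKey());
            record.put("value", entry.getValue());
            records.add(record);
        }
        return records;
    }

    public static void main(String[] args) {
        Map<String, String> log = new LinkedHashMap<>();
        log.put("user_agent", "curl/7.61");
        log.put("new_field", "surprise"); // a previously unseen key needs no special handling
        System.out.println(toKeyValueRecords(log));
    }
}
```

On the table side, "log" would then be a repeated record with two string fields instead of one column per key, which is why querying a single key becomes less convenient.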
