On question 1, I would also have expected the pipeline to fail when files
fail to load; I'm not sure why it doesn't. I thought the BigQuery API
returned a 400-level response code when a load fails and that this would
bubble up as a pipeline execution error, but I haven't dug through the code
to verify that.

As to question 2, the lack of a native map type in BigQuery means you're in
a bit of a tough spot. BigQuery loads from Avro handle this by modeling the
map type as an array of (key, value) structs [0]. You could modify your
payload to match that same sort of structure, which would transparently
handle future additions of new keys, but may not be as convenient for
querying.
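
As a rough sketch of that structure in plain Java: the snippet below turns a
Map<String, String> into a list of {key, value} entries, which is the shape
you would then write as a repeated record. The class and method names are
made up for illustration, and it deliberately uses only the JDK; how you map
each entry onto your actual row type in the pipeline is left open.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of Beam or the BigQuery client.
public class MapAsRepeatedStruct {

    // Converts each map entry into a two-field record with fixed names
    // "key" and "value", so new map keys never change the record shape.
    static List<Map<String, String>> toKeyValueRows(Map<String, String> log) {
        List<Map<String, String>> rows = new ArrayList<>();
        for (Map.Entry<String, String> entry : log.entrySet()) {
            Map<String, String> row = new LinkedHashMap<>();
            row.put("key", entry.getKey());
            row.put("value", entry.getValue());
            rows.add(row);
        }
        return rows;
    }

    public static void main(String[] args) {
        Map<String, String> log = new LinkedHashMap<>();
        log.put("host", "web-1");
        log.put("level", "INFO");
        // Each entry becomes one element of the repeated "log" field.
        System.out.println(toKeyValueRows(log));
    }
}
```

The table schema then only ever needs the two subfields "key" and "value",
at the cost of having to unnest the array when querying for a specific key.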

Otherwise, if you want to model the "log" structure as a struct in
BigQuery, I think you'd need to provide the schemas as a side input to
BigQueryIO. Perhaps you could have a branch of your pipeline look for new
keys in the payload and output a view of the schema.
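
The "look for new keys" part could be as simple as a set difference between
the keys in a payload and the field names you already know about. A minimal
sketch in plain Java follows; the class and method names are hypothetical,
and it only covers the detection step, not how the result would be turned
into a schema view or fed to BigQueryIO.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical helper, not part of Beam or the BigQuery client.
public class NewLogKeys {

    // Returns the payload keys that are not yet known as fields of "log".
    // A TreeSet keeps the result sorted, which makes it stable to log.
    static Set<String> unknownKeys(Set<String> knownFields, Map<String, String> log) {
        Set<String> unknown = new TreeSet<>(log.keySet());
        unknown.removeAll(knownFields);
        return unknown;
    }

    public static void main(String[] args) {
        Set<String> knownFields = new HashSet<>(Arrays.asList("host", "level"));
        Map<String, String> log = new LinkedHashMap<>();
        log.put("host", "web-1");
        log.put("request_id", "abc123");
        // Prints the keys that would require a schema change.
        System.out.println(unknownKeys(knownFields, log));
    }
}
```

A non-empty result is the signal that the schema needs another field before
the next load.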

Another option would be to set ignoreUnknownValues(), which would drop
values for any fields that don't exist in the BigQuery table. Dropping those
values may or may not be acceptable for your use case.

[0]
https://cloud.google.com/bigquery/docs/loading-data-cloud-storage-avro#complex_types

On Wed, Feb 13, 2019 at 9:27 AM Kaymak, Tobias <[email protected]>
wrote:

> Hello,
>
> I have a BigQuery table which ingests a Protobuf stream from Kafka with a
> Beam pipeline. The Protobuf has a `log Map<String,String>` column which
> translates to a field "log" of type RECORD with unknown fields in BigQuery.
>
> So I scanned my whole stream to know which schema fields to expect and
> created an empty daily-partitioned table in BigQuery with correct fields
> for this RECORD type.
>
> Now someone is pushing a new field into this RECORD and the import fails
> as the schema is not matching anymore. My pipeline is configured to use
> FILE_LOADS and so these do fail - but the Flink pipeline just continues and
> does not throw any error.
>
> My Beam version is 2.10-SNAPSHOT running on Flink 1.6, my two questions
> are:
>
> 1. How can I make the pipeline fail hard in this situation?
> 2. How can I prevent this from happening? If I create the table with the
> correct schema in the beginning the table schema seems to be overwritten
> and autodetected again (without the added field). This is causing the load
> to fail. If I alter the schema and add the field manually when the pipeline
> is running it fixes it, but then I already have a dozen of failed imports.
> The main issue seems to be that I have a Protobuf schema which only
> defines a Map<String,String> type, that is not specific enough to create
> the full matching schema from it.
>
>
>
