Thank you Jeff,

I think 1 is a bug, and I am planning to report it in the bug tracker.
Regarding 2, I now supply a fixed TableSchema with the expected fields to
this pipeline, and I am ignoring unknown fields.
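
For anyone finding this thread later, here is a minimal plain-Java sketch of
what ignoring unknown fields amounts to for the `log` map: keys that the
fixed schema declares are kept, everything else is dropped. It is an
illustration only and does not use the Beam or BigQuery APIs (in Beam the
relevant knobs would be `withSchema(...)` and `ignoreUnknownValues()` on the
BigQueryIO write). The class name `LogFieldFilter` and the field names
`host`, `level` and `trace_id` are made up for the example.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical helper for illustration; not part of Beam or the BigQuery client. */
final class LogFieldFilter {

  /** Returns a copy of {@code log} that holds only keys declared in the fixed schema. */
  static Map<String, String> keepKnown(Map<String, String> log, Set<String> knownFields) {
    Map<String, String> kept = new LinkedHashMap<>();
    for (Map.Entry<String, String> entry : log.entrySet()) {
      if (knownFields.contains(entry.getKey())) {
        kept.put(entry.getKey(), entry.getValue());
      }
    }
    return kept;
  }

  /** Returns the keys that would be dropped, so they can be logged or counted. */
  static Set<String> unknownKeys(Map<String, String> log, Set<String> knownFields) {
    Set<String> unknown = new LinkedHashSet<>(log.keySet());
    unknown.removeAll(knownFields);
    return unknown;
  }

  public static void main(String[] args) {
    // Assumed field names of the fixed schema (hypothetical).
    Set<String> known = new HashSet<>(Arrays.asList("host", "level"));

    Map<String, String> log = new LinkedHashMap<>();
    log.put("host", "web-1");
    log.put("level", "INFO");
    log.put("trace_id", "abc"); // a newly introduced key the schema does not know

    System.out.println("kept:    " + keepKnown(log, known));
    System.out.println("dropped: " + unknownKeys(log, known));
  }
}
```

Tracking the dropped keys separately makes it possible to notice when a new
field shows up, so the schema can be extended deliberately rather than the
values disappearing silently.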

Best,
Tobi

On Wed, Feb 13, 2019 at 5:03 PM Jeff Klukas <[email protected]> wrote:

> To question 1, I also would have expected the pipeline to fail when files
> fail to load; I'm not sure why it doesn't. I thought the BigQuery API
> returns a 400-level response code when a file load fails and that this
> would bubble up to a pipeline execution error, but I haven't dug through
> the code to verify that.
>
> As to question 2, the lack of a native map type in BigQuery means you're
> in a bit of a tough spot. BigQuery loads from Avro handle this by modeling
> the map type as an array of (key, value) structs [0]. You could modify your
> payload to match that same sort of structure, which would transparently
> handle future additions of new keys, but may not be as convenient for
> querying.
>
> Otherwise, if you want to model the "log" structure as a struct in
> BigQuery, I think you'd need to provide the schemas as a side input to
> BigQueryIO. Perhaps you could have a branch of your pipeline look for new
> keys in the payload and output a view of the schema.
>
> Another option would be to set ignoreUnknownValues(), which would drop
> values for any fields that do not exist in the BQ table. Dropping those
> values may or may not be acceptable for your use case.
>
> [0]
> https://cloud.google.com/bigquery/docs/loading-data-cloud-storage-avro#complex_types
>
> On Wed, Feb 13, 2019 at 9:27 AM Kaymak, Tobias <[email protected]>
> wrote:
>
>> Hello,
>>
>> I have a BigQuery table which ingests a Protobuf stream from Kafka with a
>> Beam pipeline. The Protobuf has a `log Map<String,String>` column which
>> translates to a field "log" of type RECORD with unknown fields in BigQuery.
>>
>> So I scanned my whole stream to know which schema fields to expect and
>> created an empty daily-partitioned table in BigQuery with correct fields
>> for this RECORD type.
>>
>> Now someone is pushing a new field into this RECORD and the import fails
>> because the schema no longer matches. My pipeline is configured to use
>> FILE_LOADS, and these load jobs do fail, but the Flink pipeline just
>> continues and does not throw any error.
>>
>> My Beam version is 2.10-SNAPSHOT, running on Flink 1.6. My two questions
>> are:
>>
>> 1. How can I make the pipeline fail hard in this situation?
>> 2. How can I prevent this from happening? If I create the table with the
>> correct schema in the beginning, the table schema seems to be overwritten
>> and autodetected again (without the added field). This is causing the load
>> to fail. If I alter the schema and add the field manually while the
>> pipeline is running, that fixes it, but by then I already have a dozen
>> failed imports. The main issue seems to be that I have a Protobuf schema
>> which only defines a Map<String,String> type, which is not specific enough
>> to create the full matching schema from it.
>>
>>
>>
