Actually, the number of retries is 1000 by default, so it may not have
reached that number yet. I will have to test that again.
https://github.com/apache/beam/blob/38daf8c45b14a94665939b562ab947dc72ad8f8f/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java#L1901

On Thu, Feb 14, 2019 at 1:05 PM Kaymak, Tobias <[email protected]>
wrote:

> Thank you Jeff,
>
> I think 1 is a bug, and I am planning to report it in the bug tracker.
> Regarding 2, I now have a fixed TableSchema supplied to this pipeline with
> the expected fields, and I am ignoring unknown fields.
>
> Best,
> Tobi
>
> On Wed, Feb 13, 2019 at 5:03 PM Jeff Klukas <[email protected]> wrote:
>
>> To question 1, I also would have expected the pipeline to fail when
>> files fail to load; I'm not sure why it doesn't. I thought the BigQuery
>> API returns a 400-level response code when a load fails and that this
>> would bubble up to a pipeline execution error, but I haven't dug through
>> the code to verify that.
>>
>> As to question 2, the lack of a native map type in BigQuery means you're
>> in a bit of a tough spot. BigQuery loads from Avro handle this by modeling
>> the map type as an array of (key, value) structs [0]. You could modify your
>> payload to match that same sort of structure, which would transparently
>> handle future additions of new keys, but may not be as convenient for
>> querying.
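
A minimal sketch of that key/value layout in plain Java, with no Beam
dependency. The class and method names are hypothetical, and the field names
"key" and "value" are an assumption modeled on the Avro mapping in [0]. In a
real pipeline each inner map would presumably become a nested TableRow in a
REPEATED RECORD field.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper for illustration; not part of Beam or the BigQuery client.
class LogMapAsKeyValueList {

  // Turns {"a": "1", "b": "2"} into [{key=a, value=1}, {key=b, value=2}],
  // i.e. one (key, value) record per map entry, in the map's iteration order.
  static List<Map<String, String>> toKeyValueRecords(Map<String, String> log) {
    List<Map<String, String>> records = new ArrayList<>();
    for (Map.Entry<String, String> entry : log.entrySet()) {
      Map<String, String> record = new LinkedHashMap<>();
      record.put("key", entry.getKey());
      record.put("value", entry.getValue());
      records.add(record);
    }
    return records;
  }

  public static void main(String[] args) {
    Map<String, String> log = new LinkedHashMap<>();
    log.put("level", "INFO");
    log.put("new_field", "x"); // a key the table has never seen before
    // prints [{key=level, value=INFO}, {key=new_field, value=x}]
    System.out.println(toKeyValueRecords(log));
  }
}
```

With this shape the table schema stays fixed (two STRING fields inside a
repeated record), so a new key in the payload is just another row in the
array and needs no schema change.
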
>>
>> Otherwise, if you want to model the "log" structure as a struct in
>> BigQuery, I think you'd need to provide the schemas as a side input to
>> BigQueryIO. Perhaps you could have a branch of your pipeline look for new
>> keys in the payload and output a view of the schema.
>>
>> Another option would be to set ignoreUnknownValues(), which would drop
>> values for any fields that do not exist in the BQ table. Dropping those
>> values may or may not be acceptable for your use case.
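
Because dropped values disappear silently, it may help to check payloads
against the expected fields before they reach BigQueryIO. The sketch below is
plain Java with hypothetical names and is not Beam API. It shows three
variants: listing the unknown keys, keeping only the known fields (roughly the
effect of ignoring unknown values), and failing hard. In a pipeline this logic
would presumably sit in a DoFn ahead of the write.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical helper for illustration; not part of Beam.
class LogFieldCheck {

  // Returns the payload keys that are missing from the known schema fields.
  static Set<String> unknownKeys(Map<String, String> log, Set<String> knownFields) {
    Set<String> unknown = new TreeSet<>(log.keySet());
    unknown.removeAll(knownFields);
    return unknown;
  }

  // Returns a copy of the payload that keeps only the known fields.
  static Map<String, String> dropUnknown(Map<String, String> log, Set<String> knownFields) {
    Map<String, String> kept = new LinkedHashMap<>(log);
    kept.keySet().retainAll(knownFields);
    return kept;
  }

  // Throws instead of dropping data when the payload has unknown keys.
  static void requireKnown(Map<String, String> log, Set<String> knownFields) {
    Set<String> unknown = unknownKeys(log, knownFields);
    if (!unknown.isEmpty()) {
      throw new IllegalStateException("Fields missing from table schema: " + unknown);
    }
  }

  public static void main(String[] args) {
    Set<String> known = new HashSet<>(Arrays.asList("level", "message"));
    Map<String, String> log = new LinkedHashMap<>();
    log.put("level", "INFO");
    log.put("new_field", "x");
    System.out.println(unknownKeys(log, known)); // prints [new_field]
    System.out.println(dropUnknown(log, known)); // prints {level=INFO}
  }
}
```

The unknown-key set could also feed a side output that signals when the table
schema needs a new field, instead of throwing.
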
>>
>> [0]
>> https://cloud.google.com/bigquery/docs/loading-data-cloud-storage-avro#complex_types
>>
>> On Wed, Feb 13, 2019 at 9:27 AM Kaymak, Tobias <[email protected]>
>> wrote:
>>
>>> Hello,
>>>
>>> I have a BigQuery table which ingests a Protobuf stream from Kafka with
>>> a Beam pipeline. The Protobuf has a `log Map<String,String>` column which
>>> translates to a field "log" of type RECORD with unknown fields in BigQuery.
>>>
>>> So I scanned my whole stream to know which schema fields to expect and
>>> created an empty daily-partitioned table in BigQuery with correct fields
>>> for this RECORD type.
>>>
>>> Now someone is pushing a new field into this RECORD and the import fails
>>> as the schema is not matching anymore. My pipeline is configured to use
>>> FILE_LOADS and so these do fail - but the Flink pipeline just continues and
>>> does not throw any error.
>>>
>>> My Beam version is 2.10-SNAPSHOT running on Flink 1.6, my two questions
>>> are:
>>>
>>> 1. How can I make the pipeline fail hard in this situation?
>>> 2. How can I prevent this from happening? If I create the table with the
>>> correct schema at the beginning, the table schema seems to be overwritten
>>> and autodetected again (without the added field). This causes the load
>>> to fail. If I alter the schema and add the field manually while the
>>> pipeline is running, that fixes it, but by then I already have a dozen
>>> failed imports. The main issue seems to be that I have a Protobuf schema
>>> which only defines a Map<String,String> type, which is not specific
>>> enough to create the full matching schema from it.
>>>
>>>
>>>
