Thanks Eugene.

As you suggested, using withHintMatchesManyFiles() resulted in a very
significant performance increase! The job is now fast enough for our
current use case.
Will track the JIRA for any further fixes.
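For anyone who finds this thread later, the change amounts to something
like this (a minimal sketch, not our exact pipeline; MyRecord, the parse
function and the GCS path are hypothetical):

    import org.apache.beam.sdk.io.AvroIO;
    import org.apache.beam.sdk.values.PCollection;

    // Read a large glob of small Avro files. The hint tells the runner
    // that the pattern matches many files, so it can bundle them together
    // instead of creating one shard per input file.
    PCollection<MyRecord> records = pipeline.apply("ReadAvro",
        AvroIO.parseGenericRecords(MyRecord::fromGenericRecord) // hypothetical parse fn
            .from("gs://my-bucket/input/*.avro")                // hypothetical path
            .withHintMatchesManyFiles());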


On Mon, 5 Mar 2018, at 22:34, Eugene Kirpichov wrote:
> Filed JIRA
> On Mon, Mar 5, 2018 at 2:28 PM Eugene Kirpichov <> wrote:
>> For now I suggest that you augment your AvroIO.parse() with
>> .withHintMatchesManyFiles() because it appears to match a very large
>> number of tiny files, and I think that's what's causing the issue.
>> By default Dataflow uses 1 shard per file, and that causes 2
>> problems here:
>> - Each of these shards writes a separate file to be loaded into
>>   BigQuery, so BigQuery has to load this many (tiny) files, which is
>>   not great.
>> - What's worse, the ReifyResults step takes this list of written
>>   temporary files as a side input, and given Dataflow's way of
>>   materializing side inputs, it behaves pretty badly when the data
>>   for the side input is written from a very large number of shards.
>> I'm not sure there's an easy fix to make your original code perform
>> well unchanged, but .withHintMatchesManyFiles() should make it
>> perform orders of magnitude better.
>> On Mon, Mar 5, 2018 at 2:19 PM Eugene Kirpichov <> wrote:
>>> Thank you - I was wrong, it is indeed not blocked by BigQuery jobs,
>>> but by something it shouldn't be doing at all. This is definitely a
>>> bug. I'll investigate in more detail and file a JIRA so you can
>>> track the resolution.
>>> On Mon, Mar 5, 2018 at 7:12 AM Andrew Jones
>>> <andrew+beam@andrew-[1]> wrote:
>>>> Thanks for the reply. In my case the time is spent *before* the
>>>> load job has started. See attached for a screenshot of a currently
>>>> running job (id: 2018-03-05_04_06_20-5803269526385225708).
>>>> It looks like the time is spent in the ReifyResults step. Looking
>>>> at the code and at some smaller, succeeding jobs, the BigQuery
>>>> loads normally happen in SinglePartitionWriteTables (and presumably
>>>> MultiPartitionsWriteTables, if required). So I'm not seeing any log
>>>> lines with output from the BigQuery API, nor anything on the
>>>> BigQuery side.
>>>>
>>>> The input to ReifyResults is around 200K elements, from
>>>> WriteBundlesToFiles. Looking at the code, I think these are all the
>>>> files staged and ready for loading. I'm finding it hard to work out
>>>> exactly what ReifyResults is supposed to be doing and why it would
>>>> take any time at all. I think it might be going through these 200K
>>>> files and doing something with them; if it's doing that one at a
>>>> time and the calls to the GCS API are expensive, that could be the
>>>> issue?
>>>> On Mon, 5 Mar 2018, at 00:28, Eugene Kirpichov wrote:
>>>>> BigQueryIO.write() works by: 1) having Dataflow workers write
>>>>> data to files (in parallel), then 2) asking BigQuery to load those
>>>>> files. Naturally, during this time the Dataflow workers aren't
>>>>> doing anything, which is why the job is scaling down.
>>>>> These jobs are spending their time waiting for BigQuery to load
>>>>> the data.
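For readers following along, a minimal sketch of such a write; the table
name, schema and input PCollection are placeholders, not the job from
this thread:

    import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
    import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
    import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;

    rows.apply("WriteToBigQuery",                   // rows: PCollection<TableRow>
        BigQueryIO.writeTableRows()
            .to("my-project:my_dataset.my_table")   // placeholder table
            .withSchema(tableSchema)                // placeholder schema
            // FILE_LOADS (the default for bounded input): workers first
            // write files in parallel, then BigQuery load jobs import them.
            .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
            .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
            .withWriteDisposition(WriteDisposition.WRITE_APPEND));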
>>>>> As for why it's taking so long for BigQuery to load the data: you
>>>>> can try to look for BigQuery job ids in the Stackdriver logs, and
>>>>> then inspect those jobs in the BigQuery UI. If it's taking
>>>>> *really* long, it's usually a quota issue: i.e. your BigQuery jobs
>>>>> are waiting for some other BigQuery jobs to complete before even
>>>>> starting.
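If digging through the UI is awkward, the google-cloud-bigquery Java
client can fetch the same job status; a small sketch, with a placeholder
job id:

    import com.google.cloud.bigquery.BigQuery;
    import com.google.cloud.bigquery.BigQueryOptions;
    import com.google.cloud.bigquery.Job;
    import com.google.cloud.bigquery.JobId;

    BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();
    // Job id copied from the Stackdriver logs (placeholder value).
    Job job = bigquery.getJob(JobId.of("beam_load_..."));
    System.out.println("State: " + job.getStatus().getState());
    System.out.println("Error: " + job.getStatus().getError());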
>>>>> On Sat, Mar 3, 2018 at 3:30 AM Andrew Jones
>>>>> <andrew+beam@andrew-[2]> wrote:
>>>>>> Hi,
>>>>>> We have a Dataflow job that loads data from GCS, does a bit of
>>>>>> transformation, then writes to a number of BigQuery tables using
>>>>>> DynamicDestinations.
>>>>>>
>>>>>> The same job runs on smaller data sets (~70 million records), but
>>>>>> this one is struggling when processing ~500 million records. Both
>>>>>> jobs write to the same number of tables - the only difference is
>>>>>> the number of records.
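For readers unfamiliar with DynamicDestinations, a rough sketch of this
kind of fan-out write; Event, getType(), schemaFor() and toTableRow()
are hypothetical, not the actual pipeline from this thread:

    import com.google.api.services.bigquery.model.TableSchema;
    import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
    import org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinations;
    import org.apache.beam.sdk.io.gcp.bigquery.TableDestination;
    import org.apache.beam.sdk.values.ValueInSingleWindow;

    events.apply(BigQueryIO.<Event>write()
        .to(new DynamicDestinations<Event, String>() {
          @Override
          public String getDestination(ValueInSingleWindow<Event> element) {
            // Route each record by its type (hypothetical field).
            return element.getValue().getType();
          }

          @Override
          public TableDestination getTable(String type) {
            return new TableDestination(
                "my-project:my_dataset.events_" + type,
                "Events of type " + type);
          }

          @Override
          public TableSchema getSchema(String type) {
            return schemaFor(type);  // hypothetical schema lookup
          }
        })
        .withFormatFunction(event -> toTableRow(event)));  // hypothetical converter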
>>>>>> Example job IDs include 2018-03-02_04_29_44-2181786949469858712
>>>>>> and 2018-03-02_08_46_28-4580218739500768796. They use BigQueryIO
>>>>>> to write to BigQuery, with the
>>>>>> BigQueryIO.Write.Method.FILE_LOADS method (the default for a
>>>>>> bounded job). They successfully stage all their data to GCS, but
>>>>>> then for some reason scale down the number of workers to 1 when
>>>>>> processing the step WriteTOBigQuery/BatchLoads/ReifyResults and
>>>>>> stay in that step for hours.
>>>>>> In the logs we see many entries like this:
>>>>>>
>>>>>>   Proposing dynamic split of work unit
>>>>>>   ...-7e07;2018-03-02_04_29_44-2181786949469858712;662185752552586455
>>>>>>   at {"fractionConsumed":0.5}
>>>>>>   Rejecting split request because custom reader returned null
>>>>>>   residual source.
>>>>>>
>>>>>> And also occasionally this:
>>>>>>
>>>>>>   Processing lull for PT24900.038S in state process of
>>>>>>   WriteTOBigQuery/BatchLoads/ReifyResults/ParDo(Anonymous) at
>>>>>>   java.net.SocketInputStream.socketRead0(Native Method) at
>>>>>>   java.net.SocketInputStream.socketRead(...) at
>>>>>>   ...
>>>>>> The job does seem to eventually progress, but only after many
>>>>>> hours. It then fails later with this error, which may or may not
>>>>>> be related (we're just starting to look into it):
>>>>>>
>>>>>>   (94794e1a2c96f380): java.lang.RuntimeException:
>>>>>>   org.apache.beam.sdk.util.UserCodeException:
>>>>>>   Unable to patch table description: {datasetId=..., projectId=...,
>>>>>>   tableId=9c20908cc6e549b4a1e116af54bb8128_011249028ddcc5204885bff-
>>>>>>   04ce2a725_00001_00000}, aborting after 9 retries.
>>>>>>
>>>>>> We're not sure how to proceed, so any pointers would be
>>>>>> appreciated.
>>>>>>
>>>>>> Thanks,
>>>>>> Andrew


