Filed JIRA https://issues.apache.org/jira/browse/BEAM-3778

On Mon, Mar 5, 2018 at 2:28 PM Eugene Kirpichov <kirpic...@google.com>
wrote:

> For now I suggest that you augment your AvroIO.parse() with
> .withHintMatchesManyFiles() because it appears to match a very large number
> of tiny files, and I think that's what's causing the issue.
>
> By default Dataflow uses 1 shard per file, and that causes 2 problems here:
> - Each of these shards writes a separate file to be loaded into BigQuery,
> so BigQuery has to load this many (tiny) files, which is not great.
> - What's worse, the ReifyResults step takes this list of written temporary
> files as a side input, and given Dataflow's way of materializing side
> inputs, it behaves pretty badly when the data for the side input is written
> from a very large number of shards.
>
> I'm not sure there's an easy fix to make your original code perform well
> unchanged, but .withHintMatchesManyFiles() should make it perform orders of
> magnitude better.
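
For readers following the thread, the suggested change could look roughly like the sketch below. It is only an illustration, not the original poster's pipeline: the class name, the input glob, and the parse function that turns each record into a String are hypothetical. It assumes the Beam Java SDK of that era (where AvroIO lives in org.apache.beam.sdk.io) and Avro on the classpath.

```java
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.AvroIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.values.PCollection;

public class ReadManySmallAvroFiles {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    PCollection<String> records =
        p.apply(
            "ReadAvro",
            // Hypothetical parse function: render each record as text.
            AvroIO.<String>parseGenericRecords((GenericRecord r) -> r.toString())
                // Hypothetical glob that matches a very large number of small files.
                .from("gs://my-bucket/input/*.avro")
                .withCoder(StringUtf8Coder.of())
                // Hint that the glob matches many files, so the read is
                // planned for that case rather than as one shard per file.
                .withHintMatchesManyFiles());

    // Downstream transforms and the BigQueryIO write would be applied to
    // `records` here, unchanged from the existing job.
    p.run().waitUntilFinish();
  }
}
```

The only line that differs from a plain AvroIO.parse read is the final .withHintMatchesManyFiles() call; the rest of the pipeline stays as it was.
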
>
> On Mon, Mar 5, 2018 at 2:19 PM Eugene Kirpichov <kirpic...@google.com>
> wrote:
>
>> Thank you - I was wrong, it is indeed not blocked by BigQuery jobs, but
>> by something it shouldn't be doing at all. This is definitely a bug. I'll
>> investigate in more detail and file a JIRA so you can track the resolution.
>>
>> On Mon, Mar 5, 2018 at 7:12 AM Andrew Jones <andrew+b...@andrew-jones.com>
>> wrote:
>>
>>> Thanks for the reply. In my case the time is spent *before* the load
>>> job has started. See attached for a screenshot of a currently running job
>>> (id: 2018-03-05_04_06_20-5803269526385225708).
>>>
>>> It looks like the time is spent in the ReifyResults step. Looking at the
>>> code and at some smaller, succeeding jobs, the BigQuery loads normally
>>> happen in SinglePartitionWriteTables (and
>>> presumably MultiPartitionsWriteTables, if required). So I'm not seeing any
>>> log lines with output from the BigQuery API, nor anything on the BigQuery
>>> side.
>>>
>>> The input to ReifyResults is around 200K elements,
>>> from WriteBundlesToFiles. Looking at the code, I think these are all the
>>> files staged and ready for loading. I'm finding it hard to work out exactly
>>> what ReifyResults is supposed to be doing and why it would take any time at
>>> all. I think it might be going through these 200K files and doing something
>>> with them. If it's doing that one file at a time, and if the calls to the
>>> GCS API are expensive, then that could be the issue?
>>>
>>> On Mon, 5 Mar 2018, at 00:28, Eugene Kirpichov wrote:
>>>
>>> BigQueryIO.write() works by: 1) having Dataflow workers write data to
>>> files (in parallel) 2) asking BigQuery to load those files - naturally,
>>> during this time Dataflow workers aren't doing anything, that's why the job
>>> is scaling down.
>>> These jobs are spending time waiting for BigQuery to load the data.
>>>
>>> As for why it's taking so long for BigQuery to load the data: You can
>>> try to look for BigQuery job ids in the Stackdriver logs, and then inspect
>>> these jobs in the BigQuery UI. If it's taking *really* long, it's usually a
>>> quota issue: i.e. your BigQuery jobs are waiting for some other BigQuery
>>> jobs to complete before even starting.
>>>
>>> On Sat, Mar 3, 2018 at 3:30 AM Andrew Jones <
>>> andrew+b...@andrew-jones.com> wrote:
>>>
>>> Hi,
>>>
>>> We have a Dataflow job that loads data from GCS, does a bit of
>>> transformation, then writes to a number of BigQuery tables using
>>> DynamicDestinations.
>>>
>>> The same job runs on smaller data sets (~70 million records), but this
>>> one is struggling when processing ~500 million records. Both jobs are
>>> writing to the same number of tables - the only difference is the number of
>>> records.
>>>
>>> Example job IDs include 2018-03-02_04_29_44-2181786949469858712 and
>>> 2018-03-02_08_46_28-4580218739500768796. They are using BigQueryIO to
>>> write to BigQuery, using the BigQueryIO.Write.Method.FILE_LOADS method (the
>>> default for a bounded job). They successfully stage all their data to GCS,
>>> but then for some reason scale down the number of workers to 1 when
>>> processing the step WriteTOBigQuery/BatchLoads/ReifyResults and stay in
>>> that step for hours.
>>>
>>> In the logs we see many entries like this:
>>>
>>> Proposing dynamic split of work unit
>>> ...-7e07;2018-03-02_04_29_44-2181786949469858712;662185752552586455 at
>>> {"fractionConsumed":0.5}
>>> Rejecting split request because custom reader returned null residual
>>> source.
>>>
>>> And also occasionally this:
>>>
>>> Processing lull for PT24900.038S in state process of
>>> WriteTOBigQuery/BatchLoads/ReifyResults/ParDo(Anonymous) at
>>> java.net.SocketInputStream.socketRead0(Native Method) at
>>> java.net.SocketInputStream.socketRead(SocketInputStream.java:116) at
>>> java.net.SocketInputStream.read(SocketInputStream.java:170) at
>>> java.net.SocketInputStream.read(SocketInputStream.java:141) ...
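
A note on reading the "Processing lull" line above: the PT24900.038S value is an ISO-8601 duration. A minimal sketch of converting it to hours and minutes with the standard java.time API follows; the class and method names are hypothetical and only serve the illustration.

```java
import java.time.Duration;

public class LullDuration {
  // Parse an ISO-8601 duration string such as the one in the log line.
  static Duration parseLull(String iso) {
    return Duration.parse(iso);
  }

  // Render a duration as whole hours plus remaining whole minutes.
  static String describe(Duration d) {
    return d.toHours() + " h " + (d.toMinutes() % 60) + " min";
  }

  public static void main(String[] args) {
    Duration lull = parseLull("PT24900.038S");
    System.out.println(describe(lull)); // prints "6 h 55 min"
  }
}
```

In other words, the ParDo in ReifyResults had been in the same state, blocked on a socket read, for nearly seven hours when that entry was logged.
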
>>>
>>> The job does seem to eventually progress, but after many hours. It then
>>> fails later with this error, which may or may not be related (I'm just
>>> starting to look into it):
>>>
>>> (94794e1a2c96f380): java.lang.RuntimeException:
>>> org.apache.beam.sdk.util.UserCodeException: java.io.IOException: Unable to
>>> patch table description: {datasetId=..., projectId=...,
>>> tableId=9c20908cc6e549b4a1e116af54bb8128_011249028ddcc5204885bff04ce2a725_00001_00000},
>>> aborting after 9 retries.
>>>
>>> We're not sure how to proceed, so any pointers would be appreciated.
>>>
>>> Thanks,
>>> Andrew
>>>
>>>
>>>
