It went from about 13 hours 45 mins (if it succeeded - 2018-03-03_16_00_20-15816431303287121211) to 1 hour 15 mins (2018-03-06_03_10_41-4393426098762549646)!
Interesting to me was the change in inputs to
WriteTOBigQuery/BatchLoads/FlattenFiles, immediately before
ReifyResults. Previously
WriteTOBigQuery/BatchLoads/WriteBundlesToFiles.out0 was 180,442 and
WriteTOBigQuery/BatchLoads/WriteGroupedRecords.out0 was 0. Now they
are 7,832 and 271 respectively.

On Tue, 6 Mar 2018, at 19:26, Eugene Kirpichov wrote:
> Thanks, I'm glad it worked so well! I'm curious, just how much
> faster did it get? Do you have a job ID with the new code I can take
> a peek at?
> On Tue, Mar 6, 2018 at 4:45 AM Andrew Jones <andrew+beam@andrew-jones.com> wrote:
>> Thanks Eugene.
>> 
>> As you suggested, using withHintMatchesManyFiles() did result in a
>> very significant performance increase! It's now fast enough for our
>> current use case.
>> Will track the JIRA for any further fixes.
>> 
>> Thanks,
>> Andrew
>> 
>> On Mon, 5 Mar 2018, at 22:34, Eugene Kirpichov wrote:
>>> Filed JIRA https://issues.apache.org/jira/browse/BEAM-3778 
>>> 
>>> On Mon, Mar 5, 2018 at 2:28 PM Eugene Kirpichov
>>> <kirpic...@google.com> wrote:
>>>> For now I suggest that you augment your AvroIO.parse() with
>>>> .withHintMatchesManyFiles() because it appears to match a very
>>>> large number of tiny files, and I think that's what's causing the
>>>> issue.
>>>> By default Dataflow uses 1 shard per file, and that causes 2
>>>> problems here:
>>>> - Each of these shards writes a separate file to be loaded into
>>>>   BigQuery, so BigQuery has to load this many (tiny) files, which
>>>>   is not great.
>>>> - What's worse, the ReifyResults step takes this list of written
>>>>   temporary files as a side input, and given Dataflow's way of
>>>>   materializing side inputs, it behaves pretty badly when the data
>>>>   for the side input is written from a very large number of shards.
>>>> I'm not sure there's an easy fix to make your original code perform
>>>> well unchanged, but .withHintMatchesManyFiles() should make it
>>>> perform orders of magnitude better.
>>>> On Mon, Mar 5, 2018 at 2:19 PM Eugene Kirpichov
>>>> <kirpic...@google.com> wrote:
>>>>> Thank you - I was wrong, it is indeed not blocked by BigQuery
>>>>> jobs, but by something it shouldn't be doing at all. This is
>>>>> definitely a bug. I'll investigate in more detail and file a JIRA
>>>>> so you can track the resolution.
>>>>> On Mon, Mar 5, 2018 at 7:12 AM Andrew Jones <andrew+beam@andrew-jones.com> wrote:
>>>>>> Thanks for the reply. In my case the time is spent *before* the
>>>>>> load job has started. See attached for a screenshot of a
>>>>>> currently running job (id: 2018-03-05_04_06_20-5803269526385225708).
>>>>>> It looks like the time is spent in the ReifyResults step. Looking
>>>>>> at the code and at some smaller, succeeding jobs, the BigQuery
>>>>>> loads normally happen in SinglePartitionWriteTables (and
>>>>>> presumably MultiPartitionsWriteTables, if required). So I'm not
>>>>>> seeing any log lines with output from the BigQuery API, nor
>>>>>> anything on the BigQuery side.
>>>>>> The input to ReifyResults is around 200K elements, from
>>>>>> WriteBundlesToFiles. Looking at the code, I think these are all
>>>>>> the files staged and ready for loading. I'm finding it hard to
>>>>>> work out exactly what ReifyResults is supposed to be doing and
>>>>>> why it would take any time at all. I think it might be going
>>>>>> through these 200K files and doing something with them; if it's
>>>>>> doing that one file at a time and the calls to the GCS API are
>>>>>> expensive, could that be the issue?
>>>>>> On Mon, 5 Mar 2018, at 00:28, Eugene Kirpichov wrote:
>>>>>>> BigQueryIO.write() works by: 1) having Dataflow workers write
>>>>>>> data to files (in parallel), and 2) asking BigQuery to load those
>>>>>>> files. Naturally, during this time the Dataflow workers aren't
>>>>>>> doing anything, which is why the job is scaling down.
>>>>>>> These jobs are spending time waiting for BigQuery to load the
>>>>>>> data.
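
A minimal sketch of the kind of write being described, with the FILE_LOADS method made explicit - the input rows, table, and schema are placeholders, not the pipeline in this thread:

import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import java.util.Collections;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.TableRowJsonCoder;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.PCollection;

public class FileLoadsWriteSketch {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    // Stand-in for the rows a real pipeline would produce.
    PCollection<TableRow> rows = p.apply(
        Create.of(new TableRow().set("id", "a")).withCoder(TableRowJsonCoder.of()));

    TableSchema schema = new TableSchema().setFields(Collections.singletonList(
        new TableFieldSchema().setName("id").setType("STRING")));

    rows.apply("WriteToBigQuery",
        BigQueryIO.writeTableRows()
            .to("my-project:my_dataset.my_table")  // placeholder table spec
            .withSchema(schema)
            // FILE_LOADS is the default for a bounded pipeline: workers first
            // write the rows to temporary files on GCS, then BigQuery load
            // jobs import those files while the Dataflow workers sit idle.
            .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
            .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
            .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));

    p.run();
  }
}
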
>>>>>>> As for why it's taking so long for BigQuery to load the data:
>>>>>>> You can try to look for BigQuery job ids in the Stackdriver
>>>>>>> logs, and then inspect these jobs in the BigQuery UI. If it's
>>>>>>> taking *really* long, it's usually a quota issue: i.e. your
>>>>>>> BigQuery jobs are waiting for some other BigQuery jobs to
>>>>>>> complete before even starting.
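
As an alternative to clicking through the BigQuery UI, a load job ID found in those logs could also be inspected programmatically - a sketch assuming the google-cloud-bigquery Java client, with placeholder project and job IDs:

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.Job;
import com.google.cloud.bigquery.JobId;
import com.google.cloud.bigquery.JobStatistics;

public class InspectLoadJob {
  public static void main(String[] args) {
    BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();

    // Placeholder project and job ID, as they would appear in the worker logs.
    Job job = bigquery.getJob(JobId.of("my-project", "load-job-id-from-logs"));
    if (job == null) {
      System.out.println("No such job");
      return;
    }

    System.out.println("State: " + job.getStatus().getState());
    System.out.println("Error: " + job.getStatus().getError());

    // A large gap between creation and start time is what a quota problem
    // (the load job queued behind other jobs) typically looks like.
    JobStatistics stats = job.getStatistics();
    System.out.println("Created: " + stats.getCreationTime());
    System.out.println("Started: " + stats.getStartTime());
  }
}
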
>>>>>>> On Sat, Mar 3, 2018 at 3:30 AM Andrew Jones <andrew+beam@andrew-jones.com> wrote:
>>>>>>>> Hi,
>>>>>>>> 
>>>>>>>> We have a Dataflow job that loads data from GCS, does a bit of
>>>>>>>> transformation, then writes to a number of BigQuery tables
>>>>>>>> using DynamicDestinations.
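
For reference, a minimal sketch of that kind of write - a DynamicDestinations that routes each row to a table derived from one of its fields; the routing field, table naming, and schema here are placeholders rather than the actual job's code:

import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import java.util.Collections;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinations;
import org.apache.beam.sdk.io.gcp.bigquery.TableDestination;
import org.apache.beam.sdk.values.ValueInSingleWindow;

// Routes each TableRow to a table derived from its "event_type" field.
class PerTypeDestinations extends DynamicDestinations<TableRow, String> {
  @Override
  public String getDestination(ValueInSingleWindow<TableRow> element) {
    return (String) element.getValue().get("event_type");  // placeholder routing key
  }

  @Override
  public TableDestination getTable(String destination) {
    return new TableDestination(
        "my-project:my_dataset.events_" + destination,  // placeholder table naming
        "Events of type " + destination);
  }

  @Override
  public TableSchema getSchema(String destination) {
    return new TableSchema().setFields(Collections.singletonList(
        new TableFieldSchema().setName("event_type").setType("STRING")));
  }
}

// Used roughly as:
//   rows.apply(BigQueryIO.writeTableRows()
//       .to(new PerTypeDestinations())
//       .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
//       .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
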
>>>>>>>> The same job runs on smaller data sets (~70 million records),
>>>>>>>> but this one is struggling when processing ~500 million
>>>>>>>> records. Both jobs are writing to the same number of tables -
>>>>>>>> the only difference is the number of records.
>>>>>>>> Example job IDs include 2018-03-02_04_29_44-2181786949469858712
>>>>>>>> and 2018-03-02_08_46_28-4580218739500768796. They are using
>>>>>>>> BigQueryIO to write to BigQuery, with the
>>>>>>>> BigQueryIO.Write.Method.FILE_LOADS method (the default for a
>>>>>>>> bounded job). They successfully stage all their data to GCS,
>>>>>>>> but then for some reason scale down the number of workers to 1
>>>>>>>> when processing the step
>>>>>>>> WriteTOBigQuery/BatchLoads/ReifyResults and stay in that step
>>>>>>>> for hours.
>>>>>>>> In the logs we see many entries like this:
>>>>>>>> 
>>>>>>>> Proposing dynamic split of work unit ...-7e07;2018-03-02_04_29_44-2181786949469858712;662185752552586455 at
>>>>>>>> {"fractionConsumed":0.5}
>>>>>>>> Rejecting split request because custom reader returned null
>>>>>>>> residual source.
>>>>>>>> 
>>>>>>>> And also occasionally this:
>>>>>>>> 
>>>>>>>> Processing lull for PT24900.038S in state process of
>>>>>>>> WriteTOBigQuery/BatchLoads/ReifyResults/ParDo(Anonymous) at
>>>>>>>> java.net.SocketInputStream.socketRead0(Native Method) at
>>>>>>>> java.net.SocketInputStream.socketRead(SocketInputStream.java:116) at
>>>>>>>> java.net.SocketInputStream.read(SocketInputStream.java:170) at
>>>>>>>> java.net.SocketInputStream.read(SocketInputStream.java:141)
>>>>>>>> ...
>>>>>>>> The job does seem to eventually progress, but after many hours.
>>>>>>>> It then fails later with this error, which may or may not be
>>>>>>>> related (just starting to look into it):
>>>>>>>> (94794e1a2c96f380): java.lang.RuntimeException:
>>>>>>>> org.apache.beam.sdk.util.UserCodeException:
>>>>>>>> java.io.IOException: Unable to patch table description:
>>>>>>>> {datasetId=..., projectId=...,
>>>>>>>> tableId=9c20908cc6e549b4a1e116af54bb8128_011249028ddcc5204885bff04ce2a725_00001_00000},
>>>>>>>> aborting after 9 retries.
>>>>>>>> We're not sure how to proceed, so any pointers would be
>>>>>>>> appreciated.
>>>>>>>> Thanks,
>>>>>>>> Andrew
>>>>>> 
>> 


