Filed JIRA https://issues.apache.org/jira/browse/BEAM-3778
On Mon, Mar 5, 2018 at 2:28 PM Eugene Kirpichov <kirpic...@google.com> wrote: > For now I suggest that you augment your AvroIO.parse() with > .withHintMatchesManyFiles() because it appears to match a very large number > of tiny files, and I think that's what's causing the issue. > > By default Dataflow uses 1 shard per file, and that causes 2 problems here: > - Each of these shards writes a separate file to be loaded into BigQuery, > so BigQuery has to load this many (tiny) files, which is not great. > - What's worse, the ReifyResults step takes this list of written temporary > files as a side input, and given Dataflow's way of materializing side > inputs, it behaves pretty bad when the data for the side input is written > from a very large number of shards. > > I'm not sure there's an easy fix to make your original code perform well > unchanged, but .withHintMatchesManyFiles() should make it perform orders of > magnitude better. > > On Mon, Mar 5, 2018 at 2:19 PM Eugene Kirpichov <kirpic...@google.com> > wrote: > >> Thank you - I was wrong, it is indeed not blocked by BigQuery jobs, but >> by something it shouldn't be doing at all. This is definitely a bug. I'll >> investigate in more detail and file a JIRA so you can track the resolution. >> >> On Mon, Mar 5, 2018 at 7:12 AM Andrew Jones <andrew+b...@andrew-jones.com> >> wrote: >> >>> Thanks for the reply. In my case the time is spent *before* the load >>> job has started. See attached for a screenshot of a currently running job >>> (id: 2018-03-05_04_06_20-5803269526385225708). >>> >>> It looks like the time is spent in the ReifyResults step. Looking at the >>> code and at some smaller, succeeding jobs, the BigQuery loads normally >>> happen in SinglePartitionWriteTables (and >>> presumably MultiPartitionsWriteTables, if required). So I'm not seeing any >>> log lines with output from the BigQuery API, nor anything on the BigQuery >>> side. >>> >>> The input to ReifyResults is around ~200K elements, >>> from WriteBundlesToFiles. Looking at the code, I think these are all the >>> files staged and ready for loading. I'm finding it hard to work out exactly >>> what ReifyResults is supposed to be doing and why it would take any time at >>> all. I think it might be going through these 200K files and doing something >>> with them, which if it's doing it one at a time and if the calls to the GCS >>> API is expensive then it could be the issue? >>> >>> On Mon, 5 Mar 2018, at 00:28, Eugene Kirpichov wrote: >>> >>> BigQueryIO.write() works by: 1) having Dataflow workers write data to >>> files (in parallel) 2) asking BigQuery to load those files - naturally, >>> during this time Dataflow workers aren't doing anything, that's why the job >>> is scaling down. >>> These jobs are spending time waiting for BigQuery to load the data. >>> >>> As for why it's taking so long for BigQuery to load the data: You can >>> try to look for BigQuery job ids in the Stackdriver logs, and then inspect >>> these jobs in the BigQuery UI. If it's taking *really* long, it's usually a >>> quota issue: i.e. your BigQuery jobs are waiting for some other BigQuery >>> jobs to complete before even starting. >>> >>> On Sat, Mar 3, 2018 at 3:30 AM Andrew Jones < >>> andrew+b...@andrew-jones.com> wrote: >>> >>> Hi, >>> >>> We have a Dataflow job that loads data from GCS, does a bit of >>> transformation, then writes to a number of BigQuery tables using >>> DynamicDestinations. >>> >>> The same job runs on smaller data sets (~70 million records), but this >>> one is struggling when processing ~500 million records. Both jobs are >>> writing to the same amount of tables - the only difference is the amount of >>> records. >>> >>> Example job IDs include 2018-03-02_04_29_44-2181786949469858712 and >>> 2018-03-02_08_46_28-4580218739500768796. They are using BigQuery.IO to >>> write to BigQuery, using the BigQueryIO.Write.Method.FILE_LOADS method (the >>> default for a bounded job). They successfully stage all their data to GCS, >>> but then for some reason scale down the amount of workers to 1 when >>> processing the step WriteTOBigQuery/BatchLoads/ReifyResults and stay in >>> that step for hours. >>> >>> In the logs we see many entries like this: >>> >>> Proposing dynamic split of work unit >>> ...-7e07;2018-03-02_04_29_44-2181786949469858712;662185752552586455 at >>> {"fractionConsumed":0.5} >>> Rejecting split request because custom reader returned null residual >>> source. >>> And also occasionally this: >>> >>> Processing lull for PT24900.038S in state process of >>> WriteTOBigQuery/BatchLoads/ReifyResults/ParDo(Anonymous) at >>> java.net.SocketInputStream.socketRead0(Native Method) at >>> java.net.SocketInputStream.socketRead(SocketInputStream.java:116) at >>> java.net.SocketInputStream.read(SocketInputStream.java:170) at >>> java.net.SocketInputStream.read(SocketInputStream.java:141) ... >>> >>> The job does seem to eventually progress, but after many hours. It then >>> fails later with this error, which may or may not be related (just starting >>> to look in to): >>> >>> (94794e1a2c96f380): java.lang.RuntimeException: >>> org.apache.beam.sdk.util.UserCodeException: java.io.IOException: Unable to >>> patch table description: {datasetId=..., projectId=..., >>> tableId=9c20908cc6e549b4a1e116af54bb8128_011249028ddcc5204885bff04ce2a725_00001_00000}, >>> aborting after 9 retries. >>> >>> We're not sure how to proceed, so any pointers would be appreciated. >>> >>> Thanks, >>> Andrew >>> >>> >>>