Nice, that's quite a speedup! I looked at the new job and I see no obvious bottlenecks in it anymore - it pretty much keeps all workers busy all the time with useful work - as expected :)
On Wed, Mar 7, 2018 at 2:30 AM Andrew Jones <andrew+b...@andrew-jones.com> wrote:

It went from about 13 hours 45 mins (if it succeeded - 2018-03-03_16_00_20-15816431303287121211) to 1 hour 15 mins (2018-03-06_03_10_41-4393426098762549646)!

Interesting to me was the change in inputs to WriteTOBigQuery/BatchLoads/FlattenFiles, immediately before ReifyResults. Previously WriteTOBigQuery/BatchLoads/WriteBundlesToFiles.out0 was 180,442, and WriteTOBigQuery/BatchLoads/WriteGroupedRecords.out0 was 0. Now it's 7,832 and 271.

On Tue, 6 Mar 2018, at 19:26, Eugene Kirpichov wrote:

Thanks, I'm glad it worked so well! I'm curious, just how much faster did it get? Do you have a job ID with the new code I can take a peek at?

On Tue, Mar 6, 2018 at 4:45 AM Andrew Jones <andrew+b...@andrew-jones.com> wrote:

Thanks Eugene.

As you suggested, using withHintMatchesManyFiles() did result in a very significant performance increase! Enough that it's fast enough for our current use case.

Will track the JIRA for any further fixes.

Thanks,
Andrew

On Mon, 5 Mar 2018, at 22:34, Eugene Kirpichov wrote:

Filed JIRA https://issues.apache.org/jira/browse/BEAM-3778

On Mon, Mar 5, 2018 at 2:28 PM Eugene Kirpichov <kirpic...@google.com> wrote:

For now I suggest that you augment your AvroIO.parse() with .withHintMatchesManyFiles() because it appears to match a very large number of tiny files, and I think that's what's causing the issue.

By default Dataflow uses 1 shard per file, and that causes 2 problems here:
- Each of these shards writes a separate file to be loaded into BigQuery, so BigQuery has to load this many (tiny) files, which is not great.
- What's worse, the ReifyResults step takes this list of written temporary files as a side input, and given Dataflow's way of materializing side inputs, it behaves pretty bad when the data for the side input is written from a very large number of shards.
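The figures Andrew reported after applying the hint (quoted earlier in this thread) are consistent with this explanation: far fewer inputs reach FlattenFiles, and the runtime drops accordingly. A minimal Java sketch of that arithmetic, using only numbers from his message. The class and method names are made up for illustration, and the 13 h 45 min figure is approximate (it was reported as "about", for a job that may not have succeeded).

```java
import java.time.Duration;

// Arithmetic on figures quoted in this thread; names here are illustrative only.
final class HintEffect {
    // Ratio of two durations, e.g. old runtime divided by new runtime.
    static double speedup(Duration before, Duration after) {
        return (double) before.toMillis() / after.toMillis();
    }

    // Ratio of two element counts, e.g. FlattenFiles inputs before and after.
    static double reduction(long before, long after) {
        return (double) before / after;
    }

    public static void main(String[] args) {
        Duration oldRun = Duration.ofHours(13).plusMinutes(45); // "about", per the thread
        Duration newRun = Duration.ofHours(1).plusMinutes(15);

        // WriteBundlesToFiles.out0 + WriteGroupedRecords.out0, as reported.
        long oldInputs = 180_442L + 0L;
        long newInputs = 7_832L + 271L;

        System.out.printf("runtime ratio: %.1fx%n", speedup(oldRun, newRun));
        System.out.printf("FlattenFiles input ratio: %.1fx%n", reduction(oldInputs, newInputs));
    }
}
```

Run as a single file, this works out to roughly an 11x shorter runtime and roughly 22x fewer inputs to FlattenFiles.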
I'm not sure there's an easy fix to make your original code perform well unchanged, but .withHintMatchesManyFiles() should make it perform orders of magnitude better.

On Mon, Mar 5, 2018 at 2:19 PM Eugene Kirpichov <kirpic...@google.com> wrote:

Thank you - I was wrong, it is indeed not blocked by BigQuery jobs, but by something it shouldn't be doing at all. This is definitely a bug. I'll investigate in more detail and file a JIRA so you can track the resolution.

On Mon, Mar 5, 2018 at 7:12 AM Andrew Jones <andrew+b...@andrew-jones.com> wrote:

Thanks for the reply. In my case the time is spent *before* the load job has started. See attached for a screenshot of a currently running job (id: 2018-03-05_04_06_20-5803269526385225708).

It looks like the time is spent in the ReifyResults step. Looking at the code and at some smaller, succeeding jobs, the BigQuery loads normally happen in SinglePartitionWriteTables (and presumably MultiPartitionsWriteTables, if required). So I'm not seeing any log lines with output from the BigQuery API, nor anything on the BigQuery side.

The input to ReifyResults is around 200K elements, from WriteBundlesToFiles. Looking at the code, I think these are all the files staged and ready for loading. I'm finding it hard to work out exactly what ReifyResults is supposed to be doing and why it would take any time at all. I think it might be going through these 200K files and doing something with them; if it's doing that one at a time and the calls to the GCS API are expensive, could that be the issue?

On Mon, 5 Mar 2018, at 00:28, Eugene Kirpichov wrote:

BigQueryIO.write() works by:
1) having Dataflow workers write data to files (in parallel)
2) asking BigQuery to load those files - naturally, during this time Dataflow workers aren't doing anything, that's why the job is scaling down.

These jobs are spending time waiting for BigQuery to load the data.
As for why it's taking so long for BigQuery to load the data: You can try to look for BigQuery job ids in the Stackdriver logs, and then inspect these jobs in the BigQuery UI. If it's taking *really* long, it's usually a quota issue: i.e. your BigQuery jobs are waiting for some other BigQuery jobs to complete before even starting.

On Sat, Mar 3, 2018 at 3:30 AM Andrew Jones <andrew+b...@andrew-jones.com> wrote:

Hi,

We have a Dataflow job that loads data from GCS, does a bit of transformation, then writes to a number of BigQuery tables using DynamicDestinations.

The same job runs on smaller data sets (~70 million records), but this one is struggling when processing ~500 million records. Both jobs are writing to the same number of tables - the only difference is the number of records.

Example job IDs include 2018-03-02_04_29_44-2181786949469858712 and 2018-03-02_08_46_28-4580218739500768796. They are using BigQueryIO to write to BigQuery, using the BigQueryIO.Write.Method.FILE_LOADS method (the default for a bounded job). They successfully stage all their data to GCS, but then for some reason scale down the number of workers to 1 when processing the step WriteTOBigQuery/BatchLoads/ReifyResults and stay in that step for hours.

In the logs we see many entries like this:

    Proposing dynamic split of work unit ...-7e07;2018-03-02_04_29_44-2181786949469858712;662185752552586455 at {"fractionConsumed":0.5}
    Rejecting split request because custom reader returned null residual source.

And also occasionally this:

    Processing lull for PT24900.038S in state process of WriteTOBigQuery/BatchLoads/ReifyResults/ParDo(Anonymous)
      at java.net.SocketInputStream.socketRead0(Native Method)
      at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
      at java.net.SocketInputStream.read(SocketInputStream.java:170)
      at java.net.SocketInputStream.read(SocketInputStream.java:141)
      ...
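The lull duration in the log entry above is written in ISO-8601 duration notation, which is hard to read at a glance. A small Java sketch that converts it to hours and minutes with java.time.Duration; the class and method names are hypothetical and not part of Beam or Dataflow.

```java
import java.time.Duration;

// Helper for reading "Processing lull for PT...S" log values; names are illustrative only.
final class LullDuration {
    // Converts an ISO-8601 duration such as "PT24900.038S" to "<h>h <m>m <s>s",
    // dropping the fractional seconds.
    static String humanReadable(String iso) {
        Duration d = Duration.parse(iso);
        long hours = d.toHours();
        long minutes = d.toMinutes() % 60;
        long seconds = d.getSeconds() % 60;
        return hours + "h " + minutes + "m " + seconds + "s";
    }

    public static void main(String[] args) {
        System.out.println(humanReadable("PT24900.038S"));
    }
}
```

For PT24900.038S this prints 6h 55m 0s, so the lull had lasted nearly seven hours, in line with the job staying in ReifyResults for hours.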
The job does seem to eventually progress, but after many hours. It then fails later with this error, which may or may not be related (just starting to look into it):

    (94794e1a2c96f380): java.lang.RuntimeException: org.apache.beam.sdk.util.UserCodeException: java.io.IOException: Unable to patch table description: {datasetId=..., projectId=..., tableId=9c20908cc6e549b4a1e116af54bb8128_011249028ddcc5204885bff04ce2a725_00001_00000}, aborting after 9 retries.

We're not sure how to proceed, so any pointers would be appreciated.

Thanks,
Andrew