It went from about 13 hours 45 minutes (if it succeeded; job 2018-03-03_16_00_20-15816431303287121211) to 1 hour 15 minutes (job 2018-03-06_03_10_41-4393426098762549646)! What I found interesting was the change in the inputs to WriteTOBigQuery/BatchLoads/FlattenFiles, the step immediately before ReifyResults. Previously, WriteTOBigQuery/BatchLoads/WriteBundlesToFiles.out0 was 180,442 and WriteTOBigQuery/BatchLoads/WriteGroupedRecords.out0 was 0. Now they are 7,832 and 271 respectively.
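Eugene's explanation in the quoted thread is essentially a counting argument. With one shard per input file, and each shard writing its own temporary file for BigQuery to load, the number of temp files (and of shards feeding the ReifyResults side input) grows with the number of input files. That fits the drop from 180,442 to 7,832 outputs above. The sketch below is a hypothetical toy model in plain Java, not Beam or Dataflow code. The class and method names, the 200,000 files of 10 KB, and the 64 MiB greedy packing target are all made-up stand-ins, and it does not reproduce how withHintMatchesManyFiles() actually splits work.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/**
 * Hypothetical toy model, not Beam code: if each bundle writes one temporary
 * file, the number of temp files equals the number of bundles.
 */
public class ShardingModel {

    /** One bundle per input file, mimicking "1 shard per file". */
    static int bundlesOnePerFile(List<Long> fileSizes) {
        return fileSizes.size();
    }

    /**
     * Greedily packs consecutive files into bundles of at most targetBytes.
     * A single file larger than targetBytes still gets a bundle of its own.
     */
    static int bundlesPacked(List<Long> fileSizes, long targetBytes) {
        int bundles = 0;
        long currentBytes = 0;
        boolean open = false;
        for (long size : fileSizes) {
            if (open && currentBytes + size > targetBytes) {
                bundles++;          // close the current bundle
                currentBytes = 0;
                open = false;
            }
            currentBytes += size;   // add this file to the (new or existing) bundle
            open = true;
        }
        return open ? bundles + 1 : bundles;
    }

    public static void main(String[] args) {
        // Hypothetical input: 200,000 files of 10 KB each.
        List<Long> sizes = new ArrayList<>(Collections.nCopies(200_000, 10_000L));
        long target = 64L * 1024 * 1024; // hypothetical 64 MiB bundle target
        System.out.println("one shard per file: " + bundlesOnePerFile(sizes) + " temp files");
        System.out.println("packed bundles:     " + bundlesPacked(sizes, target) + " temp files");
    }
}
```

With these made-up inputs it prints 200000 versus 30 temp files. The real job's counts differ because Dataflow's splitting is not a simple greedy packing and the job writes to many DynamicDestinations tables, but the direction of the effect is the same.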
On Tue, 6 Mar 2018, at 19:26, Eugene Kirpichov wrote:
> Thanks, I'm glad it worked so well! I'm curious, just how much
> faster did it get? Do you have a job ID with the new code I can take
> a peek at?
>
> On Tue, Mar 6, 2018 at 4:45 AM Andrew Jones <andrew+beam@andrew-jones.com[1]> wrote:
>> Thanks Eugene.
>>
>> As you suggested, using withHintMatchesManyFiles() did result in a
>> very significant performance increase, big enough that the job now
>> runs fast enough for our current use case.
>>
>> We'll track the JIRA for any further fixes.
>>
>> Thanks,
>> Andrew
>>
>> On Mon, 5 Mar 2018, at 22:34, Eugene Kirpichov wrote:
>>> Filed JIRA https://issues.apache.org/jira/browse/BEAM-3778
>>>
>>> On Mon, Mar 5, 2018 at 2:28 PM Eugene Kirpichov <kirpic...@google.com> wrote:
>>>> For now I suggest that you augment your AvroIO.parse() with
>>>> .withHintMatchesManyFiles(), because it appears to match a very
>>>> large number of tiny files, and I think that's what's causing the
>>>> issue.
>>>>
>>>> By default Dataflow uses 1 shard per file, and that causes 2
>>>> problems here:
>>>> - Each of these shards writes a separate file to be loaded into
>>>>   BigQuery, so BigQuery has to load this many (tiny) files, which
>>>>   is not great.
>>>> - What's worse, the ReifyResults step takes this list of written
>>>>   temporary files as a side input, and given Dataflow's way of
>>>>   materializing side inputs, it behaves pretty badly when the data
>>>>   for the side input is written from a very large number of shards.
>>>>
>>>> I'm not sure there's an easy fix to make your original code perform
>>>> well unchanged, but .withHintMatchesManyFiles() should make it
>>>> perform orders of magnitude better.
>>>>
>>>> On Mon, Mar 5, 2018 at 2:19 PM Eugene Kirpichov <kirpic...@google.com> wrote:
>>>>> Thank you. I was wrong: it is indeed not blocked by BigQuery
>>>>> jobs, but by something it shouldn't be doing at all. This is
>>>>> definitely a bug.
>>>>> I'll investigate in more detail and file a JIRA so you can track
>>>>> the resolution.
>>>>>
>>>>> On Mon, Mar 5, 2018 at 7:12 AM Andrew Jones <andrew+beam@andrew-jones.com[2]> wrote:
>>>>>> Thanks for the reply. In my case the time is spent *before* the
>>>>>> load job has started. See attached for a screenshot of a
>>>>>> currently running job (id: 2018-03-05_04_06_20-5803269526385225708).
>>>>>>
>>>>>> It looks like the time is spent in the ReifyResults step. Looking
>>>>>> at the code and at some smaller, succeeding jobs, the BigQuery
>>>>>> loads normally happen in SinglePartitionWriteTables (and
>>>>>> presumably MultiPartitionsWriteTables, if required). So I'm not
>>>>>> seeing any log lines with output from the BigQuery API, nor
>>>>>> anything on the BigQuery side.
>>>>>>
>>>>>> The input to ReifyResults is around 200K elements, from
>>>>>> WriteBundlesToFiles. Looking at the code, I think these are all
>>>>>> the files staged and ready for loading. I'm finding it hard to
>>>>>> work out exactly what ReifyResults is supposed to be doing and
>>>>>> why it would take any time at all. I think it might be going
>>>>>> through these 200K files and doing something with them; if it
>>>>>> does this one file at a time and the calls to the GCS API are
>>>>>> expensive, could that be the issue?
>>>>>>
>>>>>> On Mon, 5 Mar 2018, at 00:28, Eugene Kirpichov wrote:
>>>>>>> BigQueryIO.write() works by:
>>>>>>> 1) having Dataflow workers write data to files (in parallel);
>>>>>>> 2) asking BigQuery to load those files. Naturally, during this
>>>>>>>    time Dataflow workers aren't doing anything, which is why the
>>>>>>>    job is scaling down.
>>>>>>> These jobs are spending time waiting for BigQuery to load the
>>>>>>> data.
>>>>>>>
>>>>>>> As for why it's taking so long for BigQuery to load the data:
>>>>>>> you can try to look for BigQuery job IDs in the Stackdriver
>>>>>>> logs, and then inspect these jobs in the BigQuery UI.
>>>>>>> If it's taking *really* long, it's usually a quota issue, i.e.
>>>>>>> your BigQuery jobs are waiting for some other BigQuery jobs to
>>>>>>> complete before even starting.
>>>>>>>
>>>>>>> On Sat, Mar 3, 2018 at 3:30 AM Andrew Jones <andrew+beam@andrew-jones.com[3]> wrote:
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> We have a Dataflow job that loads data from GCS, does a bit of
>>>>>>>> transformation, then writes to a number of BigQuery tables
>>>>>>>> using DynamicDestinations.
>>>>>>>>
>>>>>>>> The same job runs fine on smaller data sets (~70 million
>>>>>>>> records), but this one is struggling when processing ~500
>>>>>>>> million records. Both jobs write to the same number of tables;
>>>>>>>> the only difference is the number of records.
>>>>>>>>
>>>>>>>> Example job IDs include 2018-03-02_04_29_44-2181786949469858712
>>>>>>>> and 2018-03-02_08_46_28-4580218739500768796. They use
>>>>>>>> BigQueryIO to write to BigQuery, with the
>>>>>>>> BigQueryIO.Write.Method.FILE_LOADS method (the default for a
>>>>>>>> bounded job).
>>>>>>>> They successfully stage all their data to GCS, but then for some
>>>>>>>> reason scale down the number of workers to 1 when processing the
>>>>>>>> step WriteTOBigQuery/BatchLoads/ReifyResults, and stay in that
>>>>>>>> step for hours.
>>>>>>>>
>>>>>>>> In the logs we see many entries like this:
>>>>>>>>
>>>>>>>>   Proposing dynamic split of work unit ...-7e07;2018-03-02_04_29_44-2181786949469858712;662185752552586455 at {"fractionConsumed":0.5}
>>>>>>>>   Rejecting split request because custom reader returned null residual source.
>>>>>>>>
>>>>>>>> And also occasionally this:
>>>>>>>>
>>>>>>>>   Processing lull for PT24900.038S in state process of WriteTOBigQuery/BatchLoads/ReifyResults/ParDo(Anonymous)
>>>>>>>>     at java.net.SocketInputStream.socketRead0(Native Method)
>>>>>>>>     at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
>>>>>>>>     at java.net.SocketInputStream.read(SocketInputStream.java:170)
>>>>>>>>     at java.net.SocketInputStream.read(SocketInputStream.java:141)
>>>>>>>>     ...
>>>>>>>>
>>>>>>>> The job does seem to eventually progress, but only after many
>>>>>>>> hours. It then fails later with this error, which may or may not
>>>>>>>> be related (I'm just starting to look into it):
>>>>>>>>
>>>>>>>>   (94794e1a2c96f380): java.lang.RuntimeException: org.apache.beam.sdk.util.UserCodeException: java.io.IOException: Unable to patch table description: {datasetId=..., projectId=..., tableId=9c20908cc6e549b4a1e116af54bb8128_011249028ddcc5204885bff04ce2a725_00001_00000}, aborting after 9 retries.
>>>>>>>>
>>>>>>>> We're not sure how to proceed, so any pointers would be
>>>>>>>> appreciated.
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Andrew

Links:

  1. mailto:andrew%2bb...@andrew-jones.com
  2. mailto:andrew%2bb...@andrew-jones.com
  3. mailto:andrew%2bb...@andrew-jones.com
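The durations in this thread are easier to compare once decoded. The "Processing lull for PT24900.038S" log line in Andrew's original report uses an ISO-8601 duration, which java.time.Duration can parse directly. The sketch below (Java 9+) shows that the lull was roughly 6 hours 55 minutes, and that the approximate before/after run times Andrew reported (about 13 h 45 min and 1 h 15 min) work out to roughly an 11x speedup. The class and method names are illustrative only.

```java
import java.time.Duration;

public class ThreadDurations {

    // Parses the ISO-8601 duration printed in the "Processing lull" log line.
    static Duration lullDuration() {
        return Duration.parse("PT24900.038S");
    }

    // Ratio of two wall-clock durations, compared at minute precision.
    static double speedup(Duration before, Duration after) {
        return (double) before.toMinutes() / after.toMinutes();
    }

    public static void main(String[] args) {
        Duration lull = lullDuration();
        // toMinutesPart() (Java 9+) gives the minutes left over after whole hours.
        System.out.println("lull: " + lull.toHours() + " h " + lull.toMinutesPart() + " min");

        // Approximate run times reported before and after adding the hint.
        Duration before = Duration.ofHours(13).plusMinutes(45);
        Duration after = Duration.ofHours(1).plusMinutes(15);
        System.out.println("speedup: " + speedup(before, after) + "x");
    }
}
```

Running it prints "lull: 6 h 55 min" and "speedup: 11.0x".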