Thanks, I'm glad it worked so well! I'm curious, just how much faster did
it get? Do you have a job ID with the new code I can take a peek at?

On Tue, Mar 6, 2018 at 4:45 AM Andrew Jones <andrew+b...@andrew-jones.com>
wrote:

> Thanks Eugene.
>
> As you suggested, using withHintMatchesManyFiles() did result in a very
> significant performance increase! Enough that it's fast enough for our
> current use case.
>
> Will track the JIRA for any further fixes.
>
> Thanks,
> Andrew
>
> On Mon, 5 Mar 2018, at 22:34, Eugene Kirpichov wrote:
>
> Filed JIRA https://issues.apache.org/jira/browse/BEAM-3778
>
> On Mon, Mar 5, 2018 at 2:28 PM Eugene Kirpichov <kirpic...@google.com>
> wrote:
>
> For now I suggest that you augment your AvroIO.parse() with
> .withHintMatchesManyFiles() because it appears to match a very large number
> of tiny files, and I think that's what's causing the issue.
>
> By default Dataflow uses 1 shard per file, and that causes 2 problems here:
> - Each of these shards writes a separate file to be loaded into BigQuery,
> so BigQuery has to load this many (tiny) files, which is not great.
> - What's worse, the ReifyResults step takes this list of written temporary
> files as a side input, and given Dataflow's way of materializing side
> inputs, it behaves pretty bad when the data for the side input is written
> from a very large number of shards.
>
> I'm not sure there's an easy fix to make your original code perform well
> unchanged, but .withHintMatchesManyFiles() should make it perform orders of
> magnitude better.
>
> On Mon, Mar 5, 2018 at 2:19 PM Eugene Kirpichov <kirpic...@google.com>
> wrote:
>
> Thank you - I was wrong, it is indeed not blocked by BigQuery jobs, but by
> something it shouldn't be doing at all. This is definitely a bug. I'll
> investigate in more detail and file a JIRA so you can track the resolution.
>
> On Mon, Mar 5, 2018 at 7:12 AM Andrew Jones <andrew+b...@andrew-jones.com>
> wrote:
>
>
> Thanks for the reply. In my case the time is spent *before* the load job
> has started. See attached for a screenshot of a currently running job (id:
> 2018-03-05_04_06_20-5803269526385225708).
>
> It looks like the time is spent in the ReifyResults step. Looking at the
> code and at some smaller, succeeding jobs, the BigQuery loads normally
> happen in SinglePartitionWriteTables (and
> presumably MultiPartitionsWriteTables, if required). So I'm not seeing any
> log lines with output from the BigQuery API, nor anything on the BigQuery
> side.
>
> The input to ReifyResults is around ~200K elements,
> from WriteBundlesToFiles. Looking at the code, I think these are all the
> files staged and ready for loading. I'm finding it hard to work out exactly
> what ReifyResults is supposed to be doing and why it would take any time at
> all. I think it might be going through these 200K files and doing something
> with them, which if it's doing it one at a time and if the calls to the GCS
> API is expensive then it could be the issue?
>
> On Mon, 5 Mar 2018, at 00:28, Eugene Kirpichov wrote:
>
> BigQueryIO.write() works by: 1) having Dataflow workers write data to
> files (in parallel) 2) asking BigQuery to load those files - naturally,
> during this time Dataflow workers aren't doing anything, that's why the job
> is scaling down.
> These jobs are spending time waiting for BigQuery to load the data.
>
> As for why it's taking so long for BigQuery to load the data: You can try
> to look for BigQuery job ids in the Stackdriver logs, and then inspect
> these jobs in the BigQuery UI. If it's taking *really* long, it's usually a
> quota issue: i.e. your BigQuery jobs are waiting for some other BigQuery
> jobs to complete before even starting.
>
> On Sat, Mar 3, 2018 at 3:30 AM Andrew Jones <andrew+b...@andrew-jones.com>
> wrote:
>
> Hi,
>
> We have a Dataflow job that loads data from GCS, does a bit of
> transformation, then writes to a number of BigQuery tables using
> DynamicDestinations.
>
> The same job runs on smaller data sets (~70 million records), but this one
> is struggling when processing ~500 million records. Both jobs are writing
> to the same amount of tables - the only difference is the amount of records.
>
> Example job IDs include 2018-03-02_04_29_44-2181786949469858712 and
> 2018-03-02_08_46_28-4580218739500768796. They are using BigQuery.IO to
> write to BigQuery, using the BigQueryIO.Write.Method.FILE_LOADS method (the
> default for a bounded job). They successfully stage all their data to GCS,
> but then for some reason scale down the amount of workers to 1 when
> processing the step WriteTOBigQuery/BatchLoads/ReifyResults and stay in
> that step for hours.
>
> In the logs we see many entries like this:
>
> Proposing dynamic split of work unit
> ...-7e07;2018-03-02_04_29_44-2181786949469858712;662185752552586455 at
> {"fractionConsumed":0.5}
> Rejecting split request because custom reader returned null residual
> source.
> And also occasionally this:
>
> Processing lull for PT24900.038S in state process of
> WriteTOBigQuery/BatchLoads/ReifyResults/ParDo(Anonymous) at
> java.net.SocketInputStream.socketRead0(Native Method) at
> java.net.SocketInputStream.socketRead(SocketInputStream.java:116) at
> java.net.SocketInputStream.read(SocketInputStream.java:170) at
> java.net.SocketInputStream.read(SocketInputStream.java:141) ...
>
> The job does seem to eventually progress, but after many hours. It then
> fails later with this error, which may or may not be related (just starting
> to look in to):
>
> (94794e1a2c96f380): java.lang.RuntimeException:
> org.apache.beam.sdk.util.UserCodeException: java.io.IOException: Unable to
> patch table description: {datasetId=..., projectId=...,
> tableId=9c20908cc6e549b4a1e116af54bb8128_011249028ddcc5204885bff04ce2a725_00001_00000},
> aborting after 9 retries.
>
> We're not sure how to proceed, so any pointers would be appreciated.
>
> Thanks,
> Andrew
>
>
>
>

Reply via email to