Re: WriteTOBigQuery/BatchLoads/ReifyResults step taking hours

2018-03-07 Thread Andrew Jones
It went from about 13 hours 45 mins (when it succeeded at all - job
2018-03-03_16_00_20-15816431303287121211) to 1 hour 15 mins (job
2018-03-06_03_10_41-4393426098762549646)!

What was interesting to me was the change in inputs to
WriteTOBigQuery/BatchLoads/FlattenFiles, immediately before ReifyResults.
Previously WriteTOBigQuery/BatchLoads/WriteBundlesToFiles.out0 was 180,442
and WriteTOBigQuery/BatchLoads/WriteGroupedRecords.out0 was 0; now they are
7,832 and 271 respectively.


Re: WriteTOBigQuery/BatchLoads/ReifyResults step taking hours

2018-03-06 Thread Eugene Kirpichov
Thanks, I'm glad it worked so well! I'm curious, just how much faster did
it get? Do you have a job ID with the new code I can take a peek at?


Re: WriteTOBigQuery/BatchLoads/ReifyResults step taking hours

2018-03-06 Thread Andrew Jones
Thanks Eugene.

As you suggested, using withHintMatchesManyFiles() did result in a very
significant performance increase! Enough that it's fast enough for our
current use case.

Will track the JIRA for any further fixes.

Thanks,
Andrew


Re: WriteTOBigQuery/BatchLoads/ReifyResults step taking hours

2018-03-05 Thread Eugene Kirpichov
Filed JIRA https://issues.apache.org/jira/browse/BEAM-3778


Re: WriteTOBigQuery/BatchLoads/ReifyResults step taking hours

2018-03-05 Thread Eugene Kirpichov
For now I suggest that you augment your AvroIO.parse() with
.withHintMatchesManyFiles() because it appears to match a very large number
of tiny files, and I think that's what's causing the issue.

By default Dataflow uses 1 shard per file, and that causes 2 problems here:
- Each of these shards writes a separate file to be loaded into BigQuery,
so BigQuery has to load this many (tiny) files, which is not great.
- What's worse, the ReifyResults step takes this list of written temporary
files as a side input, and given Dataflow's way of materializing side
inputs, it behaves pretty badly when the data for the side input is written
from a very large number of shards.

I'm not sure there's an easy fix to make your original code perform well
unchanged, but .withHintMatchesManyFiles() should make it perform orders of
magnitude better.
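
For reference, a minimal sketch of the suggested change, assuming a
hypothetical bucket path and a trivial parse function (the thread doesn't
show the original read, so these names are made up):

import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.AvroIO;

public class ReadManyAvroFiles {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create();

    p.apply(
        AvroIO.parseGenericRecords(
                (GenericRecord r) -> r.get("payload").toString())
            .from("gs://my-bucket/input/*.avro")
            // Explicit coder, since coder inference from a lambda can fail.
            .withCoder(StringUtf8Coder.of())
            // The hint: tells the runner the filepattern expands to many
            // files, so Dataflow reads them in bundles rather than creating
            // one shard (and later one tiny BigQuery load file) per file.
            .withHintMatchesManyFiles());

    p.run();
  }
}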


Re: WriteTOBigQuery/BatchLoads/ReifyResults step taking hours

2018-03-05 Thread Eugene Kirpichov
Thank you - I was wrong, it is indeed not blocked by BigQuery jobs, but by
something it shouldn't be doing at all. This is definitely a bug. I'll
investigate in more detail and file a JIRA so you can track the resolution.

On Mon, Mar 5, 2018 at 7:12 AM Andrew Jones wrote:

> Thanks for the reply. In my case the time is spent *before* the load job
> has started. See attached for a screenshot of a currently running job (id:
> 2018-03-05_04_06_20-5803269526385225708).
>
> It looks like the time is spent in the ReifyResults step. From the code,
> and from some smaller, succeeding jobs, the BigQuery loads normally happen
> in SinglePartitionWriteTables (and presumably MultiPartitionsWriteTables,
> if required). So I'm not seeing any log lines with output from the BigQuery
> API, nor anything on the BigQuery side.
>
> The input to ReifyResults is around 200K elements,
> from WriteBundlesToFiles. Looking at the code, I think these are all the
> files staged and ready for loading. I'm finding it hard to work out exactly
> what ReifyResults is supposed to be doing and why it would take any time at
> all. It might be going through these 200K files one at a time and doing
> something with each; if each call to the GCS API is expensive, that could
> be the issue?


Re: WriteTOBigQuery/BatchLoads/ReifyResults step taking hours

2018-03-04 Thread Eugene Kirpichov
BigQueryIO.write() works by:
1) having Dataflow workers write data to files (in parallel)
2) asking BigQuery to load those files - naturally, during this time
Dataflow workers aren't doing anything, and that's why the job is scaling
down.
These jobs are spending time waiting for BigQuery to load the data.

As for why it's taking so long for BigQuery to load the data: You can try
to look for BigQuery job ids in the Stackdriver logs, and then inspect
these jobs in the BigQuery UI. If it's taking *really* long, it's usually a
quota issue: i.e. your BigQuery jobs are waiting for some other BigQuery
jobs to complete before even starting.
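
If it helps, that check can also be scripted with the google-cloud-bigquery
client; the project and job ID below are placeholders for values found in
the Stackdriver logs:

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.Job;
import com.google.cloud.bigquery.JobId;

public class CheckLoadJob {
  public static void main(String[] args) {
    BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();
    // Placeholder values - copy the real job ID from the worker logs.
    Job job = bigquery.getJob(JobId.of("my-project", "beam_load_job_id"));
    // A PENDING state usually means the load is queued behind other jobs,
    // which points at the quota explanation above.
    System.out.println(job.getStatus().getState());
    if (job.getStatus().getError() != null) {
      System.out.println(job.getStatus().getError());
    }
  }
}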


WriteTOBigQuery/BatchLoads/ReifyResults step taking hours

2018-03-03 Thread Andrew Jones
Hi,

We have a Dataflow job that loads data from GCS, does a bit of transformation, 
then writes to a number of BigQuery tables using DynamicDestinations.
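
For context, here is a minimal sketch of this kind of pipeline - with
hypothetical project, dataset and table names and a trivial one-column
schema, not the actual job's code:

import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import java.util.Collections;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinations;
import org.apache.beam.sdk.io.gcp.bigquery.TableDestination;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.ValueInSingleWindow;

public class WriteToManyTables {

  // Routes each KV(tableName, payload) element to its own BigQuery table.
  static class PerTableDestinations
      extends DynamicDestinations<KV<String, String>, String> {
    @Override
    public String getDestination(ValueInSingleWindow<KV<String, String>> element) {
      return element.getValue().getKey(); // destination key = target table name
    }

    @Override
    public TableDestination getTable(String tableName) {
      return new TableDestination("my-project:my_dataset." + tableName, null);
    }

    @Override
    public TableSchema getSchema(String tableName) {
      return new TableSchema().setFields(Collections.singletonList(
          new TableFieldSchema().setName("payload").setType("STRING")));
    }
  }

  public static void main(String[] args) {
    Pipeline p = Pipeline.create();
    p.apply(Create.of(KV.of("events_a", "hello"), KV.of("events_b", "world")))
        .apply("WriteTOBigQuery",
            BigQueryIO.<KV<String, String>>write()
                .to(new PerTableDestinations())
                .withFormatFunction(
                    kv -> new TableRow().set("payload", kv.getValue()))
                // FILE_LOADS is the default for bounded input; shown explicitly.
                .withMethod(BigQueryIO.Write.Method.FILE_LOADS));
    p.run();
  }
}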

The same job runs fine on smaller data sets (~70 million records), but this
one is struggling when processing ~500 million records. Both jobs are writing
to the same number of tables - the only difference is the number of records.

Example job IDs include 2018-03-02_04_29_44-2181786949469858712 and
2018-03-02_08_46_28-4580218739500768796. They are using BigQueryIO to write to
BigQuery with the BigQueryIO.Write.Method.FILE_LOADS method (the default for a
bounded job). They successfully stage all their data to GCS, but then for some
reason scale down the number of workers to 1 when processing the step
WriteTOBigQuery/BatchLoads/ReifyResults, and stay in that step for hours.

In the logs we see many entries like this:

Proposing dynamic split of work unit
...-7e07;2018-03-02_04_29_44-2181786949469858712;662185752552586455 at
{"fractionConsumed":0.5}
Rejecting split request because custom reader returned null residual source.

And also occasionally this:

Processing lull for PT24900.038S in state process of
WriteTOBigQuery/BatchLoads/ReifyResults/ParDo(Anonymous)
  at java.net.SocketInputStream.socketRead0(Native Method)
  at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
  at java.net.SocketInputStream.read(SocketInputStream.java:170)
  at java.net.SocketInputStream.read(SocketInputStream.java:141)
  ...

The job does seem to eventually progress, but only after many hours. It then
fails later with this error, which may or may not be related (we're just
starting to look into it):

(94794e1a2c96f380): java.lang.RuntimeException:
org.apache.beam.sdk.util.UserCodeException: java.io.IOException: Unable to
patch table description: {datasetId=..., projectId=...,
tableId=9c20908cc6e549b4a1e116af54bb8128_011249028ddcc5204885bff04ce2a725_1_0},
aborting after 9 retries.

We're not sure how to proceed, so any pointers would be appreciated.

Thanks,
Andrew