Hi,

Yes, you're both right - BigQueryIO.write() is currently not implemented
in a way that lets it be used with Wait.on(). It would certainly be a
welcome contribution to change this - many people have expressed interest
in waiting specifically for BigQuery writes. Are either of you interested
in helping out?
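
In the meantime, one workaround in batch mode is to do the sequencing
outside the pipeline: run the pipeline to completion in the driver
program, and only then publish the Pubsub message. A rough sketch - the
project, topic and message contents are placeholders, and the publishing
calls come from the google-cloud-pubsub client library rather than
anything Beam-specific:

    import com.google.cloud.pubsub.v1.Publisher;
    import com.google.protobuf.ByteString;
    import com.google.pubsub.v1.PubsubMessage;
    import com.google.pubsub.v1.TopicName;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.PipelineResult;

    public class RunThenNotify {
      public static void main(String[] args) throws Exception {
        Pipeline p = Pipeline.create();
        // ... apply the BigQueryIO.write() FILE_LOADS transform here ...

        // In batch mode the load jobs finish before the pipeline reaches
        // DONE, so blocking on the result is a coarse but reliable
        // completion signal.
        PipelineResult result = p.run();
        result.waitUntilFinish();

        // Publish the follow-up message from the driver once the run is done.
        Publisher publisher =
            Publisher.newBuilder(TopicName.of("my-project", "my-topic")).build();
        PubsubMessage msg =
            PubsubMessage.newBuilder()
                .setData(ByteString.copyFromUtf8("load complete"))
                .build();
        publisher.publish(msg).get();
        publisher.shutdown();
      }
    }

The obvious downside is that this waits for the whole pipeline rather
than just the BigQuery write, which is exactly why a per-write signal
usable with Wait.on() would be the nicer solution.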

Thanks.

On Fri, Apr 6, 2018 at 12:36 AM Carlos Alonso <car...@mrcalonso.com> wrote:

> Hi Simon, I think your explanation was very accurate, at least to my
> understanding. I'd also be interested in getting feedback about the batch
> load results back into the pipeline... hopefully someone can suggest
> something; otherwise we could file a Jira, or even better, submit a PR!! :)
>
> Thanks!
>
> On Thu, Apr 5, 2018 at 2:01 PM Simon Kitching <
> simon.kitch...@unbelievable-machine.com> wrote:
>
>> Hi All,
>>
>> I need to write some data to BigQuery (batch-mode) and then send a Pubsub
>> message to trigger further processing.
>>
>> I found this thread titled "Callbacks/other functions run after a
>> PDone/output transform" on the user-list which was very relevant:
>>
>> https://lists.apache.org/thread.html/ddcdf93604396b1cbcacdff49aba60817dc90ee7c8434725ea0d26c0@%3Cuser.beam.apache.org%3E
>>
>> Thanks to the author of the Wait transform (Beam 2.4.0)!
>>
>> Unfortunately, it appears that the Wait.on transform does not work with
>> BigQueryIO in FILE_LOADS mode - or at least I cannot get it to work. Advice
>> appreciated.
>>
>> Here's (most of) the relevant test code:
>>
>>         Pipeline p = Pipeline.create(options);
>>         PCollection<String> lines = p.apply("Read Input",
>>             Create.of("line1", "line2", "line3", "line4"));
>>
>>         TableFieldSchema f1 =
>>             new TableFieldSchema().setName("value").setType("string");
>>         TableSchema s2 =
>>             new TableSchema().setFields(Collections.singletonList(f1));
>>
>>         WriteResult writeResult = lines.apply("Write and load data",
>>             BigQueryIO.<String>write()
>>                 .to(options.getTableSpec())
>>                 .withFormatFunction(new SlowFormatter())
>>                 .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
>> //              .withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS)
>>                 .withSchema(s2)
>>                 .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
>>                 .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
>>
>>         lines.apply(Wait.on(writeResult.getFailedInserts()))
>>             .apply(ParDo.of(new OnCompletion()));
>>
>> where
>> + format-function "SlowFormatter" prints out each line and sleeps briefly,
>> for testing purposes, and
>> + DoFn OnCompletion just prints out the contents of each line
>> (a minimal sketch of both helpers is included below)
>>
>> In production code, OnCompletion would be fed some collection derived
>> from lines, e.g. min/max record id, and the operation would be "send pubsub
>> message" rather than print.
>>
>> My expectation is that the "SlowFormatter" would run for each line, then
>> the data would be uploaded, and then OnCompletion would print each line.
>> And indeed that is what happens when STREAMING_INSERTS is used. However,
>> with FILE_LOADS, OnCompletion runs before the upload takes place.
>>
>> I use WriteResult.getFailedInserts as that is the only "output" that
>> BigQueryIO.write() generates, AFAICT. I don't expect any failed records,
>> but I believe it can be used as a "signal" for the Wait.on - i.e. the
>> output is "complete for window" only after all data has been uploaded,
>> which is what I need. And that does seem to work for STREAMING_INSERTS.
>>
>> I suspect the reason that this does not work for FILE_LOADS is that
>> method BatchLoads.writeResult returns a WriteResult that wraps an "empty"
>> failedInserts collection, ie data which is not connected to the
>> batch-load-job that is triggered:
>>   private WriteResult writeResult(Pipeline p) {
>>     PCollection<TableRow> empty =
>>         p.apply("CreateEmptyFailedInserts",
>>             Create.empty(TypeDescriptor.of(TableRow.class)));
>>     return WriteResult.in(p, new TupleTag<>("failedInserts"), empty);
>>   }
>>
>> Note that BatchLoads does "synchronously" invoke BigQuery load jobs; once
>> a job is submitted the code repeatedly polls the job status until it
>> reaches DONE or FAILED. However that information does not appear to be
>> exposed anywhere (unlike streaming which effectively exposes
>> completion-state via the failedInserts stream).
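>>
>> (For illustration, the same kind of polling can be done outside Beam with
>> the google-cloud-bigquery client (com.google.cloud.bigquery.*), though
>> that only helps if you know the load job's id - the id below is just a
>> placeholder, as BatchLoads generates its job ids internally:
>>
>>     BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();
>>     Job job = bigquery.getJob(JobId.of("my-load-job-id")); // placeholder id
>>     Job completed = job.waitFor(); // polls until the job reaches DONE
>>     if (completed == null || completed.getStatus().getError() != null) {
>>         throw new RuntimeException("Load job failed or not found");
>>     }
>>
>> so the completion information is obtainable, it just isn't surfaced by
>> the transform.)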
>>
>> If I have misunderstood something, corrections welcome! If not,
>> suggestions for workarounds or alternate solutions are also welcome :-)
>>
>> Thanks,
>> Simon
>>
>>
