Hi,

Yes, you're both right - BigQueryIO.write() is currently not implemented in a way that allows it to be used with Wait.on(). It would certainly be a welcome contribution to change this - many people have expressed interest specifically in waiting for BigQuery writes. Are either of you interested in helping out?

Thanks.
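For context, the pattern in question looks roughly like this - a minimal sketch, assuming a signal PCollection that only completes once the write has finished (as the quoted thread below shows, the failed-inserts output of STREAMING_INSERTS behaves this way today, while FILE_LOADS does not yet produce a usable signal):

    import com.google.api.services.bigquery.model.TableRow;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.transforms.Wait;
    import org.apache.beam.sdk.values.PCollection;

    // 'signal' must be a PCollection that is only "complete for window"
    // once the BigQuery write has finished. With STREAMING_INSERTS the
    // failed-inserts output serves this purpose; FILE_LOADS currently
    // returns an empty, disconnected collection instead (see below).
    PCollection<TableRow> signal = writeResult.getFailedInserts();
    lines.apply(Wait.on(signal))
         .apply("After write completes", ParDo.of(new OnCompletion()));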
On Fri, Apr 6, 2018 at 12:36 AM Carlos Alonso <[email protected]> wrote:

> Hi Simon, I think your explanation was very accurate, at least to my
> understanding. I'd also be interested in getting the batch load result's
> feedback on the pipeline... hopefully someone may suggest something;
> otherwise we could propose submitting a Jira, or even better, a PR!! :)
>
> Thanks!
>
> On Thu, Apr 5, 2018 at 2:01 PM Simon Kitching <[email protected]> wrote:
>
>> Hi All,
>>
>> I need to write some data to BigQuery (batch-mode) and then send a Pubsub
>> message to trigger further processing.
>>
>> I found the thread titled "Callbacks/other functions run after a
>> PDone/output transform" on the user-list, which was very relevant:
>>
>> https://lists.apache.org/thread.html/ddcdf93604396b1cbcacdff49aba60817dc90ee7c8434725ea0d26c0@%3Cuser.beam.apache.org%3E
>>
>> Thanks to the author of the Wait transform (Beam 2.4.0)!
>>
>> Unfortunately, it appears that the Wait.on transform does not work with
>> BigQueryIO in FILE_LOADS mode - or at least I cannot get it to work.
>> Advice appreciated.
>>
>> Here's (most of) the relevant test code:
>>
>> Pipeline p = Pipeline.create(options);
>> PCollection<String> lines = p.apply("Read Input",
>>     Create.of("line1", "line2", "line3", "line4"));
>>
>> TableFieldSchema f1 = new TableFieldSchema().setName("value").setType("STRING");
>> TableSchema s2 = new TableSchema().setFields(Collections.singletonList(f1));
>>
>> WriteResult writeResult = lines.apply("Write and load data",
>>     BigQueryIO.<String>write()
>>         .to(options.getTableSpec())
>>         .withFormatFunction(new SlowFormatter())
>>         .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
>>         // .withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS)
>>         .withSchema(s2)
>>         .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
>>         .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
>>
>> lines.apply(Wait.on(writeResult.getFailedInserts()))
>>     .apply(ParDo.of(new OnCompletion()));
>>
>> where
>> + the format-function "SlowFormatter" prints out each line and sleeps briefly, for testing purposes, and
>> + the DoFn "OnCompletion" just prints out the contents of each line.
>>
>> In production code, OnCompletion would be fed some collection derived
>> from lines, e.g. min/max record id, and the operation would be "send
>> pubsub message" rather than print.
>>
>> My expectation is that SlowFormatter would run for each line, then the
>> data would be uploaded, and then OnCompletion would print each line. And
>> indeed that is what happens when STREAMING_INSERTS is used. With
>> FILE_LOADS, however, OnCompletion runs before the upload takes place.
>>
>> I use WriteResult.getFailedInserts as that is the only "output" that
>> BigQueryIO.write() generates, AFAICT. I don't expect any failed records,
>> but believe the collection can be used as a "signal" for the Wait.on -
>> i.e. the output is "complete for window" only after all data has been
>> uploaded, which is what I need. And that does seem to work for
>> STREAMING_INSERTS.
>>
>> I suspect the reason this does not work for FILE_LOADS is that the method
>> BatchLoads.writeResult returns a WriteResult that wraps an "empty"
>> failedInserts collection, i.e. data which is not connected to the
>> batch-load job that is triggered:
>>
>> private WriteResult writeResult(Pipeline p) {
>>   PCollection<TableRow> empty =
>>       p.apply("CreateEmptyFailedInserts",
>>           Create.empty(TypeDescriptor.of(TableRow.class)));
>>   return WriteResult.in(p, new TupleTag<>("failedInserts"), empty);
>> }
>>
>> Note that BatchLoads does "synchronously" invoke BigQuery load jobs: once
>> a job is submitted, the code repeatedly polls the job status until it
>> reaches DONE or FAILED. However, that information does not appear to be
>> exposed anywhere (unlike streaming, which effectively exposes
>> completion-state via the failedInserts stream).
>>
>> If I have misunderstood something, corrections welcome! If not,
>> suggestions for workarounds or alternate solutions are also welcome :-)
>>
>> Thanks,
>> Simon
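For readers who want to reproduce the test, here is a rough sketch of the two helpers Simon describes but does not include. The names and behaviour follow his description; the bodies (delay length, log format, schema field name) are illustrative guesses:

    import com.google.api.services.bigquery.model.TableRow;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.SerializableFunction;

    // Format function: converts each line into a TableRow for BigQueryIO,
    // printing it and sleeping briefly so the ordering of format, load,
    // and print steps is easy to observe during the test.
    class SlowFormatter implements SerializableFunction<String, TableRow> {
      @Override
      public TableRow apply(String line) {
        System.out.println("Formatting: " + line);
        try {
          Thread.sleep(100); // small artificial delay, per the description above
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }
        return new TableRow().set("value", line);
      }
    }

    // DoFn that just prints each element it receives; in production this
    // would publish a Pubsub message instead.
    class OnCompletion extends DoFn<String, Void> {
      @ProcessElement
      public void processElement(ProcessContext c) {
        System.out.println("Completed: " + c.element());
      }
    }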
