Hey Eugene!! I'd gladly take a stab at it, although I'm not sure how much time I'll have available to put into it, but... yeah, let's try it.
Where should I begin? Is there a Jira issue, or shall I file one? Thanks!

On Thu, 12 Apr 2018 at 00:41, Eugene Kirpichov <[email protected]> wrote:

> Hi,
>
> Yes, you're both right - BigQueryIO.write() is currently not implemented
> in a way that lets it be used with Wait.on(). It would certainly be a
> welcome contribution to change this - many people have expressed interest
> specifically in waiting for BigQuery writes. Are either of you interested
> in helping out?
>
> Thanks.
>
> On Fri, Apr 6, 2018 at 12:36 AM Carlos Alonso <[email protected]> wrote:
>
>> Hi Simon, I think your explanation was very accurate, at least to my
>> understanding. I'd also be interested in getting feedback from batch
>> load results into the pipeline... hopefully someone can suggest
>> something; otherwise we could file a Jira, or even better, submit a
>> PR!! :)
>>
>> Thanks!
>>
>> On Thu, Apr 5, 2018 at 2:01 PM Simon Kitching <[email protected]> wrote:
>>
>>> Hi All,
>>>
>>> I need to write some data to BigQuery (batch mode) and then send a
>>> Pubsub message to trigger further processing.
>>>
>>> I found this thread, titled "Callbacks/other functions run after a
>>> PDone/output transform", on the user list, which was very relevant:
>>>
>>> https://lists.apache.org/thread.html/ddcdf93604396b1cbcacdff49aba60817dc90ee7c8434725ea0d26c0@%3Cuser.beam.apache.org%3E
>>>
>>> Thanks to the author of the Wait transform (Beam 2.4.0)!
>>>
>>> Unfortunately, it appears that the Wait.on transform does not work with
>>> BigQueryIO in FILE_LOADS mode - or at least I cannot get it to work.
>>> Advice appreciated.
>>>
>>> Here's (most of) the relevant test code:
>>>
>>> Pipeline p = Pipeline.create(options);
>>> PCollection<String> lines = p.apply("Read Input",
>>>     Create.of("line1", "line2", "line3", "line4"));
>>>
>>> TableFieldSchema f1 = new TableFieldSchema().setName("value").setType("string");
>>> TableSchema s2 = new TableSchema().setFields(Collections.singletonList(f1));
>>>
>>> WriteResult writeResult = lines.apply("Write and load data",
>>>     BigQueryIO.<String>write()
>>>         .to(options.getTableSpec())
>>>         .withFormatFunction(new SlowFormatter())
>>>         .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
>>>         // .withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS)
>>>         .withSchema(s2)
>>>         .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
>>>         .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
>>>
>>> lines.apply(Wait.on(writeResult.getFailedInserts()))
>>>      .apply(ParDo.of(new OnCompletion()));
>>>
>>> where
>>> + the format function "SlowFormatter" prints out each line and sleeps
>>>   briefly, for testing purposes, and
>>> + the DoFn "OnCompletion" just prints out the contents of each line.
>>>
>>> In production code, OnCompletion would be fed some collection derived
>>> from lines, e.g. min/max record id, and the operation would be "send a
>>> pubsub message" rather than print.
>>>
>>> My expectation is that SlowFormatter would run for each line, then the
>>> data would be uploaded, and then OnCompletion would print each line. And
>>> indeed that is what happens when STREAMING_INSERTS is used. However, with
>>> FILE_LOADS, OnCompletion runs before the upload takes place.
>>>
>>> I use WriteResult.getFailedInserts because that is the only "output"
>>> that BigQueryIO.write() generates, AFAICT. I don't expect any failed
>>> records, but believe it can be used as a "signal" for the Wait.on -
>>> i.e. the output is "complete for window" only after all data has been
>>> uploaded, which is what I need.
>>> And that does seem to work for STREAMING_INSERTS.
>>>
>>> I suspect the reason this does not work for FILE_LOADS is that the
>>> method BatchLoads.writeResult returns a WriteResult that wraps an
>>> "empty" failedInserts collection, i.e. data which is not connected to
>>> the batch-load job that is triggered:
>>>
>>> private WriteResult writeResult(Pipeline p) {
>>>   PCollection<TableRow> empty =
>>>       p.apply("CreateEmptyFailedInserts",
>>>           Create.empty(TypeDescriptor.of(TableRow.class)));
>>>   return WriteResult.in(p, new TupleTag<>("failedInserts"), empty);
>>> }
>>>
>>> Note that BatchLoads does invoke BigQuery load jobs "synchronously":
>>> once a job is submitted, the code repeatedly polls the job status until
>>> it reaches DONE or FAILED. However, that information does not appear to
>>> be exposed anywhere (unlike streaming, which effectively exposes
>>> completion state via the failedInserts stream).
>>>
>>> If I have misunderstood something, corrections are welcome! If not,
>>> suggestions for workarounds or alternate solutions are also welcome :-)
>>>
>>> Thanks,
>>> Simon
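The "synchronous" polling that Simon describes inside BatchLoads can be sketched generically. The snippet below is a minimal, self-contained Python illustration, not Beam code: `FakeJobService`, `wait_for_load_job`, and all their parameters are hypothetical names invented for this sketch. It shows the poll-until-terminal-state loop (DONE or FAILED) whose result, per the thread, is known to the worker but never surfaced as a PCollection that Wait.on() could consume downstream.

```python
import time

class FakeJobService:
    """Hypothetical stand-in for the BigQuery jobs API; in BatchLoads the
    real status would come from polling the load job's status."""

    def __init__(self, polls_until_done=3):
        self._polls_remaining = polls_until_done

    def get_status(self, job_id):
        # Report RUNNING for a few polls, then DONE.
        self._polls_remaining -= 1
        return "DONE" if self._polls_remaining <= 0 else "RUNNING"

def wait_for_load_job(service, job_id, poll_interval=0.01, timeout=5.0):
    """Poll a load job until it reaches a terminal state (DONE or FAILED).

    Mirrors the loop described in the thread: the submitting code blocks
    and polls, so completion *is* known at that point - it just isn't
    exposed as a signal collection for Wait.on.
    """
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        status = service.get_status(job_id)
        if status in ("DONE", "FAILED"):
            return status
        time.sleep(poll_interval)
    raise TimeoutError(f"load job {job_id} did not finish in {timeout}s")

print(wait_for_load_job(FakeJobService(), "job-123"))  # prints DONE
```

The point of the sketch: because the status loop already runs to completion inside BatchLoads, exposing a per-window "done" signal (e.g. as an output PCollection usable by Wait.on) would not require new waiting machinery, only plumbing the already-known terminal status out of the transform.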
