+ d...@beam.apache.org

Just a quick email to let you know that I'm starting to develop this.
On Fri, Apr 20, 2018 at 10:30 PM Eugene Kirpichov <kirpic...@google.com> wrote:

> Hi Carlos,
>
> Thank you for expressing interest in taking this on! Let me give you a few
> pointers to start, and I'll be happy to help everywhere along the way.
>
> Basically we want BigQueryIO.write() to return something (e.g. a
> PCollection) that can be used as input to Wait.on().
> Currently it returns a WriteResult, which only contains a
> PCollection<TableRow> of failed inserts - that one can not be used
> directly; instead we should add another component to WriteResult that
> represents the result of successfully writing some data.
>
> Given that BQIO supports dynamic destination writes, I think it makes
> sense for that to be a PCollection<KV<DestinationT, ???>> so that in theory
> we could sequence different destinations independently (currently Wait.on()
> does not provide such a feature, but it could); and it will require
> changing WriteResult to be WriteResult<DestinationT>. As for what the "???"
> might be - it is something that represents the result of successfully
> writing a window of data. I think it can even be Void, or "?" (wildcard
> type) for now, until we figure out something better.
>
> Implementing this would require roughly the following work:
> - Add this PCollection<KV<DestinationT, ?>> to WriteResult
> - Modify the BatchLoads transform to provide it on both codepaths:
>   expandTriggered() and expandUntriggered()
>   - expandTriggered() itself writes via 2 codepaths: single-partition and
>     multi-partition. Both need to be handled - we need to get a
>     PCollection<KV<DestinationT, ?>> from each of them, and Flatten these
>     two PCollections together to get the final result. The single-partition
>     codepath (writeSinglePartition) under the hood already uses WriteTables,
>     which returns a KV<DestinationT, ...>, so it's directly usable. The
>     multi-partition codepath ends in WriteRenameTriggered - unfortunately,
>     this codepath drops DestinationT along the way and will need to be
>     refactored a bit to keep it until the end.
>   - expandUntriggered() should be treated the same way.
> - Modify the StreamingWriteTables transform to provide it
>   - Here also, the challenge is to propagate the DestinationT type all
>     the way to the end of StreamingWriteTables - it will need to be
>     refactored. After such a refactoring, returning a KV<DestinationT, ...>
>     should be easy.
>
> Another challenge with all of this is backwards compatibility in terms of
> API and pipeline update.
> Pipeline update is much less of a concern for the BatchLoads codepath,
> because it's typically used in batch-mode pipelines that don't get updated.
> I would recommend starting with this, perhaps even with only the
> untriggered codepath (it is much more commonly used) - that will pave the
> way for future work.
>
> Hope this helps - please ask more if something is unclear!
>
> On Fri, Apr 20, 2018 at 12:48 AM Carlos Alonso <car...@mrcalonso.com>
> wrote:
>
>> Hey Eugene!!
>>
>> I'd gladly take a stab at it, although I'm not sure how much available
>> time I might have to put into it, but... yeah, let's try it.
>>
>> Where should I begin? Is there a Jira issue or shall I file one?
>>
>> Thanks!
>> On Thu, 12 Apr 2018 at 00:41, Eugene Kirpichov <kirpic...@google.com>
>> wrote:
>>
>>> Hi,
>>>
>>> Yes, you're both right - BigQueryIO.write() is currently not implemented
>>> in a way that it can be used with Wait.on(). It would certainly be a
>>> welcome contribution to change this - many people have expressed interest
>>> in specifically waiting for BigQuery writes. Is either of you interested
>>> in helping out?
>>>
>>> Thanks.
>>>
>>> On Fri, Apr 6, 2018 at 12:36 AM Carlos Alonso <car...@mrcalonso.com>
>>> wrote:
>>>
>>>> Hi Simon, I think your explanation was very accurate, at least to my
>>>> understanding. I'd also be interested in getting the batch load
>>>> result's feedback on the pipeline... hopefully someone may suggest
>>>> something; otherwise we could propose filing a Jira, or even better,
>>>> a PR!! :)
>>>>
>>>> Thanks!
>>>>
>>>> On Thu, Apr 5, 2018 at 2:01 PM Simon Kitching <
>>>> simon.kitch...@unbelievable-machine.com> wrote:
>>>>
>>>>> Hi All,
>>>>>
>>>>> I need to write some data to BigQuery (batch-mode) and then send a
>>>>> Pubsub message to trigger further processing.
>>>>>
>>>>> I found this thread titled "Callbacks/other functions run after a
>>>>> PDone/output transform" on the user-list, which was very relevant:
>>>>> https://lists.apache.org/thread.html/ddcdf93604396b1cbcacdff49aba60817dc90ee7c8434725ea0d26c0@%3Cuser.beam.apache.org%3E
>>>>>
>>>>> Thanks to the author of the Wait transform (Beam 2.4.0)!
>>>>>
>>>>> Unfortunately, it appears that the Wait.on transform does not work
>>>>> with BigQueryIO in FILE_LOADS mode - or at least I cannot get it to
>>>>> work. Advice appreciated.
>>>>> Here's (most of) the relevant test code:
>>>>>
>>>>> Pipeline p = Pipeline.create(options);
>>>>> PCollection<String> lines = p.apply("Read Input",
>>>>>     Create.of("line1", "line2", "line3", "line4"));
>>>>>
>>>>> TableFieldSchema f1 =
>>>>>     new TableFieldSchema().setName("value").setType("string");
>>>>> TableSchema s2 =
>>>>>     new TableSchema().setFields(Collections.singletonList(f1));
>>>>>
>>>>> WriteResult writeResult = lines.apply("Write and load data",
>>>>>     BigQueryIO.<String>write()
>>>>>         .to(options.getTableSpec())
>>>>>         .withFormatFunction(new SlowFormatter())
>>>>>         .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
>>>>>         // .withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS)
>>>>>         .withSchema(s2)
>>>>>         .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
>>>>>         .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
>>>>>
>>>>> lines.apply(Wait.on(writeResult.getFailedInserts()))
>>>>>     .apply(ParDo.of(new OnCompletion()));
>>>>>
>>>>> where
>>>>> + format-function "SlowFormatter" prints out each line and has a small
>>>>>   sleep for testing purposes, and
>>>>> + DoFn OnCompletion just prints out the contents of each line.
>>>>>
>>>>> In production code, OnCompletion would be fed some collection derived
>>>>> from lines, e.g. min/max record id, and the operation would be "send
>>>>> pubsub message" rather than print.
>>>>>
>>>>> My expectation is that SlowFormatter would run for each line, then the
>>>>> data would be uploaded, then OnCompletion would print each line. And
>>>>> indeed that happens when STREAMING_INSERTS is used. However, for
>>>>> FILE_LOADS, OnCompletion runs before the upload takes place.
>>>>>
>>>>> I use WriteResult.getFailedInserts as that is the only "output" that
>>>>> BigQueryIO.write() generates, AFAICT. I don't expect any failed
>>>>> records, but believe that it can be used as a "signal" for the Wait.on
>>>>> - i.e. the output is "complete for window" only after all data has
>>>>> been uploaded, which is what I need. And that does seem to work for
>>>>> STREAMING_INSERTS.
>>>>>
>>>>> I suspect the reason this does not work for FILE_LOADS is that method
>>>>> BatchLoads.writeResult returns a WriteResult that wraps an "empty"
>>>>> failedInserts collection, i.e. data which is not connected to the
>>>>> batch-load job that is triggered:
>>>>>
>>>>> private WriteResult writeResult(Pipeline p) {
>>>>>   PCollection<TableRow> empty =
>>>>>       p.apply("CreateEmptyFailedInserts",
>>>>>           Create.empty(TypeDescriptor.of(TableRow.class)));
>>>>>   return WriteResult.in(p, new TupleTag<>("failedInserts"), empty);
>>>>> }
>>>>>
>>>>> Note that BatchLoads does "synchronously" invoke BigQuery load jobs;
>>>>> once a job is submitted, the code repeatedly polls the job status
>>>>> until it reaches DONE or FAILED. However, that information does not
>>>>> appear to be exposed anywhere (unlike streaming, which effectively
>>>>> exposes completion-state via the failedInserts stream).
>>>>>
>>>>> If I have misunderstood something, corrections welcome! If not,
>>>>> suggestions for workarounds or alternate solutions are also welcome :-)
>>>>>
>>>>> Thanks,
>>>>> Simon
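
[Editor's note] The ordering the thread is asking Wait.on to provide can be sketched without Beam at all: the downstream step must not run until the write step has actually completed, even when the write emits no elements (an empty failed-inserts collection). The following plain-JDK sketch (not Beam code; the names WaitOnOrdering, writeBatch, and events are illustrative inventions) models that sequencing with a CompletableFuture standing in for the proposed completion signal:

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;

public class WaitOnOrdering {
    // Records the order in which the simulated pipeline steps run.
    static final List<String> events = new CopyOnWriteArrayList<>();

    // Stands in for the FILE_LOADS path: format each element, then complete
    // only once the "load job" is done (as BatchLoads polls until DONE).
    static CompletableFuture<Void> writeBatch(List<String> lines) {
        return CompletableFuture.runAsync(() -> {
            for (String line : lines) {
                events.add("format:" + line);  // analogous to SlowFormatter
            }
            events.add("load-job-done");       // analogous to job status DONE
        });
    }

    public static void main(String[] args) {
        List<String> lines = List.of("line1", "line2");
        // Sequencing on the write's *completion*, not on a possibly-empty
        // failed-inserts collection, guarantees the expected ordering.
        writeBatch(lines)
            .thenRun(() -> lines.forEach(l -> events.add("onCompletion:" + l)))
            .join();
        System.out.println(events);
        // prints [format:line1, format:line2, load-job-done, onCompletion:line1, onCompletion:line2]
    }
}
```

In Beam terms, the PCollection<KV<DestinationT, ?>> that Eugene proposes adding to WriteResult would play the role of the future's completion here: a real signal tied to the load job finishing, which Wait.on could consume on the FILE_LOADS path.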