Hi Carlos,

Any updates, or roadblocks you've hit?

On Tue, Jul 3, 2018 at 7:13 AM Eugene Kirpichov <kirpic...@google.com> wrote:
> Awesome!! Thanks for the heads up, very exciting, this is going to make a lot of people happy :)
>
> On Tue, Jul 3, 2018, 3:40 AM Carlos Alonso <car...@mrcalonso.com> wrote:
>
>> + d...@beam.apache.org
>>
>> Just a quick email to let you know that I'm starting to develop this.
>>
>> On Fri, Apr 20, 2018 at 10:30 PM Eugene Kirpichov <kirpic...@google.com> wrote:
>>
>>> Hi Carlos,
>>>
>>> Thank you for expressing interest in taking this on! Let me give you a few pointers to start, and I'll be happy to help everywhere along the way.
>>>
>>> Basically we want BigQueryIO.write() to return something (e.g. a PCollection) that can be used as input to Wait.on().
>>> Currently it returns a WriteResult, which only contains a PCollection<TableRow> of failed inserts. That one cannot be used directly; instead we should add another component to WriteResult that represents the result of successfully writing some data.
>>>
>>> Given that BQIO supports dynamic destination writes, I think it makes sense for that to be a PCollection<KV<DestinationT, ???>> so that in theory we could sequence different destinations independently (currently Wait.on() does not provide such a feature, but it could); and it will require changing WriteResult to be WriteResult<DestinationT>. As for what the "???" might be - it is something that represents the result of successfully writing a window of data. I think it can even be Void, or "?" (wildcard type) for now, until we figure out something better.
>>>
>>> Implementing this would require roughly the following work:
>>> - Add this PCollection<KV<DestinationT, ?>> to WriteResult
>>> - Modify the BatchLoads transform to provide it on both codepaths: expandTriggered() and expandUntriggered()
>>>   - expandTriggered() itself writes via 2 codepaths: single-partition and multi-partition.
>>>     Both need to be handled - we need to get a PCollection<KV<DestinationT, ?>> from each of them, and Flatten these two PCollections together to get the final result. The single-partition codepath (writeSinglePartition) under the hood already uses WriteTables, which returns a KV<DestinationT, ...>, so it's directly usable. The multi-partition codepath ends in WriteRenameTriggered - unfortunately, this codepath drops DestinationT along the way and will need to be refactored a bit to keep it until the end.
>>>   - expandUntriggered() should be treated the same way.
>>> - Modify the StreamingWriteTables transform to provide it
>>>   - Here also, the challenge is to propagate the DestinationT type all the way until the end of StreamingWriteTables - it will need to be refactored. After such a refactoring, returning a KV<DestinationT, ...> should be easy.
>>>
>>> Another challenge with all of this is backwards compatibility in terms of API and pipeline update.
>>> Pipeline update is much less of a concern for the BatchLoads codepath, because it's typically used in batch-mode pipelines that don't get updated. I would recommend starting with this, perhaps even with only the untriggered codepath (it is much more commonly used) - that will pave the way for future work.
>>>
>>> Hope this helps, please ask more if something is unclear!
>>>
>>> On Fri, Apr 20, 2018 at 12:48 AM Carlos Alonso <car...@mrcalonso.com> wrote:
>>>
>>>> Hey Eugene!!
>>>>
>>>> I’d gladly take a stab at it, although I’m not sure how much time I'll have available to put into it, but... yeah, let’s try it.
>>>>
>>>> Where should I begin? Is there a Jira issue or shall I file one?
>>>>
>>>> Thanks!
>>>> On Thu, 12 Apr 2018 at 00:41, Eugene Kirpichov <kirpic...@google.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> Yes, you're both right - BigQueryIO.write() is currently not implemented in a way that it can be used with Wait.on().
>>>>> It would certainly be a welcome contribution to change this - many people expressed interest in specifically waiting for BigQuery writes. Would either of you be interested in helping out?
>>>>>
>>>>> Thanks.
>>>>>
>>>>> On Fri, Apr 6, 2018 at 12:36 AM Carlos Alonso <car...@mrcalonso.com> wrote:
>>>>>
>>>>>> Hi Simon, I think your explanation was very accurate, at least to my understanding. I'd also be interested in getting feedback on batch load results within the pipeline... hopefully someone may suggest something, otherwise we could propose submitting a Jira, or even better, a PR!! :)
>>>>>>
>>>>>> Thanks!
>>>>>>
>>>>>> On Thu, Apr 5, 2018 at 2:01 PM Simon Kitching <simon.kitch...@unbelievable-machine.com> wrote:
>>>>>>
>>>>>>> Hi All,
>>>>>>>
>>>>>>> I need to write some data to BigQuery (batch-mode) and then send a Pubsub message to trigger further processing.
>>>>>>>
>>>>>>> I found this thread titled "Callbacks/other functions run after a PDone/output transform" on the user-list which was very relevant:
>>>>>>>
>>>>>>> https://lists.apache.org/thread.html/ddcdf93604396b1cbcacdff49aba60817dc90ee7c8434725ea0d26c0@%3Cuser.beam.apache.org%3E
>>>>>>>
>>>>>>> Thanks to the author of the Wait transform (Beam 2.4.0)!
>>>>>>>
>>>>>>> Unfortunately, it appears that the Wait.on transform does not work with BigQueryIO in FILE_LOADS mode - or at least I cannot get it to work.
>>>>>>> Advice appreciated.
>>>>>>>
>>>>>>> Here's (most of) the relevant test code:
>>>>>>>
>>>>>>>     Pipeline p = Pipeline.create(options);
>>>>>>>     PCollection<String> lines = p.apply("Read Input",
>>>>>>>         Create.of("line1", "line2", "line3", "line4"));
>>>>>>>
>>>>>>>     TableFieldSchema f1 = new TableFieldSchema().setName("value").setType("string");
>>>>>>>     TableSchema s2 = new TableSchema().setFields(Collections.singletonList(f1));
>>>>>>>
>>>>>>>     WriteResult writeResult = lines.apply("Write and load data",
>>>>>>>         BigQueryIO.<String>write()
>>>>>>>             .to(options.getTableSpec())
>>>>>>>             .withFormatFunction(new SlowFormatter())
>>>>>>>             .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
>>>>>>>             // .withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS)
>>>>>>>             .withSchema(s2)
>>>>>>>             .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
>>>>>>>             .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
>>>>>>>
>>>>>>>     lines.apply(Wait.on(writeResult.getFailedInserts())).apply(ParDo.of(new OnCompletion()));
>>>>>>>
>>>>>>> where
>>>>>>> + format-function "SlowFormatter" prints out each line and has a small sleep for testing purposes, and
>>>>>>> + DoFn OnCompletion just prints out the contents of each line
>>>>>>>
>>>>>>> In production code, OnCompletion would be fed some collection derived from lines, e.g. min/max record id, and the operation would be "send pubsub message" rather than print.
>>>>>>>
>>>>>>> My expectation is that the "SlowFormatter" would run for each line, then the data would be uploaded, then OnCompletion would print each line. And indeed that happens when STREAMING_INSERTS is used. However, for FILE_LOADS, OnCompletion runs before the upload takes place.
>>>>>>>
>>>>>>> I use WriteResult.getFailedInserts as that is the only "output" that BigQueryIO.write() generates AFAICT.
>>>>>>> I don't expect any failed records, but believe that it can be used as a "signal" for the Wait.on - i.e. the output is "complete for window" only after all data has been uploaded, which is what I need. And that does seem to work for STREAMING_INSERTS.
>>>>>>>
>>>>>>> I suspect the reason that this does not work for FILE_LOADS is that method BatchLoads.writeResult returns a WriteResult that wraps an "empty" failedInserts collection, i.e. data which is not connected to the batch-load-job that is triggered:
>>>>>>>
>>>>>>>     private WriteResult writeResult(Pipeline p) {
>>>>>>>       PCollection<TableRow> empty =
>>>>>>>           p.apply("CreateEmptyFailedInserts", Create.empty(TypeDescriptor.of(TableRow.class)));
>>>>>>>       return WriteResult.in(p, new TupleTag<>("failedInserts"), empty);
>>>>>>>     }
>>>>>>>
>>>>>>> Note that BatchLoads does "synchronously" invoke BigQuery load jobs; once a job is submitted, the code repeatedly polls the job status until it reaches DONE or FAILED. However, that information does not appear to be exposed anywhere (unlike streaming, which effectively exposes completion-state via the failedInserts stream).
>>>>>>>
>>>>>>> If I have misunderstood something, corrections welcome! If not, suggestions for workarounds or alternate solutions are also welcome :-)
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Simon
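
Simon's diagnosis (the empty failedInserts collection is created independently of the load job, so it gates nothing) can be illustrated with a toy, single-threaded model in plain Java. This is only an analogy, not Beam code, and it uses no Beam API: a `CompletableFuture` stands in for the signal passed to `Wait.on()`, and the names `WaitOnModel`, `run`, and the log strings are hypothetical. A signal that is already complete lets the gated step run before the load finishes; a signal that completes only when the (simulated) load job reaches DONE gives the ordering Simon expected.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

/**
 * Toy model of the ordering problem discussed in this thread (hypothetical, NOT Beam code).
 * A CompletableFuture plays the role of the "signal" given to Wait.on():
 * the gated step may only run once the signal is complete.
 */
public class WaitOnModel {

    /** Wires up a gated step, then "runs" the load job; returns the order of events. */
    public static List<String> run(boolean signalConnectedToLoad) {
        List<String> log = new ArrayList<>();

        // Completes only when the simulated BigQuery load job reaches DONE.
        CompletableFuture<Void> loadDone = new CompletableFuture<>();

        // Analogue of the empty failedInserts collection from BatchLoads.writeResult():
        // created independently of the load, so it is already complete.
        CompletableFuture<Void> emptyFailedInserts = CompletableFuture.completedFuture(null);

        CompletableFuture<Void> signal = signalConnectedToLoad ? loadDone : emptyFailedInserts;

        // Analogue of lines.apply(Wait.on(signal)).apply(ParDo.of(new OnCompletion())).
        // On an already-complete future, thenRun executes immediately in this thread.
        signal.thenRun(() -> log.add("OnCompletion"));

        // The load job finishes after the gated step has been wired up.
        log.add("load job DONE");
        loadDone.complete(null); // runs any still-pending dependents in this thread

        return log;
    }

    public static void main(String[] args) {
        System.out.println("disconnected signal: " + run(false));
        System.out.println("connected signal:    " + run(true));
    }
}
```

In the disconnected case the log records OnCompletion before the load completes, mirroring what Simon saw with FILE_LOADS; in the connected case it records the load first. The connected case corresponds to what Eugene proposes: a success output from BigQueryIO.write() that is actually derived from the load jobs, so that Wait.on() has something meaningful to wait for.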