+ d...@beam.apache.org

Just a quick email to let you know that I'm starting developing this.

On Fri, Apr 20, 2018 at 10:30 PM Eugene Kirpichov <kirpic...@google.com>
wrote:

> Hi Carlos,
>
> Thank you for expressing interest in taking this on! Let me give you a few
> pointers to start, and I'll be happy to help everywhere along the way.
>
> Basically we want BigQueryIO.write() to return something (e.g. a
> PCollection) that can be used as input to Wait.on().
> Currently it returns a WriteResult, which only contains a
> PCollection<TableRow> of failed inserts - that one can not be used
> directly, instead we should add another component to WriteResult that
> represents the result of successfully writing some data.
>
> Given that BQIO supports dynamic destination writes, I think it makes
> sense for that to be a PCollection<KV<DestinationT, ???>> so that in theory
> we could sequence different destinations independently (currently Wait.on()
> does not provide such a feature, but it could); and it will require
> changing WriteResult to be WriteResult<DestinationT>. As for what the "???"
> might be - it is something that represents the result of successfully
> writing a window of data. I think it can even be Void, or "?" (wildcard
> type) for now, until we figure out something better.
>
> Implementing this would require roughly the following work:
> - Add this PCollection<KV<DestinationT, ?>> to WriteResult
> - Modify the BatchLoads transform to provide it on both codepaths:
> expandTriggered() and expandUntriggered()
> ...- expandTriggered() itself writes via 2 codepaths: single-partition and
> multi-partition. Both need to be handled - we need to get a
> PCollection<KV<DestinationT, ?>> from each of them, and Flatten these two
> PCollections together to get the final result. The single-partition
> codepath (writeSinglePartition) under the hood already uses WriteTables
> that returns a KV<DestinationT, ...> so it's directly usable. The
> multi-partition codepath ends in WriteRenameTriggered - unfortunately, this
> codepath drops DestinationT along the way and will need to be refactored a
> bit to keep it until the end.
> ...- expandUntriggered() should be treated the same way.
> - Modify the StreamingWriteTables transform to provide it
> ...- Here also, the challenge is to propagate the DestinationT type all
> the way until the end of StreamingWriteTables - it will need to be
> refactored. After such a refactoring, returning a KV<DestinationT, ...>
> should be easy.
>
> Another challenge with all of this is backwards compatibility in terms of
> API and pipeline update.
> Pipeline update is much less of a concern for the BatchLoads codepath,
> because it's typically used in batch-mode pipelines that don't get updated.
> I would recommend to start with this, perhaps even with only the
> untriggered codepath (it is much more commonly used) - that will pave the
> way for future work.
>
> Hope this helps, please ask more if something is unclear!
>
> On Fri, Apr 20, 2018 at 12:48 AM Carlos Alonso <car...@mrcalonso.com>
> wrote:
>
>> Hey Eugene!!
>>
>> I’d gladly take a stab on it although I’m not sure how much available
>> time I might have to put into but... yeah, let’s try it.
>>
>> Where should I begin? Is there a Jira issue or shall I file one?
>>
>> Thanks!
>> On Thu, 12 Apr 2018 at 00:41, Eugene Kirpichov <kirpic...@google.com>
>> wrote:
>>
>>> Hi,
>>>
>>> Yes, you're both right - BigQueryIO.write() is currently not implemented
>>> in a way that it can be used with Wait.on(). It would certainly be a
>>> welcome contribution to change this - many people expressed interest in
>>> specifically waiting for BigQuery writes. Is any of you interested in
>>> helping out?
>>>
>>> Thanks.
>>>
>>> On Fri, Apr 6, 2018 at 12:36 AM Carlos Alonso <car...@mrcalonso.com>
>>> wrote:
>>>
>>>> Hi Simon, I think your explanation was very accurate, at least to my
>>>> understanding. I'd also be interested in getting batch load result's
>>>> feedback on the pipeline... hopefully someone may suggest something,
>>>> otherwise we could propose submitting a Jira, or even better, a PR!! :)
>>>>
>>>> Thanks!
>>>>
>>>> On Thu, Apr 5, 2018 at 2:01 PM Simon Kitching <
>>>> simon.kitch...@unbelievable-machine.com> wrote:
>>>>
>>>>> Hi All,
>>>>>
>>>>> I need to write some data to BigQuery (batch-mode) and then send a
>>>>> Pubsub message to trigger further processing.
>>>>>
>>>>> I found this thread titled "Callbacks/other functions run after a
>>>>> PDone/output transform" on the user-list which was very relevant:
>>>>>
>>>>> https://lists.apache.org/thread.html/ddcdf93604396b1cbcacdff49aba60817dc90ee7c8434725ea0d26c0@%3Cuser.beam.apache.org%3E
>>>>>
>>>>> Thanks to the author of the Wait transform (Beam 2.4.0)!
>>>>>
>>>>> Unfortunately, it appears that the Wait.on transform does not work
>>>>> with BiqQueryIO in FILE_LOADS mode - or at least I cannot get it to work.
>>>>> Advice appreciated.
>>>>>
>>>>> Here's (most of) the relevant test code:
>>>>>         Pipeline p = Pipeline.create(options);
>>>>>         PCollection<String> lines = p.apply("Read Input",
>>>>> Create.of("line1", "line2", "line3", "line4"));
>>>>>
>>>>>         TableFieldSchema f1 = new
>>>>> TableFieldSchema().setName("value").setType("string");
>>>>>         TableSchema s2 = new
>>>>> TableSchema().setFields(Collections.singletonList(f1));
>>>>>
>>>>>         WriteResult writeResult = lines.apply("Write and load data",
>>>>> BigQueryIO.<String>write() //
>>>>>                 .to(options.getTableSpec()) //
>>>>>                 .withFormatFunction(new SlowFormatter()) //
>>>>>                 .withMethod(BigQueryIO.Write.Method.FILE_LOADS) //
>>>>> //
>>>>> .withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS) //
>>>>>                 .withSchema(s2)
>>>>>
>>>>> .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
>>>>> //
>>>>>
>>>>> .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
>>>>>
>>>>>
>>>>> lines.apply(Wait.on(writeResult.getFailedInserts())).apply(ParDo.of(new
>>>>> OnCompletion()));
>>>>>
>>>>> where
>>>>> + format-function "SlowFormatter" prints out each line and has a small
>>>>> sleep for testing purposes, and
>>>>> + DoFn OnCompletion just prints out the contents of each line
>>>>>
>>>>> In production code, OnCompletion would be fed some collection derived
>>>>> from lines, eg min/max record id, and the operation would be "send pubsub
>>>>> message" rather than print..
>>>>>
>>>>> My expectation is that the "SlowFormatter" would run for each line,
>>>>> then the data would be uploaded, then OnCompletion would print each line.
>>>>> And indeed that happens when STREAMING_INSERTS is used. However for
>>>>> FILE_LOADS, LinePrinter runs before the upload takes place.
>>>>>
>>>>> I use WriteResult.getFailedInserts as that is the only "output" that
>>>>> BiqQueryIO.write() generates AFAICT. I don't expect any failed records, 
>>>>> but
>>>>> believe that it can be used as a "signal" for the Wait.on - ie the output
>>>>> is "complete for window" only after all data has been uploaded, which is
>>>>> what I need. And that does seem to work for STREAMING_LOADS.
>>>>>
>>>>> I suspect the reason that this does not work for FILE_LOADS is that
>>>>> method BatchLoads.writeResult returns a WriteResult that wraps an "empty"
>>>>> failedInserts collection, ie data which is not connected to the
>>>>> batch-load-job that is triggered:
>>>>>   private WriteResult writeResult(Pipeline p) {
>>>>>     PCollection<TableRow> empty =
>>>>>         p.apply("CreateEmptyFailedInserts",
>>>>> Create.empty(TypeDescriptor.of(TableRow.class)));
>>>>>     return WriteResult.in(p, new TupleTag<>("failedInserts"), empty);
>>>>>   }
>>>>>
>>>>> Note that BatchLoads does "synchronously" invoke BigQuery load jobs;
>>>>> once a job is submitted the code repeatedly polls the job status until it
>>>>> reaches DONE or FAILED. However that information does not appear to be
>>>>> exposed anywhere (unlike streaming which effectively exposes
>>>>> completion-state via the failedInserts stream).
>>>>>
>>>>> If I have misunderstood something, corrections welcome! If not,
>>>>> suggestions for workarounds or alternate solutions are also welcome :-)
>>>>>
>>>>> Thanks,
>>>>> Simon
>>>>>
>>>>>

Reply via email to