Hey Eugene!!

I’d gladly take a stab on it although I’m not sure how much available time
I might have to put into but... yeah, let’s try it.

Where should I begin? Is there a Jira issue or shall I file one?

Thanks!
On Thu, 12 Apr 2018 at 00:41, Eugene Kirpichov <[email protected]> wrote:

> Hi,
>
> Yes, you're both right - BigQueryIO.write() is currently not implemented
> in a way that it can be used with Wait.on(). It would certainly be a
> welcome contribution to change this - many people expressed interest in
> specifically waiting for BigQuery writes. Is any of you interested in
> helping out?
>
> Thanks.
>
> On Fri, Apr 6, 2018 at 12:36 AM Carlos Alonso <[email protected]>
> wrote:
>
>> Hi Simon, I think your explanation was very accurate, at least to my
>> understanding. I'd also be interested in getting batch load result's
>> feedback on the pipeline... hopefully someone may suggest something,
>> otherwise we could propose submitting a Jira, or even better, a PR!! :)
>>
>> Thanks!
>>
>> On Thu, Apr 5, 2018 at 2:01 PM Simon Kitching <
>> [email protected]> wrote:
>>
>>> Hi All,
>>>
>>> I need to write some data to BigQuery (batch-mode) and then send a
>>> Pubsub message to trigger further processing.
>>>
>>> I found this thread titled "Callbacks/other functions run after a
>>> PDone/output transform" on the user-list which was very relevant:
>>>
>>> https://lists.apache.org/thread.html/ddcdf93604396b1cbcacdff49aba60817dc90ee7c8434725ea0d26c0@%3Cuser.beam.apache.org%3E
>>>
>>> Thanks to the author of the Wait transform (Beam 2.4.0)!
>>>
>>> Unfortunately, it appears that the Wait.on transform does not work with
>>> BiqQueryIO in FILE_LOADS mode - or at least I cannot get it to work. Advice
>>> appreciated.
>>>
>>> Here's (most of) the relevant test code:
>>>         Pipeline p = Pipeline.create(options);
>>>         PCollection<String> lines = p.apply("Read Input",
>>> Create.of("line1", "line2", "line3", "line4"));
>>>
>>>         TableFieldSchema f1 = new
>>> TableFieldSchema().setName("value").setType("string");
>>>         TableSchema s2 = new
>>> TableSchema().setFields(Collections.singletonList(f1));
>>>
>>>         WriteResult writeResult = lines.apply("Write and load data",
>>> BigQueryIO.<String>write() //
>>>                 .to(options.getTableSpec()) //
>>>                 .withFormatFunction(new SlowFormatter()) //
>>>                 .withMethod(BigQueryIO.Write.Method.FILE_LOADS) //
>>> //                .withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS)
>>> //
>>>                 .withSchema(s2)
>>>
>>> .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
>>> //
>>>
>>> .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
>>>
>>>
>>> lines.apply(Wait.on(writeResult.getFailedInserts())).apply(ParDo.of(new
>>> OnCompletion()));
>>>
>>> where
>>> + format-function "SlowFormatter" prints out each line and has a small
>>> sleep for testing purposes, and
>>> + DoFn OnCompletion just prints out the contents of each line
>>>
>>> In production code, OnCompletion would be fed some collection derived
>>> from lines, eg min/max record id, and the operation would be "send pubsub
>>> message" rather than print..
>>>
>>> My expectation is that the "SlowFormatter" would run for each line, then
>>> the data would be uploaded, then OnCompletion would print each line. And
>>> indeed that happens when STREAMING_INSERTS is used. However for FILE_LOADS,
>>> LinePrinter runs before the upload takes place.
>>>
>>> I use WriteResult.getFailedInserts as that is the only "output" that
>>> BiqQueryIO.write() generates AFAICT. I don't expect any failed records, but
>>> believe that it can be used as a "signal" for the Wait.on - ie the output
>>> is "complete for window" only after all data has been uploaded, which is
>>> what I need. And that does seem to work for STREAMING_LOADS.
>>>
>>> I suspect the reason that this does not work for FILE_LOADS is that
>>> method BatchLoads.writeResult returns a WriteResult that wraps an "empty"
>>> failedInserts collection, ie data which is not connected to the
>>> batch-load-job that is triggered:
>>>   private WriteResult writeResult(Pipeline p) {
>>>     PCollection<TableRow> empty =
>>>         p.apply("CreateEmptyFailedInserts",
>>> Create.empty(TypeDescriptor.of(TableRow.class)));
>>>     return WriteResult.in(p, new TupleTag<>("failedInserts"), empty);
>>>   }
>>>
>>> Note that BatchLoads does "synchronously" invoke BigQuery load jobs;
>>> once a job is submitted the code repeatedly polls the job status until it
>>> reaches DONE or FAILED. However that information does not appear to be
>>> exposed anywhere (unlike streaming which effectively exposes
>>> completion-state via the failedInserts stream).
>>>
>>> If I have misunderstood something, corrections welcome! If not,
>>> suggestions for workarounds or alternate solutions are also welcome :-)
>>>
>>> Thanks,
>>> Simon
>>>
>>>

Reply via email to