Hi All,

I need to write some data to BigQuery (batch-mode) and then send a Pubsub 
message to trigger further processing.

I found this thread titled "Callbacks/other functions run after a PDone/output 
transform" on the user-list which was very relevant:
  
https://lists.apache.org/thread.html/ddcdf93604396b1cbcacdff49aba60817dc90ee7c8434725ea0d26c0@%3Cuser.beam.apache.org%3E

Thanks to the author of the Wait transform (Beam 2.4.0)!

Unfortunately, it appears that the Wait.on transform does not work with
BigQueryIO in FILE_LOADS mode, or at least I cannot get it to work. Advice
appreciated.

Here's (most of) the relevant test code:
        Pipeline p = Pipeline.create(options);
        PCollection<String> lines = p.apply("Read Input", Create.of("line1", "line2", "line3", "line4"));

        TableFieldSchema f1 = new TableFieldSchema().setName("value").setType("string");
        TableSchema s2 = new TableSchema().setFields(Collections.singletonList(f1));

        // Batch write; swap the commented-out method to compare with streaming inserts.
        WriteResult writeResult = lines.apply("Write and load data", BigQueryIO.<String>write() //
                .to(options.getTableSpec()) //
                .withFormatFunction(new SlowFormatter()) //
                .withMethod(BigQueryIO.Write.Method.FILE_LOADS) //
//                .withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS) //
                .withSchema(s2)
                .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED) //
                .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));

        // Hold back OnCompletion until the write's failedInserts output is complete.
        lines.apply(Wait.on(writeResult.getFailedInserts())).apply(ParDo.of(new OnCompletion()));

where
+ the format function "SlowFormatter" prints out each line and sleeps briefly
for testing purposes, and
+ the DoFn OnCompletion just prints out the contents of each line.

In production code, OnCompletion would be fed some collection derived from
lines, e.g. the min/max record id, and the operation would be "send Pubsub
message" rather than print.

My expectation is that "SlowFormatter" would run for each line, then the data
would be uploaded, and only then OnCompletion would print each line. That is
indeed what happens when STREAMING_INSERTS is used. With FILE_LOADS, however,
OnCompletion runs before the upload takes place.
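To make the expected ordering concrete, here is how I understand the Wait.on
contract, as a toy model in plain Java. This is not Beam code: WaitGate and
its methods are hypothetical names of my own, and a single global window is
assumed. Main-input elements are held back until the signal collection is
complete, and only then released.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/**
 * Toy model (not Beam code) of Wait.on in a single global window:
 * main-input elements are buffered until the signal collection is complete.
 */
class WaitGate<T> {
    private final List<T> buffered = new ArrayList<>();
    private final List<T> released = new ArrayList<>();
    private boolean signalComplete = false;

    /** A main-input element arrives; hold it back unless the signal is already complete. */
    void onMainElement(T element) {
        if (signalComplete) {
            released.add(element);
        } else {
            buffered.add(element);
        }
    }

    /** The signal collection (e.g. failedInserts) is now complete for the window. */
    void onSignalComplete() {
        signalComplete = true;
        released.addAll(buffered);
        buffered.clear();
    }

    /** Elements that downstream (e.g. OnCompletion) would have seen so far. */
    List<T> released() {
        return Collections.unmodifiableList(released);
    }
}
```

In my pipeline the "signal complete" event is what I expect to happen only
after the load job has finished.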

I use WriteResult.getFailedInserts because, AFAICT, it is the only "output"
that BigQueryIO.write() generates. I don't expect any failed records, but I
believe it can be used as a "signal" for Wait.on, i.e. the output is "complete
for the window" only after all data has been uploaded, which is what I need.
And that does seem to work for STREAMING_INSERTS.

I suspect the reason this does not work for FILE_LOADS is that the method
BatchLoads.writeResult returns a WriteResult that wraps an "empty"
failedInserts collection, i.e. one that is not connected to the load job that
gets triggered:
  private WriteResult writeResult(Pipeline p) {
    PCollection<TableRow> empty =
        p.apply("CreateEmptyFailedInserts", Create.empty(TypeDescriptor.of(TableRow.class)));
    return WriteResult.in(p, new TupleTag<>("failedInserts"), empty);
  }
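A toy dependency model shows why an empty, unconnected signal cannot help. It
is plain Java, not Beam; Stage and the tick durations are made up purely for
illustration. A collection is complete only once everything upstream of it is
complete, and Create.empty() has nothing upstream, so it is complete
immediately, regardless of the load job.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/**
 * Toy model (not Beam code): a stage's output is complete only after all of
 * its upstream stages are complete and its own work is done. Times are ticks.
 */
class Stage {
    final String name;
    final int ownDuration;
    final List<Stage> upstream = new ArrayList<>();

    Stage(String name, int ownDuration, Stage... deps) {
        this.name = name;
        this.ownDuration = ownDuration;
        Collections.addAll(upstream, deps);
    }

    /** Earliest tick at which this stage's output is complete. */
    int completionTime() {
        int start = 0;
        for (Stage dep : upstream) {
            start = Math.max(start, dep.completionTime());
        }
        return start + ownDuration;
    }

    @Override
    public String toString() {
        return name + "@" + completionTime();
    }
}
```

For example, with Read Input taking 1 tick, SlowFormatter 4 and the load job
10, a signal that depends on the load (as in the streaming case) completes at
tick 15, while a signal modelled like CreateEmptyFailedInserts completes at
tick 0, which matches OnCompletion running before the upload.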

Note that BatchLoads does invoke BigQuery load jobs "synchronously": once a job
is submitted, the code repeatedly polls the job status until it reaches DONE or
FAILED. However, that information does not appear to be exposed anywhere
(unlike streaming, which effectively exposes completion state via the
failedInserts stream).
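The "submit, then poll until terminal" behaviour described above can be
sketched roughly like this. It is plain Java; JobState, JobPoller and the
pollStatus supplier are hypothetical stand-ins, not the BigQuery client API
and not BatchLoads' actual code. The moment waitForTerminalState returns is
exactly the completion event I would like to be able to wait on.

```java
import java.util.function.Supplier;

/** Hypothetical job states, mirroring the DONE/FAILED terminal states described above. */
enum JobState { PENDING, RUNNING, DONE, FAILED }

/** Sketch of polling a submitted job until it reaches a terminal state. */
final class JobPoller {
    private JobPoller() {}

    /**
     * Calls pollStatus until it reports DONE or FAILED, sleeping between polls.
     * Throws IllegalStateException if maxPolls is exhausted or the thread is interrupted.
     */
    static JobState waitForTerminalState(Supplier<JobState> pollStatus,
                                         long pollIntervalMillis,
                                         int maxPolls) {
        for (int i = 0; i < maxPolls; i++) {
            JobState state = pollStatus.get();
            if (state == JobState.DONE || state == JobState.FAILED) {
                return state; // terminal: a completion signal could be emitted here
            }
            try {
                Thread.sleep(pollIntervalMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while polling", e);
            }
        }
        throw new IllegalStateException("job not finished after " + maxPolls + " polls");
    }
}
```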

If I have misunderstood something, corrections welcome! If not, suggestions for
workarounds or alternative solutions are also welcome :-)

Thanks,
Simon
