Hi Simon, I think your explanation is accurate, at least to my
understanding. I'd also be interested in getting the batch load result
fed back into the pipeline. Hopefully someone can suggest something;
otherwise we could file a Jira issue or, even better, submit a PR! :)

Thanks!

On Thu, Apr 5, 2018 at 2:01 PM Simon Kitching <
simon.kitch...@unbelievable-machine.com> wrote:

> Hi All,
>
> I need to write some data to BigQuery (batch-mode) and then send a Pubsub
> message to trigger further processing.
>
> I found this thread titled "Callbacks/other functions run after a
> PDone/output transform" on the user-list which was very relevant:
>
> https://lists.apache.org/thread.html/ddcdf93604396b1cbcacdff49aba60817dc90ee7c8434725ea0d26c0@%3Cuser.beam.apache.org%3E
>
> Thanks to the author of the Wait transform (Beam 2.4.0)!
>
> Unfortunately, it appears that the Wait.on transform does not work with
> BigQueryIO in FILE_LOADS mode, or at least I cannot get it to work. Advice
> appreciated.
>
> Here's (most of) the relevant test code:
>
>         Pipeline p = Pipeline.create(options);
>         PCollection<String> lines =
>             p.apply("Read Input", Create.of("line1", "line2", "line3", "line4"));
>
>         TableFieldSchema f1 = new TableFieldSchema().setName("value").setType("string");
>         TableSchema s2 = new TableSchema().setFields(Collections.singletonList(f1));
>
>         WriteResult writeResult = lines.apply("Write and load data",
>             BigQueryIO.<String>write()
>                 .to(options.getTableSpec())
>                 .withFormatFunction(new SlowFormatter())
>                 .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
> //              .withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS)
>                 .withSchema(s2)
>                 .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
>                 .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
>
>         lines.apply(Wait.on(writeResult.getFailedInserts()))
>             .apply(ParDo.of(new OnCompletion()));
>
> where
> + format-function "SlowFormatter" prints out each line and has a small
> sleep for testing purposes, and
> + DoFn OnCompletion just prints out the contents of each line
>
> In production code, OnCompletion would be fed some collection derived from
> lines, e.g. min/max record id, and the operation would be "send Pubsub
> message" rather than print.
>
> My expectation is that the "SlowFormatter" would run for each line, then
> the data would be uploaded, then OnCompletion would print each line. And
> indeed that happens when STREAMING_INSERTS is used. However, for FILE_LOADS,
> OnCompletion runs before the upload takes place.
>
> I use WriteResult.getFailedInserts as that is the only "output" that
> BigQueryIO.write() generates AFAICT. I don't expect any failed records, but
> believe that it can be used as a "signal" for the Wait.on, i.e. the output
> is "complete for window" only after all data has been uploaded, which is
> what I need. And that does seem to work for STREAMING_INSERTS.
>
> I suspect the reason that this does not work for FILE_LOADS is that method
> BatchLoads.writeResult returns a WriteResult that wraps an "empty"
> failedInserts collection, ie data which is not connected to the
> batch-load-job that is triggered:
>   private WriteResult writeResult(Pipeline p) {
>     PCollection<TableRow> empty =
>         p.apply("CreateEmptyFailedInserts", Create.empty(TypeDescriptor.of(TableRow.class)));
>     return WriteResult.in(p, new TupleTag<>("failedInserts"), empty);
>   }
>
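
To make the suspicion above concrete, here is a rough plain-Java analogy (not Beam code, and not how Beam schedules work internally). It assumes a "signal" can be modelled as a CompletableFuture: one that is already complete and unrelated to the load, like the empty failedInserts collection, versus one that is derived from the load itself. The class and method names (SignalDemo, run) are made up for illustration.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;

/** Plain-Java analogy (not Beam code): a follow-up step gated on two kinds of signal. */
class SignalDemo {

    /** Runs a simulated load and a follow-up step; returns the order in which they happened. */
    static List<String> run(boolean signalTiedToLoad) {
        List<String> events = new CopyOnWriteArrayList<>();
        CountDownLatch releaseLoad = new CountDownLatch(1);

        // Simulated load job: blocks until released, then records completion.
        CompletableFuture<Void> load = CompletableFuture.runAsync(() -> {
            try {
                releaseLoad.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            events.add("load finished");
        });

        // Either a signal derived from the load, or an already-complete signal
        // with no connection to it (the analogue of an empty collection).
        CompletableFuture<Void> signal =
            signalTiedToLoad ? load : CompletableFuture.completedFuture(null);

        // The follow-up step waits only on the signal, whatever that signal is.
        CompletableFuture<Void> followUp =
            signal.thenRun(() -> events.add("follow-up ran"));

        releaseLoad.countDown(); // let the simulated load finish
        load.join();
        followUp.join();
        return events;
    }

    public static void main(String[] args) {
        System.out.println("unconnected signal: " + run(false));
        System.out.println("signal tied to load: " + run(true));
    }
}
```

With the unconnected signal the follow-up step has nothing to wait for, so it runs before the load finishes; with the signal tied to the load, the order is the one wanted here.
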
> Note that BatchLoads does "synchronously" invoke BigQuery load jobs; once
> a job is submitted the code repeatedly polls the job status until it
> reaches DONE or FAILED. However that information does not appear to be
> exposed anywhere (unlike streaming which effectively exposes
> completion-state via the failedInserts stream).
>
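
For reference, the poll-until-terminal behaviour described above can be sketched in plain Java roughly as follows. This is not the BatchLoads source; JobState, pollUntilTerminal, and the canned status sequence are all hypothetical stand-ins. The point is only that the loop ends with a definite final state, which is the piece of information that would need to be emitted downstream for Wait.on to be useful.

```java
import java.util.Iterator;
import java.util.List;
import java.util.function.Supplier;

/** Plain-Java sketch of a poll-until-terminal loop; not the BatchLoads source. */
class PollDemo {

    enum JobState { PENDING, RUNNING, DONE, FAILED }

    /**
     * Polls statusSource until it reports DONE or FAILED, or until maxPolls is used up.
     * Returns the last state seen, which a caller could publish as a completion signal.
     */
    static JobState pollUntilTerminal(Supplier<JobState> statusSource, int maxPolls, long sleepMillis) {
        JobState state = JobState.PENDING;
        for (int i = 0; i < maxPolls; i++) {
            state = statusSource.get();
            if (state == JobState.DONE || state == JobState.FAILED) {
                return state;
            }
            try {
                Thread.sleep(sleepMillis);
            } catch (InterruptedException e) {
                // Stop polling early but keep the interrupt flag for the caller.
                Thread.currentThread().interrupt();
                return state;
            }
        }
        return state;
    }

    public static void main(String[] args) {
        // Hypothetical canned statuses standing in for a real job-status lookup.
        Iterator<JobState> canned =
            List.of(JobState.PENDING, JobState.RUNNING, JobState.DONE).iterator();
        System.out.println(pollUntilTerminal(canned::next, 10, 1));
    }
}
```
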
> If I have misunderstood something, corrections welcome! If not,
> suggestions for workarounds or alternate solutions are also welcome :-)
>
> Thanks,
> Simon
>
>
