Instead of a callback fn, its most useful if a PCollection is returned
containing the result of the sink so that any arbitrary additional
functions can be applied.

On Fri, Dec 1, 2017 at 7:14 AM, Jean-Baptiste Onofré <[email protected]>
wrote:

> Agree, I would prefer to do the callback in the IO more than in the main.
>
> Regards
> JB
>
> On 12/01/2017 03:54 PM, Steve Niemitz wrote:
>
>> I do something almost exactly like this, but with BigtableIO instead.  I
>> have a pull request open here [1] (which reminds me I need to finish this
>> up...).  It would really be nice for most IOs to support something like
>> this.
>>
>> Essentially you do a GroupByKey (or some CombineFn) on the output from
>> the BigtableIO, and then feed that into your function which will run when
>> all writes finish.
>>
>> You probably want to avoid doing something in the main method because
>> there's no guarantee it'll actually run (maybe the driver will die, get
>> killed, machine will explode, etc).
>>
>> [1] https://github.com/apache/beam/pull/3997
>>
>> On Fri, Dec 1, 2017 at 9:46 AM, NerdyNick <[email protected] <mailto:
>> [email protected]>> wrote:
>>
>>     Assuming you're in Java. You could just follow on in your Main method.
>>     Checking the state of the Result.
>>
>>     Example:
>>     PipelineResult result = pipeline.run();
>>     try {
>>     result.waitUntilFinish();
>>     if(result.getState() == PipelineResult.State.DONE) {
>>     //DO ES work
>>     }
>>     } catch(Exception e) {
>>     result.cancel();
>>     throw e;
>>     }
>>
>>     Otherwise you could also use Oozie to construct a work flow.
>>
>>     On Fri, Dec 1, 2017 at 2:02 AM, Jean-Baptiste Onofré <[email protected]
>>     <mailto:[email protected]>> wrote:
>>
>>         Hi,
>>
>>         yes, we had a similar question some days ago.
>>
>>         We can imagine to have a user callback fn fired when the sink
>> batch is
>>         complete.
>>
>>         Let me think about that.
>>
>>         Regards
>>         JB
>>
>>         On 12/01/2017 09:04 AM, Philip Chan wrote:
>>
>>             Hey JB,
>>
>>             Thanks for getting back so quickly.
>>             I suppose in that case I would need a way of monitoring when
>> the ES
>>             transform completes successfully before I can proceed with
>> doing the
>>             swap.
>>             The problem with this is that I can't think of a good way to
>>             determine that termination state short of polling the new
>> index to
>>             check the document count compared to the size of input
>> PCollection.
>>             That, or maybe I'd need to use an external system like you
>> mentioned
>>             to poll on the state of the pipeline (I'm using Google
>> Dataflow, so
>>             maybe there's a way to do this with some API).
>>             But I would have thought that there would be an easy way of
>> simply
>>             saying "do not process this transform until this other
>> transform
>>             completes".
>>             Is there no established way of "signaling" between pipelines
>> when
>>             some pipeline completes, or have some way of declaring a
>> dependency
>>             of 1 transform on another transform?
>>
>>             Thanks again,
>>             Philip
>>
>>             On Thu, Nov 30, 2017 at 11:44 PM, Jean-Baptiste Onofré
>>             <[email protected] <mailto:[email protected]> <mailto:
>> [email protected]
>>
>>             <mailto:[email protected]>>> wrote:
>>
>>                  Hi Philip,
>>
>>                  You won't be able to do (3) in the same pipeline as the
>>             Elasticsearch Sink
>>                  PTransform ends the pipeline with PDone.
>>
>>                  So, (3) has to be done in another pipeline (using a
>> DoFn) or in
>>             another
>>                  "system" (like Camel for instance). I would do a check
>> of the
>>             data in the
>>                  index and then trigger the swap there.
>>
>>                  Regards
>>                  JB
>>
>>                  On 12/01/2017 08:41 AM, Philip Chan wrote:
>>
>>                      Hi,
>>
>>                      I'm pretty new to Beam, and I've been trying to use
>> the
>>             ElasticSearchIO
>>                      sink to write docs into ES.
>>                      With this, I want to be able to
>>                      1. ingest and transform rows from DB (done)
>>                      2. write JSON docs/strings into a new ES index (done)
>>                      3. After (2) is complete and all documents are
>> written into
>>             a new index,
>>                      trigger an atomic index swap under an alias to
>> replace the
>>             current
>>                      aliased index with the new index generated in step
>> 2. This
>>             is basically
>>                      a single POST request to the ES cluster.
>>
>>                      The problem I'm facing is that I don't seem to be
>> able to
>>             find a way to
>>                      have a way for (3) to happen after step (2) is
>> complete.
>>
>>                      The ElasticSearchIO.Write transform returns a PDone,
>> and
>>             I'm not sure
>>                      how to proceed from there because it doesn't seem to
>> let me
>>             do another
>>                      apply on it to "define" a dependency.
>>             https://beam.apache.org/documentation/sdks/javadoc/2.1.0/
>> org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.Write.html
>>             <https://beam.apache.org/documentation/sdks/javadoc/2.1.0/
>> org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.Write.html>
>>                                 <https://beam.apache.org/docum
>> entation/sdks/javadoc/2.1.0/org/apache/beam/sdk/io/elastic
>> search/ElasticsearchIO.Write.html
>>             <https://beam.apache.org/documentation/sdks/javadoc/2.1.0/
>> org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.Write.html>>
>>                                 <https://beam.apache.org/docum
>> entation/sdks/javadoc/2.1.0/org/apache/beam/sdk/io/elastic
>> search/ElasticsearchIO.Write.html
>>             <https://beam.apache.org/documentation/sdks/javadoc/2.1.0/
>> org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.Write.html>
>>                                 <https://beam.apache.org/docum
>> entation/sdks/javadoc/2.1.0/org/apache/beam/sdk/io/elastic
>> search/ElasticsearchIO.Write.html
>>             <https://beam.apache.org/documentation/sdks/javadoc/2.1.0/
>> org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.Write.html>>>
>>
>>                      Is there a recommended way to construct pipelines
>> workflows
>>             like this?
>>
>>                      Thanks in advance,
>>                      Philip
>>
>>
>>                  --     Jean-Baptiste Onofré
>>             [email protected] <mailto:[email protected]>
>>             <mailto:[email protected] <mailto:[email protected]>>
>>             http://blog.nanthrax.net
>>                  Talend - http://www.talend.com
>>
>>
>>
>>         --         Jean-Baptiste Onofré
>>         [email protected] <mailto:[email protected]>
>>         http://blog.nanthrax.net
>>         Talend - http://www.talend.com
>>
>>
>>
>>
>>     --     Nick Verbeck - NerdyNick
>>     ----------------------------------------------------
>>     NerdyNick.com <http://NerdyNick.com>
>>     TrailsOffroad.com <http://TrailsOffroad.com>
>>     NoKnownBoundaries.com <http://NoKnownBoundaries.com>
>>
>>
>>
>>
> --
> Jean-Baptiste Onofré
> [email protected]
> http://blog.nanthrax.net
> Talend - http://www.talend.com
>

Reply via email to