I agree in general that returning a PCollection makes all of this easier, 
since arbitrary additional functions can then be applied. But what exactly 
would write functions return in a PCollection that would make sense? The 
whole idea is that we’ve written to an external source and the collection 
itself is no longer needed. 

Currently, that’s represented with a PDone, which doesn’t allow any work to 
occur after it. I see a few possible ways of handling this given this 
conversation, and am curious which sounds like the best way to deal with the 
problem:

1. Have output transforms always return something specific (which would be the 
same across transforms by convention), that is in the form of a PCollection, so 
operations can occur after it. 

2. Make either PDone or some new type that can act as a PCollection so we can 
run applies afterward. 

3. Make output transforms provide the facility for a callback function which 
runs after the transform is complete.

I went through these gymnastics recently when I was trying to build something 
that would move indices after writing to Algolia, and the solution was to 
co-opt code from the old Sink class that used to exist in Beam. The problem is 
that this approach requires the output transform in question to return a 
PCollection, even if it is trivial or doesn’t make sense to return one. This 
seems like a bad solution, but unfortunately there is currently no notion of a 
transform that has no explicit output yet still needs to have operations occur 
after it. 
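
To make the shape of option 1 a bit more concrete, here is a minimal 
plain-Java sketch. It is not Beam code: the class and method names 
(WriteThenSwapSketch, writeBatches, totalWritten, swapAlias) are hypothetical 
stand-ins, and plain lists stand in for PCollections. The idea it tries to 
show is that the write step emits one small result per batch, those results 
are combined into a single value that only exists once every batch has 
reported, and the follow-up step consumes that value. 

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java illustration only; every name here is hypothetical, not a Beam API.
class WriteThenSwapSketch {

    // Stand-in for an output transform that returns results instead of PDone:
    // it "writes" each batch and reports how many documents went out.
    static List<Integer> writeBatches(List<List<String>> batches) {
        List<Integer> results = new ArrayList<>();
        for (List<String> batch : batches) {
            // A real sink would send the batch to the external system here.
            results.add(batch.size());
        }
        return results;
    }

    // Stand-in for a global combine: collapses all per-batch results into one
    // value, which cannot be produced until every batch has reported.
    static int totalWritten(List<Integer> results) {
        int total = 0;
        for (int count : results) {
            total += count;
        }
        return total;
    }

    // Stand-in for the follow-up work (for example, an index swap) that
    // depends on the combined result and therefore runs after all writes.
    static String swapAlias(int total) {
        return "swapped alias after " + total + " documents";
    }

    public static void main(String[] args) {
        List<List<String>> batches =
            Arrays.asList(Arrays.asList("a", "b"), Arrays.asList("c"));
        int total = totalWritten(writeBatches(batches));
        System.out.println(swapAlias(total));
    }
}
```

The only point of the sketch is the data dependency: the follow-up step takes 
the combined write result as its input, so it cannot start before the writes 
have all reported. 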

The three potential solutions above address this issue, but I would like to 
hear which would be preferable (or perhaps a different proposal altogether?). 
Perhaps we could also open a ticket for this, since it seems like a 
worthwhile feature addition. I would find it really useful, for one. 

Chet

> On Dec 1, 2017, at 12:19 PM, Lukasz Cwik <lc...@google.com> wrote:
> 
> Instead of a callback fn, it's most useful if a PCollection is returned 
> containing the result of the sink so that any arbitrary additional functions 
> can be applied.
> 
> On Fri, Dec 1, 2017 at 7:14 AM, Jean-Baptiste Onofré <j...@nanthrax.net> wrote:
> Agree, I would prefer to do the callback in the IO more than in the main.
> 
> Regards
> JB
> 
> On 12/01/2017 03:54 PM, Steve Niemitz wrote:
> I do something almost exactly like this, but with BigtableIO instead.  I have 
> a pull request open here [1] (which reminds me I need to finish this up...).  
> It would really be nice for most IOs to support something like this.
> 
> Essentially you do a GroupByKey (or some CombineFn) on the output from the 
> BigtableIO, and then feed that into your function which will run when all 
> writes finish.
> 
> You probably want to avoid doing something in the main method because there's 
> no guarantee it'll actually run (maybe the driver will die, get killed, 
> machine will explode, etc).
> 
> [1] https://github.com/apache/beam/pull/3997
> 
> On Fri, Dec 1, 2017 at 9:46 AM, NerdyNick <nerdyn...@gmail.com> wrote:
> 
>     Assuming you're in Java, you could just follow on in your main method,
>     checking the state of the result.
> 
>     Example:
>     PipelineResult result = pipeline.run();
>     try {
>         result.waitUntilFinish();
>         if (result.getState() == PipelineResult.State.DONE) {
>             // DO ES work
>         }
>     } catch (Exception e) {
>         result.cancel();
>         throw e;
>     }
> 
>     Otherwise you could also use Oozie to construct a work flow.
> 
>     On Fri, Dec 1, 2017 at 2:02 AM, Jean-Baptiste Onofré <j...@nanthrax.net> wrote:
> 
>         Hi,
> 
>         yes, we had a similar question some days ago.
> 
>         We could imagine having a user callback fn fired when the sink batch
>         is complete.
> 
>         Let me think about that.
> 
>         Regards
>         JB
> 
>         On 12/01/2017 09:04 AM, Philip Chan wrote:
> 
>             Hey JB,
> 
>             Thanks for getting back so quickly.
>             I suppose in that case I would need a way of monitoring when the
>             ES transform completes successfully before I can proceed with
>             doing the swap.
>             The problem with this is that I can't think of a good way to
>             determine that termination state short of polling the new index to
>             check the document count compared to the size of input
>             PCollection.
>             That, or maybe I'd need to use an external system like you
>             mentioned to poll on the state of the pipeline (I'm using Google
>             Dataflow, so maybe there's a way to do this with some API).
>             But I would have thought that there would be an easy way of simply
>             saying "do not process this transform until this other transform
>             completes".
>             Is there no established way of "signaling" between pipelines when
>             some pipeline completes, or have some way of declaring a
>             dependency of 1 transform on another transform?
> 
>             Thanks again,
>             Philip
> 
>             On Thu, Nov 30, 2017 at 11:44 PM, Jean-Baptiste Onofré
>             <j...@nanthrax.net> wrote:
> 
>                  Hi Philip,
> 
>                  You won't be able to do (3) in the same pipeline as the
>                  Elasticsearch Sink PTransform ends the pipeline with PDone.
> 
>                  So, (3) has to be done in another pipeline (using a DoFn) or
>                  in another "system" (like Camel for instance). I would do a
>                  check of the data in the index and then trigger the swap
>                  there.
> 
>                  Regards
>                  JB
> 
>                  On 12/01/2017 08:41 AM, Philip Chan wrote:
> 
>                      Hi,
> 
>                      I'm pretty new to Beam, and I've been trying to use the
>                      ElasticSearchIO sink to write docs into ES.
>                      With this, I want to be able to
>                      1. ingest and transform rows from DB (done)
>                      2. write JSON docs/strings into a new ES index (done)
>                      3. After (2) is complete and all documents are written
>                      into a new index, trigger an atomic index swap under an
>                      alias to replace the current aliased index with the new
>                      index generated in step 2. This is basically a single
>                      POST request to the ES cluster.
> 
>                      The problem I'm facing is that I can't find a way to make
>                      (3) happen after step (2) is complete.
> 
>                      The ElasticSearchIO.Write transform returns a PDone, and
>                      I'm not sure how to proceed from there because it doesn't
>                      seem to let me do another apply on it to "define" a
>                      dependency.
>                      https://beam.apache.org/documentation/sdks/javadoc/2.1.0/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.Write.html
> 
>                      Is there a recommended way to construct pipeline
>                      workflows like this?
> 
>                      Thanks in advance,
>                      Philip
> 
> 
>                  --
>                  Jean-Baptiste Onofré
>                  jbono...@apache.org
>                  http://blog.nanthrax.net
>                  Talend - http://www.talend.com
> 
> 
> 
>         --
>         Jean-Baptiste Onofré
>         jbono...@apache.org
>         http://blog.nanthrax.net
>         Talend - http://www.talend.com
> 
> 
> 
> 
>     --
>     Nick Verbeck - NerdyNick
>     ----------------------------------------------------
>     NerdyNick.com
>     TrailsOffroad.com
>     NoKnownBoundaries.com
> 
> 
> 
> 
> -- 
> Jean-Baptiste Onofré
> jbono...@apache.org
> http://blog.nanthrax.net
> Talend - http://www.talend.com
> 
