So I agree generally with the idea that returning a PCollection makes all of this easier, since arbitrary additional functions can then be applied. But what exactly would write transforms return in that PCollection that would make sense? The whole idea is that we've written to an external source, and at that point the collection itself is no longer needed.
Currently, that's represented with a PDone, which doesn't allow any work to occur after it. I see a few possible ways of handling this given this conversation, and am curious which sounds like the best way to deal with the problem:

1. Have output transforms always return something specific (the same across transforms, by convention) in the form of a PCollection, so operations can occur after it.
2. Make PDone, or some new type, act as a PCollection so we can apply further transforms afterward.
3. Have output transforms provide a facility for a callback function that runs after the transform completes.

I went through these gymnastics recently when I was trying to build something that would move indices after writing to Algolia, and my solution was to co-opt code from the old Sink class that used to exist in Beam. The problem is that that method requires the output transform in question to return a PCollection, even if the PCollection is trivial or doesn't make sense. This seems like a bad solution, but unfortunately there isn't a notion of a transform that has no explicit output yet still needs operations to occur after it. The three potential solutions above address this issue, but I would like to hear which would be preferable (or perhaps a different proposal altogether?). Perhaps we could also open a ticket on this, since it seems like a worthwhile feature addition. I would find it really useful, for one.

Chet

> On Dec 1, 2017, at 12:19 PM, Lukasz Cwik <lc...@google.com> wrote:
>
> Instead of a callback fn, it's most useful if a PCollection is returned
> containing the result of the sink, so that any arbitrary additional
> functions can be applied.
>
> On Fri, Dec 1, 2017 at 7:14 AM, Jean-Baptiste Onofré <j...@nanthrax.net> wrote:
>
> Agree, I would prefer to do the callback in the IO more than in the main.
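A minimal, self-contained sketch of what option (1) could look like, using plain Java collections in place of the Beam API (WriteResult, writeDocs, and everything else here are illustrative names, not real Beam classes):

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative stand-ins only; none of these are Beam types.
public class WriteSignalSketch {

    // A per-bundle write result, instead of a terminal PDone-style token.
    static class WriteResult {
        final int documentsWritten;
        WriteResult(int documentsWritten) { this.documentsWritten = documentsWritten; }
    }

    // A "sink" that writes docs and returns a collection of results, so
    // downstream steps can be applied after it instead of dead-ending.
    static List<WriteResult> writeDocs(List<String> docs) {
        // ... the actual external write would happen here ...
        List<WriteResult> results = new ArrayList<>();
        results.add(new WriteResult(docs.size()));
        return results;
    }

    public static void main(String[] args) {
        List<String> docs = List.of("{\"id\":1}", "{\"id\":2}", "{\"id\":3}");

        // Because the write step returns a collection, a follow-up step
        // (e.g. an index swap) can consume it once the writes are done.
        List<WriteResult> results = writeDocs(docs);
        int total = results.stream().mapToInt(r -> r.documentsWritten).sum();
        System.out.println("documents written: " + total);
    }
}
```

The point is only the shape: the write returns *something* downstream code can hang off of, which is what all three options above are trying to achieve.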
>
> Regards
> JB
>
> On 12/01/2017 03:54 PM, Steve Niemitz wrote:
>
> I do something almost exactly like this, but with BigtableIO instead. I have
> a pull request open here [1] (which reminds me I need to finish this up...).
> It would really be nice for most IOs to support something like this.
>
> Essentially you do a GroupByKey (or some CombineFn) on the output from the
> BigtableIO, and then feed that into your function which will run when all
> writes finish.
>
> You probably want to avoid doing something in the main method, because
> there's no guarantee it'll actually run (maybe the driver will die, get
> killed, the machine will explode, etc.).
>
> [1] https://github.com/apache/beam/pull/3997
>
> On Fri, Dec 1, 2017 at 9:46 AM, NerdyNick <nerdyn...@gmail.com> wrote:
>
> Assuming you're in Java, you could just follow on in your main method,
> checking the state of the PipelineResult. Example:
>
>     PipelineResult result = pipeline.run();
>     try {
>         result.waitUntilFinish();
>         if (result.getState() == PipelineResult.State.DONE) {
>             // do ES work
>         }
>     } catch (Exception e) {
>         result.cancel();
>         throw e;
>     }
>
> Otherwise you could also use Oozie to construct a workflow.
>
> On Fri, Dec 1, 2017 at 2:02 AM, Jean-Baptiste Onofré <j...@nanthrax.net> wrote:
>
> Hi,
>
> yes, we had a similar question some days ago.
>
> We can imagine to have a user callback fn fired when the sink batch is
> complete.
>
> Let me think about that.
>
> Regards
> JB
>
> On 12/01/2017 09:04 AM, Philip Chan wrote:
>
> Hey JB,
>
> Thanks for getting back so quickly.
> I suppose in that case I would need a way of monitoring when the ES
> transform completes successfully before I can proceed with doing the
> swap.
> The problem with this is that I can't think of a good way to determine
> that termination state, short of polling the new index to check the
> document count compared to the size of the input PCollection.
> That, or maybe I'd need to use an external system like you mentioned to
> poll on the state of the pipeline (I'm using Google Dataflow, so maybe
> there's a way to do this with some API).
> But I would have thought that there would be an easy way of simply saying
> "do not process this transform until this other transform completes".
> Is there no established way of "signaling" between pipelines when some
> pipeline completes, or some way of declaring a dependency of one
> transform on another transform?
>
> Thanks again,
> Philip
>
> On Thu, Nov 30, 2017 at 11:44 PM, Jean-Baptiste Onofré <j...@nanthrax.net> wrote:
>
> Hi Philip,
>
> You won't be able to do (3) in the same pipeline, as the Elasticsearch
> sink PTransform ends the pipeline with PDone.
>
> So, (3) has to be done in another pipeline (using a DoFn) or in another
> "system" (like Camel, for instance). I would do a check of the data in
> the index and then trigger the swap there.
>
> Regards
> JB
>
> On 12/01/2017 08:41 AM, Philip Chan wrote:
>
> Hi,
>
> I'm pretty new to Beam, and I've been trying to use the ElasticsearchIO
> sink to write docs into ES.
> With this, I want to be able to:
> 1. ingest and transform rows from a DB (done)
> 2. write JSON docs/strings into a new ES index (done)
> 3. after (2) is complete and all documents are written into a new index,
> trigger an atomic index swap under an alias to replace the current
> aliased index with the new index generated in step 2. This is basically
> a single POST request to the ES cluster.
>
> The problem I'm facing is that I don't seem to be able to find a way to
> have (3) happen after step (2) is complete.
>
> The ElasticsearchIO.Write transform returns a PDone, and I'm not sure
> how to proceed from there, because it doesn't seem to let me do another
> apply on it to "define" a dependency.
>
> https://beam.apache.org/documentation/sdks/javadoc/2.1.0/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.Write.html
>
> Is there a recommended way to construct pipeline workflows like this?
>
> Thanks in advance,
> Philip
>
> --
> Jean-Baptiste Onofré
> jbono...@apache.org
> http://blog.nanthrax.net
> Talend - http://www.talend.com
>
> --
> Nick Verbeck - NerdyNick
> ----------------------------------------------------
> NerdyNick.com
> TrailsOffroad.com
> NoKnownBoundaries.com
>
> --
> Jean-Baptiste Onofré
> jbono...@apache.org
> http://blog.nanthrax.net
> Talend - http://www.talend.com
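The grouping pattern Steve describes above (a GroupByKey on the sink's output feeding a function that runs once all writes finish) can be sketched very roughly with plain Java streams standing in for Beam's GroupByKey; everything here is illustrative, not the Beam API:

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class GroupThenFinalize {

    // Group all write acknowledgements under a single key and run a
    // finalizer once per key, mimicking "GroupByKey then finalize":
    // the grouped value only materializes after every ack has arrived,
    // so the finalizer is guaranteed to see the complete set.
    static String finalizeAfter(List<String> acks) {
        Map<Integer, List<String>> grouped =
            acks.stream().collect(Collectors.groupingBy(a -> 0));
        List<String> all = grouped.get(0);
        // Exactly one invocation, seeing all acks at once; this is where
        // the post-write step (e.g. the index swap) would go.
        return "all " + all.size() + " writes finished";
    }

    public static void main(String[] args) {
        List<String> acks = List.of("doc-1", "doc-2", "doc-3");
        System.out.println(finalizeAfter(acks));
    }
}
```

In a real pipeline the single-key grouping plays the role of a barrier: no element of the grouped output exists until the upstream writes have all produced their results, which is why the downstream function only fires at the end.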