Re: [DISCUSS] ExecIO
Hi guys,

I understand your point. The Exec "IO" can already take input commands from a PCollection, but the user has to prepare the commands. I will improve the ExecFn as you said: it will be able to construct the shell commands from elements in the PCollection (using one element as the command, the others as arguments).

I agree with your statement about DoFn: a DoFn in the "middle" of a pipeline is not an IO. An IO acts as an endpoint in a pipeline: a starting endpoint for Read, an ending endpoint for Write. The point is that a DoFn can be a connector (for instance a MySQL database lookup, as you said), but it can be wrapped as an IO. If I compare with Apache Camel, a pipeline (aka route) starts with a unique endpoint (it's what we name a consumer endpoint on a Camel component). A producer endpoint can end a route or be used in any middle step. It provides a convenient way to extend the processing/routing logic. It's like a DoFn.

Regards
JB

On 12/08/2016 09:37 PM, Ben Chambers wrote:
I think I agree with Robert (unless I'm misunderstanding his point). I think that the shell commands are going to be the most useful if it is possible to take the elements in an input PCollection, construct a shell command depending on those elements, and then execute it. I think doing so in a fully general manner outside of a DoFn will be difficult. If instead we made it easier to declare a DoFn as having requirements on the environment (these programs must be available in the shell) and easier to execute shell commands within a DoFn, I think that covers many more use cases.

On Thu, Dec 8, 2016 at 12:23 PM Robert Bradshaw wrote:
On Wed, Dec 7, 2016 at 1:32 AM, Jean-Baptiste Onofré wrote:
By the way, just to elaborate a bit why I provided it as an IO:

1. From a user experience perspective, I think we have to provide a convenient way to write pipelines. Any syntax simplifying this is valuable.
I think it's easier to write:

pipeline.apply(ExecIO.read().withCommand("foo"))

than:

pipeline.apply(Create.of("foo")).apply(ParDo.of(new ExecFn()));

Slightly. Still, when I see pipeline.apply(ExecIO.read().withCommand("foo")) I am surprised to get a PCollection with a single element...

2. For me (maybe I'm wrong ;)), an IO is an extension dedicated to a "connector": reading/writing from/to a data source. So, even without the IO "wrapping" (by wrapping, I mean the Read and Write), I think the Exec extension should be in IO as it's a source/sink of data.

To clarify, if you wrote a DoFn that, say, did lookups against a MySQL database, you would consider this an IO? For me, IO denotes input/output, i.e. the roots and leaves of a pipeline.

Regards
JB

On 12/07/2016 08:37 AM, Robert Bradshaw wrote:
I don't mean to derail the tricky environment questions, but I'm not seeing why this is bundled as an IO rather than a plain DoFn (which can be applied to a PCollection of one or more commands, yielding their outputs). Especially for the case of a Read, which in this case is not splittable (initially or dynamically) and always produces a single element--feels much more like a Map to me.

On Tue, Dec 6, 2016 at 3:26 PM, Eugene Kirpichov wrote:
Ben - the issues of "things aren't hung, there is a shell command running", aren't they general to all DoFn's? i.e. I don't see why the runner would need to know that a shell command is running, but not that, say, a heavy monolithic computation is running. What's the benefit to the runner in knowing that the DoFn contains a shell command?

By saying "making sure that all shell commands finish", I suppose you're referring to the possibility of leaks if the user initiates a shell command and forgets to wait for it? I think that should be solvable again without Beam intervention, by making a utility class for running shell commands which implements AutoCloseable, and document that you have to use it that way.
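[Editorial aside: the improvement JB describes at the top of this message (building the shell command from a PCollection element, using one part as the command and the rest as arguments) maps naturally onto Java's ProcessBuilder. The sketch below is hypothetical (it is not the actual ExecFn from the PR) and assumes a POSIX-style worker with echo on the PATH:]

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.List;

// Hypothetical sketch of what an improved ExecFn could do internally:
// treat the first element part as the command and the rest as arguments,
// then run the process and capture its merged stdout/stderr.
public class ExecHelper {

    public static String runElement(List<String> element)
            throws IOException, InterruptedException {
        // ProcessBuilder takes the command and its arguments as a list,
        // so a PCollection element split into parts maps onto it directly.
        Process process = new ProcessBuilder(element)
                .redirectErrorStream(true) // merge stderr into stdout
                .start();
        StringBuilder out = new StringBuilder();
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(process.getInputStream()))) {
            String line;
            while ((line = reader.readLine()) != null) {
                out.append(line).append('\n');
            }
        }
        process.waitFor();
        return out.toString();
    }

    public static void main(String[] args) throws Exception {
        // "echo" is the command, "hello" its argument
        System.out.print(runElement(List.of("echo", "hello")));
    }
}
```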
Ken - I think the question here is: are we ok with a situation where the runner doesn't check or care whether the shell command can run, and the user accepts this risk and studies what commands will be available on the worker environment provided by the runner they use in production, before productionizing a pipeline with those commands. Upon some thought I think it's ok. Of course, this carries an obligation for runners to document their worker environment and its changes across versions. Though for many runners such documentation may be trivial: "whatever your YARN cluster has, the runner doesn't change it in any way" and it may be good enough for users. And for other runners, like Dataflow, such documentation may also be trivial: "no guarantees whatsoever, only what you stage in --filesToStage is available". I can also see Beam develop to a point where we'd want all runners to be able to run your DoFn in a user-specified Docker container, and manage those intelligently - but I think that's
Re: [DISCUSS] ExecIO
I think I agree with Robert (unless I'm misunderstanding his point). I think that the shell commands are going to be the most useful if it is possible to take the elements in an input PCollection, construct a shell command depending on those elements, and then execute it. I think doing so in a fully general manner outside of a DoFn will be difficult. If instead we made it easier to declare a DoFn as having requirements on the environment (these programs must be available in the shell) and easier to execute shell commands within a DoFn, I think that covers many more use cases. On Thu, Dec 8, 2016 at 12:23 PM Robert Bradshaw wrote: > On Wed, Dec 7, 2016 at 1:32 AM, Jean-Baptiste Onofré > wrote: > > By the way, just to elaborate a bit why I provided it as an IO: > > > > 1. From a user experience perspective, I think we have to provide a > > convenient way to write pipelines. Any syntax simplifying this is > valuable. > > I think it's easier to write: > > > > pipeline.apply(ExecIO.read().withCommand("foo")) > > > > than: > > > > pipeline.apply(Create.of("foo")).apply(ParDo.of(new ExecFn())); > > Slightly. Still, when I see > > pipeline.apply(ExecIO.read().withCommand("foo")) > > I am surprised to get a PCollection with a single element... > > > 2. For me (maybe I'm wrong ;)), an IO is an extension dedicated to a > > "connector": reading/writing from/to a data source. So, even without the > IO > > "wrapping" (by wrapping, I mean the Read and Write), I think the Exec > extension > > should be in IO as it's a source/sink of data. > > To clarify, if you wrote a DoFn that, say, did lookups against a MySQL > database, you would consider this an IO? For me, IO denotes > input/output, i.e. the roots and leaves of a pipeline.
> > > Regards > > JB > > > > On 12/07/2016 08:37 AM, Robert Bradshaw wrote: > >> > >> I don't mean to derail the tricky environment questions, but I'm not > >> seeing why this is bundled as an IO rather than a plain DoFn (which > >> can be applied to a PCollection of one or more commands, yielding > >> their outputs). Especially for the case of a Read, which in this case > >> is not splittable (initially or dynamically) and always produces a > >> single element--feels much more like a Map to me. > >> > >> On Tue, Dec 6, 2016 at 3:26 PM, Eugene Kirpichov > >> wrote: > >>> > >>> Ben - the issues of "things aren't hung, there is a shell command > >>> running", > >>> aren't they general to all DoFn's? i.e. I don't see why the runner > would > >>> need to know that a shell command is running, but not that, say, a > heavy > >>> monolithic computation is running. What's the benefit to the runner in > >>> knowing that the DoFn contains a shell command? > >>> > >>> By saying "making sure that all shell commands finish", I suppose > you're > >>> referring to the possibility of leaks if the user initiates a shell > >>> command > >>> and forgets to wait for it? I think that should be solvable again > without > >>> Beam intervention, by making a utility class for running shell commands > >>> which implements AutoCloseable, and document that you have to use it > that > >>> way. > >>> > >>> Ken - I think the question here is: are we ok with a situation where > the > >>> runner doesn't check or care whether the shell command can run, and the > >>> user accepts this risk and studies what commands will be available on > the > >>> worker environment provided by the runner they use in production, > before > >>> productionizing a pipeline with those commands. > >>> > >>> Upon some thought I think it's ok. Of course, this carries an > obligation > >>> for runners to document their worker environment and its changes across > >>> versions. 
Though for many runners such documentation may be trivial: > >>> "whatever your YARN cluster has, the runner doesn't change it in any > way" > >>> and it may be good enough for users. And for other runners, like > >>> Dataflow, > >>> such documentation may also be trivial: "no guarantees whatsoever, only > >>> what you stage in --filesToStage is available". > >>> > >>> I can also see Beam develop to a point where we'd want all runners to > be > >>> able to run your DoFn in a user-specified Docker container, and manage > >>> those intelligently - but I think that's quite a while away and it > >>> doesn't > >>> have to block work on a utility for executing shell commands. Though > it'd > >>> be nice if the utility was forward-compatible with that future world. > >>> > >>> On Tue, Dec 6, 2016 at 2:16 AM Jean-Baptiste Onofré > >>> wrote: > >>> > Hi Eugene, > > thanks for the extended questions. > > I think we have two levels of expectations here: > - end-user responsibility > - worker/runner responsibility > > 1/ From a end-user perspective, the end-user has to know that using a > system command (via ExecIO) and more generally speaking
Re: [DISCUSS] ExecIO
On Wed, Dec 7, 2016 at 1:32 AM, Jean-Baptiste Onofré wrote: > By the way, just to elaborate a bit why I provided it as an IO: > > 1. From a user experience perspective, I think we have to provide a > convenient way to write pipelines. Any syntax simplifying this is valuable. > I think it's easier to write: > > pipeline.apply(ExecIO.read().withCommand("foo")) > > than: > > pipeline.apply(Create.of("foo")).apply(ParDo.of(new ExecFn())); Slightly. Still, when I see pipeline.apply(ExecIO.read().withCommand("foo")) I am surprised to get a PCollection with a single element... > 2. For me (maybe I'm wrong ;)), an IO is an extension dedicated to a > "connector": reading/writing from/to a data source. So, even without the IO > "wrapping" (by wrapping, I mean the Read and Write), I think the Exec extension > should be in IO as it's a source/sink of data. To clarify, if you wrote a DoFn that, say, did lookups against a MySQL database, you would consider this an IO? For me, IO denotes input/output, i.e. the roots and leaves of a pipeline. > Regards > JB > > On 12/07/2016 08:37 AM, Robert Bradshaw wrote: >> >> I don't mean to derail the tricky environment questions, but I'm not >> seeing why this is bundled as an IO rather than a plain DoFn (which >> can be applied to a PCollection of one or more commands, yielding >> their outputs). Especially for the case of a Read, which in this case >> is not splittable (initially or dynamically) and always produces a >> single element--feels much more like a Map to me. >> >> On Tue, Dec 6, 2016 at 3:26 PM, Eugene Kirpichov >> wrote: >>> >>> Ben - the issues of "things aren't hung, there is a shell command >>> running", >>> aren't they general to all DoFn's? i.e. I don't see why the runner would >>> need to know that a shell command is running, but not that, say, a heavy >>> monolithic computation is running. What's the benefit to the runner in >>> knowing that the DoFn contains a shell command?
>>> >>> By saying "making sure that all shell commands finish", I suppose you're >>> referring to the possibility of leaks if the user initiates a shell >>> command >>> and forgets to wait for it? I think that should be solvable again without >>> Beam intervention, by making a utility class for running shell commands >>> which implements AutoCloseable, and document that you have to use it that >>> way. >>> >>> Ken - I think the question here is: are we ok with a situation where the >>> runner doesn't check or care whether the shell command can run, and the >>> user accepts this risk and studies what commands will be available on the >>> worker environment provided by the runner they use in production, before >>> productionizing a pipeline with those commands. >>> >>> Upon some thought I think it's ok. Of course, this carries an obligation >>> for runners to document their worker environment and its changes across >>> versions. Though for many runners such documentation may be trivial: >>> "whatever your YARN cluster has, the runner doesn't change it in any way" >>> and it may be good enough for users. And for other runners, like >>> Dataflow, >>> such documentation may also be trivial: "no guarantees whatsoever, only >>> what you stage in --filesToStage is available". >>> >>> I can also see Beam develop to a point where we'd want all runners to be >>> able to run your DoFn in a user-specified Docker container, and manage >>> those intelligently - but I think that's quite a while away and it >>> doesn't >>> have to block work on a utility for executing shell commands. Though it'd >>> be nice if the utility was forward-compatible with that future world. >>> >>> On Tue, Dec 6, 2016 at 2:16 AM Jean-Baptiste Onofré >>> wrote: >>> Hi Eugene, thanks for the extended questions. 
I think we have two levels of expectations here:
- end-user responsibility
- worker/runner responsibility

1/ From an end-user perspective, the end-user has to know that using a system command (via ExecIO) and more generally speaking anything which relies on worker resources (for instance a local filesystem directory available only on a worker) can fail if the expected resource is not present on all workers. So, basically, all workers should have the same topology. It's what I'm assuming for the PR. For example, I have my Spark cluster, using the same Mesos/Docker setup, then the user knows that all nodes in the cluster will have the same setup and so resources (it could be provided by DevOps for instance). On the other hand, running on Dataflow is different because I don't "control" the nodes (bootstrapping or resources), but in that case, the user knows it (he knows the runner he's using).

2/ As you said, we can expect that the runner can deal with some requirements (expressed depending on the pipeline and the runner), and the runner can
Re: [DISCUSS] ExecIO
(discussion continues on a thread called "Naming and API for executing shell commands") On Wed, Dec 7, 2016 at 1:32 AM Jean-Baptiste Onofré wrote: > By the way, just to elaborate a bit why I provided it as an IO: > > 1. From a user experience perspective, I think we have to provide a > convenient way to write pipelines. Any syntax simplifying this is valuable. > I think it's easier to write: > > pipeline.apply(ExecIO.read().withCommand("foo")) > > than: > > pipeline.apply(Create.of("foo")).apply(ParDo.of(new ExecFn())); > > 2. For me (maybe I'm wrong ;)), an IO is an extension dedicated to a > "connector": reading/writing from/to a data source. So, even without the > IO "wrapping" (by wrapping, I mean the Read and Write), I think the Exec > extension should be in IO as it's a source/sink of data. > > Regards > JB > > On 12/07/2016 08:37 AM, Robert Bradshaw wrote: > > I don't mean to derail the tricky environment questions, but I'm not > > seeing why this is bundled as an IO rather than a plain DoFn (which > > can be applied to a PCollection of one or more commands, yielding > > their outputs). Especially for the case of a Read, which in this case > > is not splittable (initially or dynamically) and always produces a > > single element--feels much more like a Map to me. > > > > On Tue, Dec 6, 2016 at 3:26 PM, Eugene Kirpichov > > wrote: > >> Ben - the issues of "things aren't hung, there is a shell command > running", > >> aren't they general to all DoFn's? i.e. I don't see why the runner would > >> need to know that a shell command is running, but not that, say, a heavy > >> monolithic computation is running. What's the benefit to the runner in > >> knowing that the DoFn contains a shell command? > >> > >> By saying "making sure that all shell commands finish", I suppose you're > >> referring to the possibility of leaks if the user initiates a shell > command > >> and forgets to wait for it?
I think that should be solvable again > without > >> Beam intervention, by making a utility class for running shell commands > >> which implements AutoCloseable, and document that you have to use it > that > >> way. > >> > >> Ken - I think the question here is: are we ok with a situation where the > >> runner doesn't check or care whether the shell command can run, and the > >> user accepts this risk and studies what commands will be available on > the > >> worker environment provided by the runner they use in production, before > >> productionizing a pipeline with those commands. > >> > >> Upon some thought I think it's ok. Of course, this carries an obligation > >> for runners to document their worker environment and its changes across > >> versions. Though for many runners such documentation may be trivial: > >> "whatever your YARN cluster has, the runner doesn't change it in any > way" > >> and it may be good enough for users. And for other runners, like > Dataflow, > >> such documentation may also be trivial: "no guarantees whatsoever, only > >> what you stage in --filesToStage is available". > >> > >> I can also see Beam develop to a point where we'd want all runners to be > >> able to run your DoFn in a user-specified Docker container, and manage > >> those intelligently - but I think that's quite a while away and it > doesn't > >> have to block work on a utility for executing shell commands. Though > it'd > >> be nice if the utility was forward-compatible with that future world. > >> > >> On Tue, Dec 6, 2016 at 2:16 AM Jean-Baptiste Onofré > wrote: > >> > >>> Hi Eugene, > >>> > >>> thanks for the extended questions. 
> >>> > >>> I think we have two levels of expectations here: > >>> - end-user responsibility > >>> - worker/runner responsibility > >>> > >>> 1/ From an end-user perspective, the end-user has to know that using a > >>> system command (via ExecIO) and more generally speaking anything which > >>> relies on worker resources (for instance a local filesystem directory > >>> available only on a worker) can fail if the expected resource is not > >>> present on all workers. So, basically, all workers should have the same > >>> topology. It's what I'm assuming for the PR. > >>> For example, I have my Spark cluster, using the same Mesos/Docker > setup, > >>> then the user knows that all nodes in the cluster will have the same > >>> setup and so resources (it could be provided by DevOps for instance). > >>> On the other hand, running on Dataflow is different because I don't > >>> "control" the nodes (bootstrapping or resources), but in that case, the > >>> user knows it (he knows the runner he's using). > >>> > >>> 2/ As you said, we can expect that the runner can deal with some > >>> requirements (expressed depending on the pipeline and the runner), and > >>> the runner can know the workers which provide capabilities matching > >>> those requirements. > >>> Then, the end user is no longer responsible: the runner will try to > >>> define if the pipeline
Re: [DISCUSS] ExecIO
By the way, just to elaborate a bit why I provided it as an IO:

1. From a user experience perspective, I think we have to provide a convenient way to write pipelines. Any syntax simplifying this is valuable. I think it's easier to write:

pipeline.apply(ExecIO.read().withCommand("foo"))

than:

pipeline.apply(Create.of("foo")).apply(ParDo.of(new ExecFn()));

2. For me (maybe I'm wrong ;)), an IO is an extension dedicated to a "connector": reading/writing from/to a data source. So, even without the IO "wrapping" (by wrapping, I mean the Read and Write), I think the Exec extension should be in IO as it's a source/sink of data.

Regards
JB

On 12/07/2016 08:37 AM, Robert Bradshaw wrote:
I don't mean to derail the tricky environment questions, but I'm not seeing why this is bundled as an IO rather than a plain DoFn (which can be applied to a PCollection of one or more commands, yielding their outputs). Especially for the case of a Read, which in this case is not splittable (initially or dynamically) and always produces a single element--feels much more like a Map to me.

On Tue, Dec 6, 2016 at 3:26 PM, Eugene Kirpichov wrote:
Ben - the issues of "things aren't hung, there is a shell command running", aren't they general to all DoFn's? i.e. I don't see why the runner would need to know that a shell command is running, but not that, say, a heavy monolithic computation is running. What's the benefit to the runner in knowing that the DoFn contains a shell command?

By saying "making sure that all shell commands finish", I suppose you're referring to the possibility of leaks if the user initiates a shell command and forgets to wait for it? I think that should be solvable again without Beam intervention, by making a utility class for running shell commands which implements AutoCloseable, and document that you have to use it that way.
Ken - I think the question here is: are we ok with a situation where the runner doesn't check or care whether the shell command can run, and the user accepts this risk and studies what commands will be available on the worker environment provided by the runner they use in production, before productionizing a pipeline with those commands.

Upon some thought I think it's ok. Of course, this carries an obligation for runners to document their worker environment and its changes across versions. Though for many runners such documentation may be trivial: "whatever your YARN cluster has, the runner doesn't change it in any way" and it may be good enough for users. And for other runners, like Dataflow, such documentation may also be trivial: "no guarantees whatsoever, only what you stage in --filesToStage is available".

I can also see Beam develop to a point where we'd want all runners to be able to run your DoFn in a user-specified Docker container, and manage those intelligently - but I think that's quite a while away and it doesn't have to block work on a utility for executing shell commands. Though it'd be nice if the utility was forward-compatible with that future world.

On Tue, Dec 6, 2016 at 2:16 AM Jean-Baptiste Onofré wrote:
Hi Eugene,

thanks for the extended questions.

I think we have two levels of expectations here:
- end-user responsibility
- worker/runner responsibility

1/ From an end-user perspective, the end-user has to know that using a system command (via ExecIO) and more generally speaking anything which relies on worker resources (for instance a local filesystem directory available only on a worker) can fail if the expected resource is not present on all workers. So, basically, all workers should have the same topology. It's what I'm assuming for the PR.
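[Editorial aside: JB's point that the end-user has to know the expected resource is present on all workers could be made fail-fast with a small setup-time check. The class name and API below are illustrative only, not part of the PR:]

```java
import java.io.File;
import java.util.List;

// Hypothetical fail-fast check for declared environment requirements:
// verify that each required program is executable somewhere on the PATH
// before the pipeline starts relying on it.
public class EnvRequirements {

    public static boolean isOnPath(String program) {
        String path = System.getenv("PATH");
        if (path == null) {
            return false;
        }
        for (String dir : path.split(File.pathSeparator)) {
            // canExecute() is false for missing or non-executable files
            if (new File(dir, program).canExecute()) {
                return true;
            }
        }
        return false;
    }

    public static void require(List<String> programs) {
        for (String p : programs) {
            if (!isOnPath(p)) {
                throw new IllegalStateException(
                        "Required program not available on this worker: " + p);
            }
        }
    }

    public static void main(String[] args) {
        // "sh" should exist on any POSIX-style worker
        require(List.of("sh"));
        System.out.println("environment ok");
    }
}
```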
For example, I have my Spark cluster, using the same Mesos/Docker setup, then the user knows that all nodes in the cluster will have the same setup and so resources (it could be provided by DevOps for instance). On the other hand, running on Dataflow is different because I don't "control" the nodes (bootstrapping or resources), but in that case, the user knows it (he knows the runner he's using).

2/ As you said, we can expect that the runner can deal with some requirements (expressed depending on the pipeline and the runner), and the runner can know the workers which provide capabilities matching those requirements. Then, the end user is no longer responsible: the runner will try to define if the pipeline can be executed, and where a DoFn has to be run (on which worker).

For me, it's two different levels where 2 is smarter but 1 can also make sense.

WDYT ?

Regards
JB

On 12/05/2016 08:51 PM, Eugene Kirpichov wrote:
Hi JB,

Thanks for bringing this to the mailing list. I also think that this is useful in general (and that use cases for Beam are more than just classic bigdata), and that there are interesting questions here at different levels about how to do it right. I suggest to start with the highest-level question [and discuss the particular API only after
Re: [DISCUSS] ExecIO
Ben - the issues of "things aren't hung, there is a shell command running", aren't they general to all DoFn's? i.e. I don't see why the runner would need to know that a shell command is running, but not that, say, a heavy monolithic computation is running. What's the benefit to the runner in knowing that the DoFn contains a shell command? By saying "making sure that all shell commands finish", I suppose you're referring to the possibility of leaks if the user initiates a shell command and forgets to wait for it? I think that should be solvable again without Beam intervention, by making a utility class for running shell commands which implements AutoCloseable, and document that you have to use it that way. Ken - I think the question here is: are we ok with a situation where the runner doesn't check or care whether the shell command can run, and the user accepts this risk and studies what commands will be available on the worker environment provided by the runner they use in production, before productionizing a pipeline with those commands. Upon some thought I think it's ok. Of course, this carries an obligation for runners to document their worker environment and its changes across versions. Though for many runners such documentation may be trivial: "whatever your YARN cluster has, the runner doesn't change it in any way" and it may be good enough for users. And for other runners, like Dataflow, such documentation may also be trivial: "no guarantees whatsoever, only what you stage in --filesToStage is available". I can also see Beam develop to a point where we'd want all runners to be able to run your DoFn in a user-specified Docker container, and manage those intelligently - but I think that's quite a while away and it doesn't have to block work on a utility for executing shell commands. Though it'd be nice if the utility was forward-compatible with that future world. 
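[Editorial aside: the utility Eugene suggests above (a shell-command runner that implements AutoCloseable, so a try-with-resources block cannot leak the subprocess) could look roughly like this. The ShellCommand name and API are hypothetical:]

```java
import java.io.IOException;
import java.util.List;

// Sketch of an AutoCloseable shell-command wrapper: the subprocess is
// destroyed on close(), so try-with-resources guarantees no leak even
// if the caller forgets to wait or an exception is thrown mid-bundle.
public class ShellCommand implements AutoCloseable {

    private final Process process;

    public ShellCommand(List<String> command) throws IOException {
        this.process = new ProcessBuilder(command)
                .redirectErrorStream(true) // merge stderr into stdout
                .start();
    }

    /** Waits for the command to finish and returns its merged output. */
    public String waitForOutput() throws IOException, InterruptedException {
        String output = new String(process.getInputStream().readAllBytes());
        process.waitFor();
        return output;
    }

    @Override
    public void close() {
        // Reaps the subprocess even when the caller never waited for it.
        if (process.isAlive()) {
            process.destroy();
        }
    }

    public static void main(String[] args) throws Exception {
        try (ShellCommand cmd = new ShellCommand(List.of("echo", "done"))) {
            System.out.print(cmd.waitForOutput());
        }
    }
}
```

Documenting "you have to use it inside try-with-resources" is then the only contract the user must follow, with no Beam-level integration required.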
On Tue, Dec 6, 2016 at 2:16 AM Jean-Baptiste Onofré wrote: > Hi Eugene, > > thanks for the extended questions. > > I think we have two levels of expectations here: > - end-user responsibility > - worker/runner responsibility > > 1/ From an end-user perspective, the end-user has to know that using a > system command (via ExecIO) and more generally speaking anything which > relies on worker resources (for instance a local filesystem directory > available only on a worker) can fail if the expected resource is not > present on all workers. So, basically, all workers should have the same > topology. It's what I'm assuming for the PR. > For example, I have my Spark cluster, using the same Mesos/Docker setup, > then the user knows that all nodes in the cluster will have the same > setup and so resources (it could be provided by DevOps for instance). > On the other hand, running on Dataflow is different because I don't > "control" the nodes (bootstrapping or resources), but in that case, the > user knows it (he knows the runner he's using). > > 2/ As you said, we can expect that the runner can deal with some > requirements (expressed depending on the pipeline and the runner), and > the runner can know the workers which provide capabilities matching > those requirements. > Then, the end user is no longer responsible: the runner will try to > define if the pipeline can be executed, and where a DoFn has to be run > (on which worker). > > For me, it's two different levels where 2 is smarter but 1 can also make > sense. > > WDYT ? > > Regards > JB > > On 12/05/2016 08:51 PM, Eugene Kirpichov wrote: > > Hi JB, > > > > Thanks for bringing this to the mailing list. I also think that this is > > useful in general (and that use cases for Beam are more than just classic > > bigdata), and that there are interesting questions here at different > levels > > about how to do it right.
> > > > I suggest to start with the highest-level question [and discuss the > > particular API only after agreeing on this, possibly in a separate > thread]: > > how to deal with the fact that Beam gives no guarantees about the > > environment on workers, e.g. which commands are available, which shell or > > even OS is being used, etc. Particularly: > > > > - Obviously different runners will have a different environment, e.g. > > Dataflow workers are not going to have Hadoop commands available because > > they are not running on a Hadoop cluster. So, pipelines and transforms > > developed using this connector will be necessarily non-portable between > > different runners. Maybe this is ok? But we need to give users a clear > > expectation about this. How do we phrase this expectation and where do we > > put it in the docs? > > > > - I'm concerned that this puts additional compatibility requirements on > > runners - it becomes necessary for a runner to document the environment > of > > its workers (OS, shell, privileges, guaranteed-installed packages, access > > to other things on the host machine e.g. whether or not the worker runs > in > > its own container, etc.)
Re: [DISCUSS] ExecIO
Hi Eugene,

thanks for the extended questions.

I think we have two levels of expectations here:
- end-user responsibility
- worker/runner responsibility

1/ From an end-user perspective, the end-user has to know that using a system command (via ExecIO) and more generally speaking anything which relies on worker resources (for instance a local filesystem directory available only on a worker) can fail if the expected resource is not present on all workers. So, basically, all workers should have the same topology. It's what I'm assuming for the PR. For example, I have my Spark cluster, using the same Mesos/Docker setup, then the user knows that all nodes in the cluster will have the same setup and so resources (it could be provided by DevOps for instance). On the other hand, running on Dataflow is different because I don't "control" the nodes (bootstrapping or resources), but in that case, the user knows it (he knows the runner he's using).

2/ As you said, we can expect that the runner can deal with some requirements (expressed depending on the pipeline and the runner), and the runner can know the workers which provide capabilities matching those requirements. Then, the end user is no longer responsible: the runner will try to define if the pipeline can be executed, and where a DoFn has to be run (on which worker).

For me, it's two different levels where 2 is smarter but 1 can also make sense.

WDYT ?

Regards
JB

On 12/05/2016 08:51 PM, Eugene Kirpichov wrote:
Hi JB,

Thanks for bringing this to the mailing list. I also think that this is useful in general (and that use cases for Beam are more than just classic bigdata), and that there are interesting questions here at different levels about how to do it right. I suggest to start with the highest-level question [and discuss the particular API only after agreeing on this, possibly in a separate thread]: how to deal with the fact that Beam gives no guarantees about the environment on workers, e.g.
which commands are available, which shell or even OS is being used, etc. Particularly:

- Obviously different runners will have a different environment, e.g. Dataflow workers are not going to have Hadoop commands available because they are not running on a Hadoop cluster. So, pipelines and transforms developed using this connector will be necessarily non-portable between different runners. Maybe this is ok? But we need to give users a clear expectation about this. How do we phrase this expectation and where do we put it in the docs?

- I'm concerned that this puts additional compatibility requirements on runners - it becomes necessary for a runner to document the environment of its workers (OS, shell, privileges, guaranteed-installed packages, access to other things on the host machine, e.g. whether or not the worker runs in its own container, etc.) and to keep it stable - otherwise transforms and pipelines with this connector will be non-portable between runner versions as well.

Another way to deal with this is to give up and say "the environment on the workers is outside the scope of Beam; consult your runner's documentation or use your best judgment as to what the environment will be, and use this at your own risk". What do others think?

On Mon, Dec 5, 2016 at 5:09 AM Jean-Baptiste Onofré wrote:
Hi beamers,

Today, Beam is mainly focused on data processing. Since the beginning of the project, we have been discussing extending the use case coverage via DSLs and extensions (like for machine learning), or via IO. Especially for the IO, we can see Beam used for data integration and data ingestion. In this area, I'm proposing a first IO: ExecIO:

https://issues.apache.org/jira/browse/BEAM-1059
https://github.com/apache/incubator-beam/pull/1451

Actually, this IO is mainly an ExecFn that executes system commands (again, keep in mind we are discussing data integration/ingestion and not data processing).
For convenience, this ExecFn is wrapped in Read and Write (as a regular IO). Clearly, this IO/Fn depends of the worker where it runs. But it's under the user responsibility. During the review, Eugene and I discussed about: - is it an IO or just a fn ? - is it OK to have worker specific IO ? IMHO, an IO makes lot of sense to me and it's very convenient for end users. They can do something like: PCollection output = pipeline.apply(ExecIO.read().withCommand("/path/to/myscript.sh")); The pipeline will execute myscript and the output pipeline will contain command execution std out/err. On the other hand, they can do: pcollection.apply(ExecIO.write()); where PCollection contains the commands to execute. Generally speaking, end users can call ExecFn wherever they want in the pipeline steps: PCollection output = pipeline.apply(ParDo.of(new ExecIO.ExecFn())); The input collection contains the commands to execute, and the output collection contains the commands execution result std out/err. Generally speaking, I'm
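The per-element behavior described for ExecFn — take a command string in, emit its stdout/stderr out — can be sketched with plain JDK classes. This is an illustrative helper, not the actual ExecIO code; the class and method names are assumptions, and only ProcessBuilder is a real API:

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of what an ExecFn does per element: run a shell
// command and capture its combined stdout/stderr. Not the actual Beam code.
public class ExecSketch {
    public static List<String> exec(String command) throws IOException, InterruptedException {
        // Run through the shell so pipes, redirects, etc. work.
        Process p = new ProcessBuilder("sh", "-c", command)
                .redirectErrorStream(true)  // merge stderr into stdout
                .start();
        List<String> lines = new ArrayList<>();
        try (BufferedReader r = new BufferedReader(new InputStreamReader(p.getInputStream()))) {
            String line;
            while ((line = r.readLine()) != null) {
                lines.add(line);
            }
        }
        int code = p.waitFor();
        if (code != 0) {
            throw new IOException("command exited with code " + code);
        }
        return lines;
    }
}
```

In a real pipeline this body would sit inside a DoFn's processElement, with each input element supplying the command string and each output line emitted downstream.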
Re: [DISCUSS] ExecIO
The problem with not integrating with Beam at all is that the runner doesn't know about any of these callouts, so it can't report "things aren't hung, there is a shell command running", etc. But the integration doesn't need to be particularly deep. Imagine that you can just pass the ProcessBuilder to the "ShellExecutor". It could also deal with managing thread pools and adjusting parallelism, and with making sure all the shell commands finish (even if they are async) before ending the process element, etc.

On Mon, Dec 5, 2016 at 1:39 PM Eugene Kirpichov wrote:
> @Kenn - Would you suggest that all runners need to support running code in
> a user-specified container?
> @Ben - Hmm, the features you're suggesting don't seem like they require
> deep integration into Beam itself, but can be accomplished by separate
> utility functions (or perhaps regular language-specific facilities like
> java's ProcessBuilder).
>
> On Mon, Dec 5, 2016 at 1:21 PM Ben Chambers wrote:
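Ben's "ShellExecutor" — a utility that owns the thread pool, runs commands asynchronously, and guarantees all commands finish before the element completes — could look roughly like this using plain JDK facilities. The class is hypothetical; only ProcessBuilder and ExecutorService are real APIs:

```java
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of Ben's "ShellExecutor" idea: the executor owns a
// thread pool, runs commands asynchronously, and close() waits for all
// submitted commands (mirroring "finish before ending the process element").
public class ShellRunner implements AutoCloseable {
    private final ExecutorService pool;

    public ShellRunner(int parallelism) {
        // The runner could choose this parallelism to manage worker load.
        this.pool = Executors.newFixedThreadPool(parallelism);
    }

    /** Runs a command asynchronously, returning its combined stdout/stderr. */
    public Future<String> executeAsync(String command) {
        return pool.submit(() -> {
            Process p = new ProcessBuilder("sh", "-c", command)
                    .redirectErrorStream(true)
                    .start();
            StringBuilder out = new StringBuilder();
            try (BufferedReader r = new BufferedReader(new InputStreamReader(p.getInputStream()))) {
                String line;
                while ((line = r.readLine()) != null) {
                    out.append(line).append('\n');
                }
            }
            p.waitFor();
            return out.toString();
        });
    }

    /** Waits for every submitted command to finish before returning. */
    @Override
    public void close() throws InterruptedException {
        pool.shutdown();
        pool.awaitTermination(1, TimeUnit.MINUTES);
    }
}
```

Because it is AutoCloseable, a try-with-resources block documents exactly where all shell work must have completed, which also addresses Eugene's leak concern.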
Re: [DISCUSS] ExecIO
One option would be to use the reflective DoFn approach to this. Imagine something like:

public class MyExternalFn extends DoFn {
  @ProcessElement
  // Existence of ShellExecutor indicates the code shells out.
  public void processElement(ProcessContext c, ShellExecutor shell) {
    ...
    Future result = shell.executeAsync("...");
    ...
    c.output(result.get());
  }
}

The API for the shell can include non-future methods, but this allows the runners to know what commands interact with the shell, and also to report things like (1) shell process fails, (2) shell process hangs forever, better indicate that upwards, and (3) it allows the runner to manage parallelism interacting with the shell.

Requirements for the executor can be specified with an annotation on the parameter or via an annotation within the DoFn.

On Mon, Dec 5, 2016 at 1:15 PM Kenneth Knowles wrote:
> I would like the runner-independent, language-independent graph to have a
> way to specify requirements on the environment that a DoFn runs in. This
> would provide a natural way to talk about installed libraries, containers,
> external services that are accessed, etc, and I think the requirement of a
> particular OS with tools installed fits right in. At the crudest level,
> this could be limited to a container URL.
>
> Then the Java SDK needs a way to express these requirements. They will
> generally probably be properties of a DoFn instance rather than a DoFn
> class, since they may vary with instantiation parameters.
>
> On Mon, Dec 5, 2016 at 11:51 AM, Eugene Kirpichov <kirpic...@google.com.invalid> wrote:
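Kenn's suggestion — environment requirements as declared properties that a runner could inspect before placing work — might be sketched with a runtime annotation. Everything below is a hypothetical illustration, not Beam API, and it declares requirements at the class level (Kenn notes instance-level properties would often be needed instead):

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical annotation declaring what a DoFn-like class needs from
// the worker environment. Names are illustrative, not Beam API.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@interface RequiresEnvironment {
    String[] commands() default {};   // programs that must be on the worker's PATH
    String container() default "";    // "crudest level": a container URL
}

// Example: a fn that shells out to ffmpeg declares that dependency up front.
@RequiresEnvironment(commands = {"ffmpeg", "sh"})
class TranscodeFn {
    // processElement would shell out to ffmpeg here
}

public class EnvRequirements {
    /** What a "runner" would read via reflection to verify each worker. */
    public static List<String> requiredCommands(Class<?> fnClass) {
        RequiresEnvironment req = fnClass.getAnnotation(RequiresEnvironment.class);
        return req == null ? Collections.emptyList() : Arrays.asList(req.commands());
    }
}
```

A runner (or a runner-independent graph representation) could reject or reroute a pipeline whose declared requirements no worker satisfies, instead of failing at execution time.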
Re: [DISCUSS] ExecIO
Hi JB,

Thanks for bringing this to the mailing list. I also think that this is useful in general (and that use cases for Beam are more than just classic bigdata), and that there are interesting questions here at different levels about how to do it right.

I suggest to start with the highest-level question [and discuss the particular API only after agreeing on this, possibly in a separate thread]: how to deal with the fact that Beam gives no guarantees about the environment on workers, e.g. which commands are available, which shell or even OS is being used, etc. Particularly:

- Obviously different runners will have a different environment, e.g. Dataflow workers are not going to have Hadoop commands available because they are not running on a Hadoop cluster. So, pipelines and transforms developed using this connector will necessarily be non-portable between different runners. Maybe this is ok? But we need to give users a clear expectation about this. How do we phrase this expectation and where do we put it in the docs?

- I'm concerned that this puts additional compatibility requirements on runners - it becomes necessary for a runner to document the environment of its workers (OS, shell, privileges, guaranteed-installed packages, access to other things on the host machine, e.g. whether or not the worker runs in its own container, etc.) and to keep it stable - otherwise transforms and pipelines with this connector will be non-portable between runner versions either.

Another way to deal with this is to give up and say "the environment on the workers is outside the scope of Beam; consult your runner's documentation or use your best judgment as to what the environment will be, and use this at your own risk".

What do others think?

On Mon, Dec 5, 2016 at 5:09 AM Jean-Baptiste Onofré wrote:

Hi beamers,

Today, Beam is mainly focused on data processing. Since the beginning of the project, we have been discussing extending the use case coverage via DSLs and extensions (like for machine learning), or via IO. Especially for the IO, we can see Beam used for data integration and data ingestion.

In this area, I'm proposing a first IO: ExecIO:

https://issues.apache.org/jira/browse/BEAM-1059
https://github.com/apache/incubator-beam/pull/1451

Actually, this IO is mainly an ExecFn that executes system commands (again, keep in mind we are discussing data integration/ingestion and not data processing). For convenience, this ExecFn is wrapped in Read and Write (as a regular IO).

Clearly, this IO/Fn depends on the worker where it runs. But that's under the user's responsibility.

During the review, Eugene and I discussed:
- is it an IO or just a fn ?
- is it OK to have worker-specific IO ?

IMHO, an IO makes a lot of sense to me and it's very convenient for end users. They can do something like:

PCollection output = pipeline.apply(ExecIO.read().withCommand("/path/to/myscript.sh"));

The pipeline will execute myscript and the output collection will contain the command execution stdout/stderr. On the other hand, they can do:

pcollection.apply(ExecIO.write());

where the PCollection contains the commands to execute. Generally speaking, end users can call ExecFn wherever they want in the pipeline steps:

PCollection output = pipeline.apply(ParDo.of(new ExecIO.ExecFn()));

The input collection contains the commands to execute, and the output collection contains the command execution results (stdout/stderr).

Generally speaking, I'm preparing several IOs more on the data integration/ingestion area than on "pure" classic big data processing. I think it would give a new "dimension" to Beam.

Thoughts ?

Regards
JB

--
Jean-Baptiste Onofré
jbono...@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com