Re: [DISCUSS] SEP-2: ApplicationRunner Design
Now that I look at the whole list again, I see we could adequately address #2 by first addressing #3. If, instead of building aggregation directly into the window operator, we had aggregation operators like sum, then the lambdas would be more intuitive. It would probably be even nicer if there were an alternative that accepted a plain initial value instead of the first lambda.

On Tue, Jun 20, 2017 at 12:25 PM, Prateek Maheshwari wrote:
> 1. +1 for not requiring explicit type information unless it's unavoidable.
> This one seems easy to fix, but there are other places we should address
> too (OutputStream, Window operator).
>
> We should probably discuss the Window API separately from this discussion,
> but to Chris' point:
>
> 2. Re: 2 lambdas, the first is an initial-value Java 'Supplier', the second
> is a Samza 'FoldFunction'. It might be clearer to create a new class (e.g.,
> 'Aggregator') with 'getInitialValue' and 'aggregate' methods that users
> should implement instead of these 2 lambdas. They'll lose the ability to
> provide these functions as lambdas, but I think overall it'll help
> readability and understandability. We can then provide default
> implementations of this class for common aggregations: accumulation (add
> to collection), sum, max, min, average, percentiles, etc. Is this what you
> meant, Chris?
>
> 3. I'm also strongly in favor of making the window APIs composable so that
> we don't have so many variants and parameters. Otherwise, as we add more
> parameters for event-time support and type information for serdes, it'll
> get really difficult to both discover the appropriate window variant and
> understand a window specification once written.
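A minimal sketch of the 'Aggregator' class Prateek describes above, replacing the two window lambdas with one object. The names ('Aggregator', 'getInitialValue', 'aggregate', 'count') are hypothetical, since no such class exists in the Samza API being discussed:

```java
import java.util.Arrays;
import java.util.List;

public class AggregatorSketch {
    // Hypothetical replacement for the two window lambdas: one object
    // supplies the initial value and folds each message into it.
    interface Aggregator<M, A> {
        A getInitialValue();
        A aggregate(M message, A accumulator);
    }

    // One of the proposed default implementations: a simple count.
    static <M> Aggregator<M, Integer> count() {
        return new Aggregator<M, Integer>() {
            public Integer getInitialValue() { return 0; }
            public Integer aggregate(M message, Integer acc) { return acc + 1; }
        };
    }

    public static void main(String[] args) {
        // Folding a few messages by hand, as the window operator would.
        List<String> pageViews = Arrays.asList("home", "about", "home");
        Aggregator<String, Integer> counter = count();
        Integer acc = counter.getInitialValue();
        for (String view : pageViews) {
            acc = counter.aggregate(view, acc);
        }
        System.out.println(acc);
    }
}
```

With prebuilt implementations like this, a window call would take a single `count()`-style argument rather than two loose lambdas.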
>
> - Prateek
>
> On Tue, Jun 20, 2017 at 10:15 AM, Chris Pettitt <
> cpett...@linkedin.com.invalid> wrote:
>
>> Feedback for PageViewCounterStreamSpecExample:
>>
>> https://github.com/nickpan47/samza/blob/new-api-v2/samza-core/src/test/java/org/apache/samza/example/PageViewCounterStreamSpecExample.java#L65:
>>
>> When we set up the input we had the message type, but it looks like we
>> are not propagating it via StreamIO.Input, thus requiring a cast here.
>> The fix seems to be to generify StreamIO.Input.
>>
>> https://github.com/nickpan47/samza/blob/new-api-v2/samza-core/src/test/java/org/apache/samza/example/PageViewCounterStreamSpecExample.java#L67:
>>
>> The two lambdas here are not very intuitive. I'm assuming, based on
>> some previous discussion, that these are setting up a fold function? I
>> would suggest making this more explicit, probably with a fold function
>> type.
>>
>> https://github.com/nickpan47/samza/blob/new-api-v2/samza-core/src/test/java/org/apache/samza/example/PageViewCounterStreamSpecExample.java#L94:
>>
>> The generic type parameters for WindowPane suggest that WindowPanes
>> must be keyed. This is a reasonable assumption for
>> "keyedTumblingWindow" but might not make sense for other cases, like a
>> global combine operation. Beam separates the two ideas into: 1) a typed
>> WindowedValue, basically the primitive for values propagating through
>> the graph, which contains a value and windowing information, and 2) KV
>> for keyed values.
>>
>> FWIW, the larger windowing and grouping concepts are also separated in
>> Beam. It looks like this is the case with Flink as well. In examples
>> from both, the user specifies the windowing first and then separately
>> specifies the aggregation operation (e.g. group by key, combine, sum,
>> etc.). This saves us from the combinatorial explosion of keyed / global,
>> unwindowed (batch) / session / fixed / tumbling / etc., GBK / sum /
>> count / etc.
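A sketch of the Beam-style separation Chris describes: windowing information and keying as two orthogonal primitives that compose, rather than one keyed WindowPane type. Both classes here are simplified stand-ins (Beam's real WindowedValue also carries timestamps and pane info):

```java
public class WindowedValueSketch {
    // Simplified stand-in for a windowed-value primitive: windowing info
    // travels with every value, keyed or not.
    static class WindowedValue<T> {
        final T value;
        final long windowStartMs; // simplified windowing information
        WindowedValue(T value, long windowStartMs) {
            this.value = value;
            this.windowStartMs = windowStartMs;
        }
    }

    // Keying is a separate concept: a plain key-value pair.
    static class KV<K, V> {
        final K key;
        final V value;
        KV(K key, V value) { this.key = key; this.value = value; }
    }

    public static void main(String[] args) {
        // A global (unkeyed) windowed value for a global combine...
        WindowedValue<Long> globalCount = new WindowedValue<>(42L, 0L);
        // ...and a keyed one, composed from the two primitives instead of
        // being forced through a keyed WindowPane.
        WindowedValue<KV<String, Long>> keyedCount =
            new WindowedValue<>(new KV<>("page-views", 42L), 0L);
        System.out.println(globalCount.value + " " + keyedCount.value.key);
    }
}
```

Composition like this is what avoids the keyed/global × window-type × aggregation explosion mentioned below.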
>>
>> - Chris
>>
>> On Tue, Jun 20, 2017 at 10:46 AM, Chris Pettitt wrote:
>> > Yi,
>> >
>> > What examples should we be looking at for new-api-v2?
>> >
>> > 1. samza/samza-core/src/test/java/org/apache/samza/example/PageViewCounterStreamSpecExample.java
>> >
>> > others?
>> >
>> > - Chris
>> >
>> > On Mon, Jun 19, 2017 at 5:29 PM, Yi Pan wrote:
>> >> Hi, all,
>> >>
>> >> Here are the promised code examples for the revised API, and the
>> >> related change to how we specify serdes in the API:
>> >>
>> >> - User example for the new API change:
>> >> https://github.com/nickpan47/samza/tree/new-api-v2
>> >>
>> >> - Prateek’s PR for the proposed schema registry change:
>> >> https://github.com/nickpan47/samza/pull/2/files
>> >>
>> >> Please feel free to comment and provide feedback!
>> >>
>> >> Thanks!
>> >>
>> >> -Yi
>> >>
>> >> On Tue, Jun 6, 2017 at 11:16 AM, Yi Pan wrote:
>> >>
>> >>> Hi, all,
>> >>>
>> >>> Thanks for all the input! I finally got some time to go through the
>> >>> discussion thread and digest most of the points made above. Here is
>> >>> my personal summary:
>> >>>
>> >>> Consensus on requirements:
>> >>>
>> >>> 1. ApplicationRunner needs async APIs.
>> >>> 2. ApplicationRunner can be hidden from user
Re: [DISCUSS] SEP-2: ApplicationRunner Design
1. +1 for not requiring explicit type information unless it's unavoidable. This one seems easy to fix, but there are other places we should address too (OutputStream, Window operator).

We should probably discuss the Window API separately from this discussion, but to Chris' point:

2. Re: 2 lambdas, the first is an initial-value Java 'Supplier', the second is a Samza 'FoldFunction'. It might be clearer to create a new class (e.g., 'Aggregator') with 'getInitialValue' and 'aggregate' methods that users should implement instead of these 2 lambdas. They'll lose the ability to provide these functions as lambdas, but I think overall it'll help readability and understandability. We can then provide default implementations of this class for common aggregations: accumulation (add to collection), sum, max, min, average, percentiles, etc. Is this what you meant, Chris?

3. I'm also strongly in favor of making the window APIs composable so that we don't have so many variants and parameters. Otherwise, as we add more parameters for event-time support and type information for serdes, it'll get really difficult to both discover the appropriate window variant and understand a window specification once written.

- Prateek

On Tue, Jun 20, 2017 at 10:15 AM, Chris Pettitt <
cpett...@linkedin.com.invalid> wrote:
> Feedback for PageViewCounterStreamSpecExample:
>
> https://github.com/nickpan47/samza/blob/new-api-v2/samza-core/src/test/java/org/apache/samza/example/PageViewCounterStreamSpecExample.java#L65:
>
> When we set up the input we had the message type, but it looks like we
> are not propagating it via StreamIO.Input, thus requiring a cast here.
> The fix seems to be to generify StreamIO.Input.
>
> https://github.com/nickpan47/samza/blob/new-api-v2/samza-core/src/test/java/org/apache/samza/example/PageViewCounterStreamSpecExample.java#L67:
>
> The two lambdas here are not very intuitive. I'm assuming, based on
> some previous discussion, that these are setting up a fold function? I
> would suggest making this more explicit, probably with a fold function
> type.
>
> https://github.com/nickpan47/samza/blob/new-api-v2/samza-core/src/test/java/org/apache/samza/example/PageViewCounterStreamSpecExample.java#L94:
>
> The generic type parameters for WindowPane suggest that WindowPanes
> must be keyed. This is a reasonable assumption for "keyedTumblingWindow"
> but might not make sense for other cases, like a global combine
> operation. Beam separates the two ideas into: 1) a typed WindowedValue,
> basically the primitive for values propagating through the graph, which
> contains a value and windowing information, and 2) KV for keyed values.
>
> FWIW, the larger windowing and grouping concepts are also separated in
> Beam. It looks like this is the case with Flink as well. In examples
> from both, the user specifies the windowing first and then separately
> specifies the aggregation operation (e.g. group by key, combine, sum,
> etc.). This saves us from the combinatorial explosion of keyed / global,
> unwindowed (batch) / session / fixed / tumbling / etc., GBK / sum /
> count / etc.
>
> - Chris
>
> On Tue, Jun 20, 2017 at 10:46 AM, Chris Pettitt wrote:
> > Yi,
> >
> > What examples should we be looking at for new-api-v2?
> >
> > 1. samza/samza-core/src/test/java/org/apache/samza/example/PageViewCounterStreamSpecExample.java
> >
> > others?
> >
> > - Chris
> >
> > On Mon, Jun 19, 2017 at 5:29 PM, Yi Pan wrote:
> >> Hi, all,
> >>
> >> Here are the promised code examples for the revised API, and the
> >> related change to how we specify serdes in the API:
> >>
> >> - User example for the new API change:
> >> https://github.com/nickpan47/samza/tree/new-api-v2
> >>
> >> - Prateek’s PR for the proposed schema registry change:
> >> https://github.com/nickpan47/samza/pull/2/files
> >>
> >> Please feel free to comment and provide feedback!
> >>
> >> Thanks!
> >>
> >> -Yi
> >>
> >> On Tue, Jun 6, 2017 at 11:16 AM, Yi Pan wrote:
> >>
> >>> Hi, all,
> >>>
> >>> Thanks for all the input! I finally got some time to go through the
> >>> discussion thread and digest most of the points made above. Here is
> >>> my personal summary:
> >>>
> >>> Consensus on requirements:
> >>>
> >>> 1. ApplicationRunner needs async APIs.
> >>> 2. ApplicationRunner can be hidden from user (except maybe in config)
> >>> 3. StreamApplication is the direct wrapper for the programming
> >>> interface (i.e. removing StreamGraph from the user API and allowing
> >>> users to call input() and output() from the StreamApplication) in main()
> >>> 4. There has to be a serialization format of the StreamApplication
> >>> itself, s.t. the tasks can just deserialize and create the user logic
> >>> included in StreamApplication in multiple TaskContexts.
> >>> 5. JobRunner seems to be a very thin layer on top of StreamProcessor
> >>> or YarnJob, and it is always a
Re: [DISCUSS] SEP-2: ApplicationRunner Design
Feedback for PageViewCounterStreamSpecExample:

https://github.com/nickpan47/samza/blob/new-api-v2/samza-core/src/test/java/org/apache/samza/example/PageViewCounterStreamSpecExample.java#L65:

When we set up the input we had the message type, but it looks like we are not propagating it via StreamIO.Input, thus requiring a cast here. The fix seems to be to generify StreamIO.Input.

https://github.com/nickpan47/samza/blob/new-api-v2/samza-core/src/test/java/org/apache/samza/example/PageViewCounterStreamSpecExample.java#L67:

The two lambdas here are not very intuitive. I'm assuming, based on some previous discussion, that these are setting up a fold function? I would suggest making this more explicit, probably with a fold function type.

https://github.com/nickpan47/samza/blob/new-api-v2/samza-core/src/test/java/org/apache/samza/example/PageViewCounterStreamSpecExample.java#L94:

The generic type parameters for WindowPane suggest that WindowPanes must be keyed. This is a reasonable assumption for "keyedTumblingWindow" but might not make sense for other cases, like a global combine operation. Beam separates the two ideas into: 1) a typed WindowedValue, basically the primitive for values propagating through the graph, which contains a value and windowing information, and 2) KV for keyed values.

FWIW, the larger windowing and grouping concepts are also separated in Beam. It looks like this is the case with Flink as well. In examples from both, the user specifies the windowing first and then separately specifies the aggregation operation (e.g. group by key, combine, sum, etc.). This saves us from the combinatorial explosion of keyed / global, unwindowed (batch) / session / fixed / tumbling / etc., GBK / sum / count / etc.

- Chris

On Tue, Jun 20, 2017 at 10:46 AM, Chris Pettitt wrote:
> Yi,
>
> What examples should we be looking at for new-api-v2?
>
> 1. samza/samza-core/src/test/java/org/apache/samza/example/PageViewCounterStreamSpecExample.java
>
> others?
>
> - Chris
>
> On Mon, Jun 19, 2017 at 5:29 PM, Yi Pan wrote:
>> Hi, all,
>>
>> Here are the promised code examples for the revised API, and the related
>> change to how we specify serdes in the API:
>>
>> - User example for the new API change:
>> https://github.com/nickpan47/samza/tree/new-api-v2
>>
>> - Prateek’s PR for the proposed schema registry change:
>> https://github.com/nickpan47/samza/pull/2/files
>>
>> Please feel free to comment and provide feedback!
>>
>> Thanks!
>>
>> -Yi
>>
>> On Tue, Jun 6, 2017 at 11:16 AM, Yi Pan wrote:
>>
>>> Hi, all,
>>>
>>> Thanks for all the input! I finally got some time to go through the
>>> discussion thread and digest most of the points made above. Here is my
>>> personal summary:
>>>
>>> Consensus on requirements:
>>>
>>> 1. ApplicationRunner needs async APIs.
>>> 2. ApplicationRunner can be hidden from user (except maybe in config)
>>> 3. StreamApplication is the direct wrapper for the programming
>>> interface (i.e. removing StreamGraph from the user API and allowing
>>> users to call input() and output() from the StreamApplication) in main()
>>> 4. There has to be a serialization format of the StreamApplication
>>> itself, s.t. the tasks can just deserialize and create the user logic
>>> included in StreamApplication in multiple TaskContexts.
>>> 5. JobRunner seems to be a very thin layer on top of StreamProcessor
>>> or YarnJob, and it is always a LocalJob in a LocalApplicationRunner and
>>> a RemoteJob in a RemoteApplicationRunner. There is a desire to remove it.
>>> 6. StreamApplication needs to have some methods to allow user-injected
>>> global objects for the whole application, such as JmxServer,
>>> MetricsReporter, etc.
>>>
>>> Some additional discussion points:
>>>
>>> 1. In StreamApplication#input()/output(), what should be the input /
>>> output parameter? The StreamSpec? Or the actual implementation I/O
>>> object to provide messages (i.e. similar to a socket reader/file reader
>>> object)? In the latter case, we will need to define an abstract layer
>>> of StreamReader and StreamWriter in the user-facing API that supports
>>> read/write of partitioned streams on top of the
>>> SystemConsumer/SystemProducer/SystemAdmin objects. Also, the number of
>>> I/O streams via the StreamReader/StreamWriter cannot be pre-determined
>>> (i.e. it depends on input stream partitions and the groupers). Hence, I
>>> am leaning toward exposing StreamSpec in the API and letting users
>>> build the StreamSpec via SpecBuilder. The actual I/O objects will be
>>> instantiated when SystemConsumer/SystemProducer are instantiated, with
>>> the number of physical partitions in each container.
>>> 2. There is a need to support task-level programs via the same launch
>>> model as well.
>>>
>>> Some ideas to implement the above requirements:
>>>
>>> 1.
Re: [DISCUSS] SEP-2: ApplicationRunner Design
Yi,

What examples should we be looking at for new-api-v2?

1. samza/samza-core/src/test/java/org/apache/samza/example/PageViewCounterStreamSpecExample.java

others?

- Chris

On Mon, Jun 19, 2017 at 5:29 PM, Yi Pan wrote:
> Hi, all,
>
> Here are the promised code examples for the revised API, and the related
> change to how we specify serdes in the API:
>
> - User example for the new API change:
> https://github.com/nickpan47/samza/tree/new-api-v2
>
> - Prateek’s PR for the proposed schema registry change:
> https://github.com/nickpan47/samza/pull/2/files
>
> Please feel free to comment and provide feedback!
>
> Thanks!
>
> -Yi
>
> On Tue, Jun 6, 2017 at 11:16 AM, Yi Pan wrote:
>
>> Hi, all,
>>
>> Thanks for all the input! I finally got some time to go through the
>> discussion thread and digest most of the points made above. Here is my
>> personal summary:
>>
>> Consensus on requirements:
>>
>> 1. ApplicationRunner needs async APIs.
>> 2. ApplicationRunner can be hidden from user (except maybe in config)
>> 3. StreamApplication is the direct wrapper for the programming
>> interface (i.e. removing StreamGraph from the user API and allowing
>> users to call input() and output() from the StreamApplication) in main()
>> 4. There has to be a serialization format of the StreamApplication
>> itself, s.t. the tasks can just deserialize and create the user logic
>> included in StreamApplication in multiple TaskContexts.
>> 5. JobRunner seems to be a very thin layer on top of StreamProcessor
>> or YarnJob, and it is always a LocalJob in a LocalApplicationRunner and
>> a RemoteJob in a RemoteApplicationRunner. There is a desire to remove it.
>> 6. StreamApplication needs to have some methods to allow user-injected
>> global objects for the whole application, such as JmxServer,
>> MetricsReporter, etc.
>>
>> Some additional discussion points:
>>
>> 1. In StreamApplication#input()/output(), what should be the input /
>> output parameter? The StreamSpec? Or the actual implementation I/O
>> object to provide messages (i.e. similar to a socket reader/file reader
>> object)? In the latter case, we will need to define an abstract layer
>> of StreamReader and StreamWriter in the user-facing API that supports
>> read/write of partitioned streams on top of the
>> SystemConsumer/SystemProducer/SystemAdmin objects. Also, the number of
>> I/O streams via the StreamReader/StreamWriter cannot be pre-determined
>> (i.e. it depends on input stream partitions and the groupers). Hence, I
>> am leaning toward exposing StreamSpec in the API and letting users
>> build the StreamSpec via SpecBuilder. The actual I/O objects will be
>> instantiated when SystemConsumer/SystemProducer are instantiated, with
>> the number of physical partitions in each container.
>> 2. There is a need to support task-level programs via the same launch
>> model as well.
>>
>> Some ideas to implement the above requirements:
>>
>> 1. StreamGraph#write() should be used internally to generate and
>> persist the serialized format of user logic. Then, StreamGraph#read()
>> should give back a deserialized version of user logic. This would imply
>> that the user functions defined in the APIs are mandated to be
>> serializable.
>> 2. StreamApplication should include a SpecBuilder that provides the
>> instantiation of MessageStreams/Stores, which is passed to
>> StreamApplication#input() / StreamApplication#output()
>> 3. StreamApplication should also include an internal ApplicationRunner
>> instance (config driven, class loaded) to be able to switch between
>> local vs remote execution
>> 4. The implementation of LocalApplicationRunner should directly
>> instantiate and manage StreamProcessor instances for each job, removing
>> the LocalJobRunner
>> 5. The implementation of RemoteApplicationRunner should instantiate a
>> remote JobFactory, create the remote job and submit it for each job,
>> removing the current JobRunner interface
>> 6. We also need a StreamTaskApplication class that allows users to
>> create task-level applications, by mandating a constructor with a
>> StreamTaskFactory parameter
>>
>> One more opinion around the status and waitForFinish(): I would think
>> that waitForFinish() just waits for the local Runtime to complete, not
>> for the remote job to be completed.
>>
>> I will be working on the revision of SEP-2 and some example user code
>> for now and will share it soon.
>>
>> Thanks!
>>
>> -Yi
>>
>> On Wed, May 3, 2017 at 8:08 AM, Chris Pettitt <
>> cpett...@linkedin.com.invalid> wrote:
>>
>>> Hi Xinyu,
>>>
>>> I took a second look at the registerStore API. Would it be possible to
>>> call registerStore directly on the app, similar to what we're doing
>>> with app.input (possibly with the restriction that registerStore must
>>> be called before
Re: [DISCUSS] SEP-2: ApplicationRunner Design
Hi, all,

Here are the promised code examples for the revised API, and the related change to how we specify serdes in the API:

- User example for the new API change:
https://github.com/nickpan47/samza/tree/new-api-v2

- Prateek’s PR for the proposed schema registry change:
https://github.com/nickpan47/samza/pull/2/files

Please feel free to comment and provide feedback!

Thanks!

-Yi

On Tue, Jun 6, 2017 at 11:16 AM, Yi Pan wrote:
> Hi, all,
>
> Thanks for all the input! I finally got some time to go through the
> discussion thread and digest most of the points made above. Here is my
> personal summary:
>
> Consensus on requirements:
>
> 1. ApplicationRunner needs async APIs.
> 2. ApplicationRunner can be hidden from user (except maybe in config)
> 3. StreamApplication is the direct wrapper for the programming
> interface (i.e. removing StreamGraph from the user API and allowing
> users to call input() and output() from the StreamApplication) in main()
> 4. There has to be a serialization format of the StreamApplication
> itself, s.t. the tasks can just deserialize and create the user logic
> included in StreamApplication in multiple TaskContexts.
> 5. JobRunner seems to be a very thin layer on top of StreamProcessor
> or YarnJob, and it is always a LocalJob in a LocalApplicationRunner and
> a RemoteJob in a RemoteApplicationRunner. There is a desire to remove it.
> 6. StreamApplication needs to have some methods to allow user-injected
> global objects for the whole application, such as JmxServer,
> MetricsReporter, etc.
>
> Some additional discussion points:
>
> 1. In StreamApplication#input()/output(), what should be the input /
> output parameter? The StreamSpec? Or the actual implementation I/O
> object to provide messages (i.e. similar to a socket reader/file reader
> object)? In the latter case, we will need to define an abstract layer
> of StreamReader and StreamWriter in the user-facing API that supports
> read/write of partitioned streams on top of the
> SystemConsumer/SystemProducer/SystemAdmin objects. Also, the number of
> I/O streams via the StreamReader/StreamWriter cannot be pre-determined
> (i.e. it depends on input stream partitions and the groupers). Hence, I
> am leaning toward exposing StreamSpec in the API and letting users
> build the StreamSpec via SpecBuilder. The actual I/O objects will be
> instantiated when SystemConsumer/SystemProducer are instantiated, with
> the number of physical partitions in each container.
> 2. There is a need to support task-level programs via the same launch
> model as well.
>
> Some ideas to implement the above requirements:
>
> 1. StreamGraph#write() should be used internally to generate and
> persist the serialized format of user logic. Then, StreamGraph#read()
> should give back a deserialized version of user logic. This would imply
> that the user functions defined in the APIs are mandated to be
> serializable.
> 2. StreamApplication should include a SpecBuilder that provides the
> instantiation of MessageStreams/Stores, which is passed to
> StreamApplication#input() / StreamApplication#output()
> 3. StreamApplication should also include an internal ApplicationRunner
> instance (config driven, class loaded) to be able to switch between
> local vs remote execution
> 4. The implementation of LocalApplicationRunner should directly
> instantiate and manage StreamProcessor instances for each job, removing
> the LocalJobRunner
> 5. The implementation of RemoteApplicationRunner should instantiate a
> remote JobFactory, create the remote job and submit it for each job,
> removing the current JobRunner interface
> 6. We also need a StreamTaskApplication class that allows users to
> create task-level applications, by mandating a constructor with a
> StreamTaskFactory parameter
>
> One more opinion around the status and waitForFinish(): I would think
> that waitForFinish() just waits for the local Runtime to complete, not
> for the remote job to be completed.
>
> I will be working on the revision of SEP-2 and some example user code
> for now and will share it soon.
>
> Thanks!
>
> -Yi
>
> On Wed, May 3, 2017 at 8:08 AM, Chris Pettitt <
> cpett...@linkedin.com.invalid> wrote:
>
>> Hi Xinyu,
>>
>> I took a second look at the registerStore API. Would it be possible to
>> call registerStore directly on the app, similar to what we're doing
>> with app.input (possibly with the restriction that registerStore must
>> be called before we add an operator that uses the store)? Otherwise
>> we'll end up having to do two passes on the graph again - similar to
>> the way we had to do a pass to init stream config and then hook up the
>> graph.
>>
>> Thanks,
>> Chris
>>
>> On Fri, Apr 28, 2017 at 8:55 PM, xinyu liu wrote:
>>
>> > Right, option #2 seems redundant for defining streams
Re: [DISCUSS] SEP-2: ApplicationRunner Design
Hi, all,

Thanks for all the input! I finally got some time to go through the discussion thread and digest most of the points made above. Here is my personal summary:

Consensus on requirements:

1. ApplicationRunner needs async APIs.
2. ApplicationRunner can be hidden from user (except maybe in config)
3. StreamApplication is the direct wrapper for the programming interface (i.e. removing StreamGraph from the user API and allowing users to call input() and output() from the StreamApplication) in main()
4. There has to be a serialization format of the StreamApplication itself, s.t. the tasks can just deserialize and create the user logic included in StreamApplication in multiple TaskContexts.
5. JobRunner seems to be a very thin layer on top of StreamProcessor or YarnJob, and it is always a LocalJob in a LocalApplicationRunner and a RemoteJob in a RemoteApplicationRunner. There is a desire to remove it.
6. StreamApplication needs to have some methods to allow user-injected global objects for the whole application, such as JmxServer, MetricsReporter, etc.

Some additional discussion points:

1. In StreamApplication#input()/output(), what should be the input / output parameter? The StreamSpec? Or the actual implementation I/O object to provide messages (i.e. similar to a socket reader/file reader object)? In the latter case, we will need to define an abstract layer of StreamReader and StreamWriter in the user-facing API that supports read/write of partitioned streams on top of the SystemConsumer/SystemProducer/SystemAdmin objects. Also, the number of I/O streams via the StreamReader/StreamWriter cannot be pre-determined (i.e. it depends on input stream partitions and the groupers). Hence, I am leaning toward exposing StreamSpec in the API and letting users build the StreamSpec via SpecBuilder. The actual I/O objects will be instantiated when SystemConsumer/SystemProducer are instantiated, with the number of physical partitions in each container.
2. There is a need to support task-level programs via the same launch model as well.

Some ideas to implement the above requirements:

1. StreamGraph#write() should be used internally to generate and persist the serialized format of user logic. Then, StreamGraph#read() should give back a deserialized version of user logic. This would imply that the user functions defined in the APIs are mandated to be serializable.
2. StreamApplication should include a SpecBuilder that provides the instantiation of MessageStreams/Stores, which is passed to StreamApplication#input() / StreamApplication#output()
3. StreamApplication should also include an internal ApplicationRunner instance (config driven, class loaded) to be able to switch between local vs remote execution
4. The implementation of LocalApplicationRunner should directly instantiate and manage StreamProcessor instances for each job, removing the LocalJobRunner
5. The implementation of RemoteApplicationRunner should instantiate a remote JobFactory, create the remote job and submit it for each job, removing the current JobRunner interface
6. We also need a StreamTaskApplication class that allows users to create task-level applications, by mandating a constructor with a StreamTaskFactory parameter

One more opinion around the status and waitForFinish(): I would think that waitForFinish() just waits for the local Runtime to complete, not for the remote job to be completed.

I will be working on the revision of SEP-2 and some example user code for now and will share it soon.

Thanks!

-Yi

On Wed, May 3, 2017 at 8:08 AM, Chris Pettitt wrote:
> Hi Xinyu,
>
> I took a second look at the registerStore API. Would it be possible to
> call registerStore directly on the app, similar to what we're doing with
> app.input (possibly with the restriction that registerStore must be
> called before we add an operator that uses the store)? Otherwise we'll
> end up having to do two passes on the graph again - similar to the way
> we had to do a pass to init stream config and then hook up the graph.
>
> Thanks,
> Chris
>
> On Fri, Apr 28, 2017 at 8:55 PM, xinyu liu wrote:
>
> > Right, option #2 seems redundant for defining streams after further
> > discussion here. StreamSpec itself is flexible enough to achieve both
> > static and programmatic specification of the stream. Agree it's not
> > convenient for now (pretty obvious after looking at your bsr
> > beam.runners.samza.wrapper), and we should provide similar predefined
> > convenience wrappers for users to create the StreamSpec. In your case,
> > something like BoundedStreamSpec.file() which will generate the system
> > and serialize the data as you did.
> >
> > We're still thinking the callback proposed in #2 can be useful for
> > requirement #6: injecting other user objects in run
Re: [DISCUSS] SEP-2: ApplicationRunner Design
Hi Xinyu,

I took a second look at the registerStore API. Would it be possible to call registerStore directly on the app, similar to what we're doing with app.input (possibly with the restriction that registerStore must be called before we add an operator that uses the store)? Otherwise we'll end up having to do two passes on the graph again - similar to the way we had to do a pass to init stream config and then hook up the graph.

Thanks,
Chris

On Fri, Apr 28, 2017 at 8:55 PM, xinyu liu wrote:
> Right, option #2 seems redundant for defining streams after further
> discussion here. StreamSpec itself is flexible enough to achieve both
> static and programmatic specification of the stream. Agree it's not
> convenient for now (pretty obvious after looking at your bsr
> beam.runners.samza.wrapper), and we should provide similar predefined
> convenience wrappers for users to create the StreamSpec. In your case,
> something like BoundedStreamSpec.file() which will generate the system
> and serialize the data as you did.
>
> We're still thinking the callback proposed in #2 can be useful for
> requirement #6: injecting other user objects at run time, such as stores
> and metrics. To simplify user understanding further, I think we might
> hide the ApplicationRunner and expose the StreamApplication instead,
> which will make requirement #3 not user facing. So the API becomes:
>
> StreamApplication app = StreamApplication.local(config)
>   .init(env -> {
>     env.registerStore("my-store", new MyStoreFactory());
>     env.registerMetricsReporter("my-reporter", new MyMetricsReporterFactory());
>   })
>   .withLifeCycleListener(myListener);
>
> app.input(BoundedStreamSpec.create("/sample/input.txt"))
>   .map(...)
>   .window(...)
>
> app.run();
>
> For requirement #5, I added a .withLifeCycleListener() to the API, which
> can trigger the callbacks with life cycle events.
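A sketch of how the .withLifeCycleListener() callback being proposed could work. The listener interface, its event methods, and the runner shape are all hypothetical, since nothing like them exists in the API yet:

```java
public class LifeCycleListenerSketch {
    // Hypothetical listener: method names are illustrative, not from Samza.
    interface LifeCycleListener {
        void onStart();
        void onFailure(Throwable t);
        void onShutdown();
    }

    // Default no-op listener used when none is registered.
    private LifeCycleListener listener = new LifeCycleListener() {
        public void onStart() {}
        public void onFailure(Throwable t) {}
        public void onShutdown() {}
    };

    // Fluent registration, matching the proposed .withLifeCycleListener() style.
    LifeCycleListenerSketch withLifeCycleListener(LifeCycleListener l) {
        this.listener = l;
        return this;
    }

    void run() {
        listener.onStart();
        try {
            // ... the application's processing would run here ...
        } catch (Throwable t) {
            listener.onFailure(t); // surfaces the exception on failure
        } finally {
            listener.onShutdown();
        }
    }

    public static void main(String[] args) {
        final boolean[] events = new boolean[2];
        new LifeCycleListenerSketch().withLifeCycleListener(new LifeCycleListener() {
            public void onStart() { events[0] = true; }
            public void onFailure(Throwable t) {}
            public void onShutdown() { events[1] = true; }
        }).run();
        System.out.println(events[0] && events[1]);
    }
}
```

Passing the exception to onFailure is one way the listener could also satisfy the requirement that status include the exception on failure.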
> > For #4: distribution of the jars will be what we have today using the Yarn > localization with a remote store like artifactory or http server. We > discussed where to put the graph serialization. The current thinking is to > define a general interface which can backed by a remote store, like Kafka, > artifactory or http server. For Kafka, it's straightforward but we will > have the size limit or cut it by ourselves. For the other two, we need to > investigate whether we can easily upload jars to our artifactory and > localizing it with Yarn. Any opinions on this? > > Thanks, > Xinyu > > On Fri, Apr 28, 2017 at 11:34 AM, Chris Pettitt < > cpett...@linkedin.com.invalid> wrote: > > > Your proposal for #1 looks good. > > > > I'm not quite how to reconcile the proposals for #1 and #2. In #1 you add > > the stream spec straight onto the runner while in #2 you do it in a > > callback. If it is either-or, #1 looks a lot better for my purposes. > > > > For #4 what mechanism are you using to distribute the JARs? Can you use > the > > same mechanism to distribute the serialized graph? > > > > On Fri, Apr 28, 2017 at 12:14 AM, xinyu liu > wrote: > > > > > btw, I will get to SAMZA-1246 as soon as possible. > > > > > > Thanks, > > > Xinyu > > > > > > On Thu, Apr 27, 2017 at 9:11 PM, xinyu liu > > wrote: > > > > > > > Let me try to capture the updated requirements: > > > > > > > > 1. Set up input streams outside StreamGraph, and treat graph building > > as > > > a > > > > library (*Fluent, Beam*). > > > > > > > > 2. Improve ease of use for ApplicationRunner to avoid complex > > > > configurations such as zkCoordinator, zkCoordinationService. > > > (*Standalone*). > > > > Provide some programmatic way to tweak them in the API. > > > > > > > > 3. Clean up ApplicationRunner into a single interface (*Fluent*). We > > can > > > > have one or more implementations but it's hidden from the users. > > > > > > > > 4. 
Separate StreamGraph from runtime environment so it can be > > serialized > > > (*Beam, > > > > Yarn*) > > > > > > 5. Better life cycle management of application, parity with > > > > StreamProcessor (*Standalone, Beam*). Stats should include exception in > > > case of failure (tracked in SAMZA-1246). > > > > > > > > 6. Support injecting user-defined objects into ApplicationRunner. > > > > > > > > Prateek and I iterated on the ApplicationRunner API based on these > > > > requirements. To support #1, we can set up input streams on the > runner > > > > level, which returns the MessageStream and allows graph building > > > > afterwards. The code looks like below: > > > > > > > > ApplicationRunner runner = ApplicationRunner.local(); > > > > runner.input(streamSpec) > > > > .map(..) > > > > .window(...) > > > > runner.run(); > > > > > > > > StreamSpec is the building block for setting up streams here. It can > be > > > > set up in different ways: > > > > > > > > - Direct creation of stream spec, like runner.input(new
Re: [DISCUSS] SEP-2: ApplicationRunner Design
Looked again at Chris's beam-samza-runner implementation. Seems LocalApplicationRunner.run() should be asynchronous too. The current implementation actually uses a latch to wait for the StreamProcessors to finish, which seems unnecessary. And we can provide a waitUntilFinish() counterpart to the user. I created https://issues.apache.org/jira/browse/SAMZA-1252 to track it. Thanks, Xinyu On Fri, Apr 28, 2017 at 5:55 PM, xinyu liu wrote: > Right, option #2 seems redundant for defining streams after further > discussion here. StreamSpec itself is flexible enough to achieve both > static and programmatic specification of the stream. Agree it's not > convenient for now (pretty obvious after looking at your bsr > beam.runners.samza.wrapper), and we should provide similar predefined > convenient wrappers for users to create the StreamSpec. In your case > something like BoundedStreamSpec.file() which will generate the system > and serialize the data as you did. > > We're still thinking the callback proposed in #2 can be useful for > requirement #6: injecting other user objects at run time, such as stores > and metrics. To simplify the user understanding further, I think we might > hide the ApplicationRunner and expose the StreamApplication instead, which > will make requirement #3 not user facing. So the API becomes like: > > StreamApplication app = StreamApplication.local(config) > .init(env -> { >env.registerStore("my-store", new MyStoreFactory()); >env.registerMetricsReporter("my-reporter", new > MyMetricsReporterFactory()); > }) > .withLifeCycleListener(myListener); > > app.input(BoundedStreamSpec.create("/sample/input.txt")) > .map(...) > .window(...) > > app.run(); > > For requirement #5, I add a .withLifeCycleListener() in the API, which can > trigger the callbacks with life cycle events. > > For #4: distribution of the jars will be what we have today using the Yarn > localization with a remote store like artifactory or http server. 
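The SAMZA-1252 proposal above (non-blocking run() plus a blocking waitUntilFinish() counterpart) can be sketched with plain java.util.concurrent primitives. The class and method names below are illustrative only, not the actual LocalApplicationRunner implementation.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical runner illustrating a non-blocking run() with a
// separate blocking waitUntilFinish(), as proposed in SAMZA-1252.
class LocalRunnerSketch {
    private CompletableFuture<Void> done;

    // run() starts the processors and returns immediately,
    // instead of holding a latch internally.
    void run(Runnable processors) {
        done = CompletableFuture.runAsync(processors);
    }

    // Users who want the old blocking behavior call this explicitly.
    void waitUntilFinish() {
        done.join();
    }
}
```

With this split, embedded callers can keep the calling thread free, while main()-style launchers just call run() followed by waitUntilFinish().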
We > discussed where to put the graph serialization. The current thinking is to > define a general interface which can be backed by a remote store, like Kafka, > artifactory or http server. For Kafka, it's straightforward but we will > have the size limit or have to cut it up ourselves. For the other two, we need to > investigate whether we can easily upload jars to our artifactory and > localize them with Yarn. Any opinions on this? > > Thanks, > Xinyu > > On Fri, Apr 28, 2017 at 11:34 AM, Chris Pettitt < > cpett...@linkedin.com.invalid> wrote: > >> Your proposal for #1 looks good. >> >> I'm not quite sure how to reconcile the proposals for #1 and #2. In #1 you add >> the stream spec straight onto the runner while in #2 you do it in a >> callback. If it is either-or, #1 looks a lot better for my purposes. >> >> For #4 what mechanism are you using to distribute the JARs? Can you use >> the >> same mechanism to distribute the serialized graph? >> >> On Fri, Apr 28, 2017 at 12:14 AM, xinyu liu >> wrote: >> >> > btw, I will get to SAMZA-1246 as soon as possible. >> > >> > Thanks, >> > Xinyu >> > >> > On Thu, Apr 27, 2017 at 9:11 PM, xinyu liu >> wrote: >> > >> > > Let me try to capture the updated requirements: >> > > >> > > 1. Set up input streams outside StreamGraph, and treat graph building >> as >> > a >> > > library (*Fluent, Beam*). >> > > >> > > 2. Improve ease of use for ApplicationRunner to avoid complex >> > > configurations such as zkCoordinator, zkCoordinationService. >> > (*Standalone*). >> > > Provide some programmatic way to tweak them in the API. >> > > >> > > 3. Clean up ApplicationRunner into a single interface (*Fluent*). We >> can >> > > have one or more implementations but it's hidden from the users. >> > > >> > > 4. Separate StreamGraph from runtime environment so it can be >> serialized >> > (*Beam, >> > > Yarn*) >> > > >> > > 5. Better life cycle management of application, parity with >> > > StreamProcessor (*Standalone, Beam*). 
Stats should include exception >> in >> > > case of failure (tracked in SAMZA-1246). >> > > >> > > 6. Support injecting user-defined objects into ApplicationRunner. >> > > >> > > Prateek and I iterated on the ApplicationRunner API based on these >> > > requirements. To support #1, we can set up input streams on the runner >> > > level, which returns the MessageStream and allows graph building >> > > afterwards. The code looks like below: >> > > >> > > ApplicationRunner runner = ApplicationRunner.local(); >> > > runner.input(streamSpec) >> > > .map(..) >> > > .window(...) >> > > runner.run(); >> > > >> > > StreamSpec is the building block for setting up streams here. It can >> be >> > > set up in different ways: >> > > >> > > - Direct creation of stream spec, like runner.input(new >> StreamSpec(id, >> > > system, stream)) >> > > - Load from streamId from env or config, like >> >
Re: [DISCUSS] SEP-2: ApplicationRunner Design
Right, option #2 seems redundant for defining streams after further discussion here. StreamSpec itself is flexible enough to achieve both static and programmatic specification of the stream. Agree it's not convenient for now (pretty obvious after looking at your bsr beam.runners.samza.wrapper), and we should provide similar predefined convenient wrappers for users to create the StreamSpec. In your case something like BoundedStreamSpec.file() which will generate the system and serialize the data as you did. We're still thinking the callback proposed in #2 can be useful for requirement #6: injecting other user objects at run time, such as stores and metrics. To simplify the user understanding further, I think we might hide the ApplicationRunner and expose the StreamApplication instead, which will make requirement #3 not user facing. So the API becomes like: StreamApplication app = StreamApplication.local(config) .init(env -> { env.registerStore("my-store", new MyStoreFactory()); env.registerMetricsReporter("my-reporter", new MyMetricsReporterFactory()); }) .withLifeCycleListener(myListener); app.input(BoundedStreamSpec.create("/sample/input.txt")) .map(...) .window(...) app.run(); For requirement #5, I add a .withLifeCycleListener() in the API, which can trigger the callbacks with life cycle events. For #4: distribution of the jars will be what we have today using the Yarn localization with a remote store like artifactory or http server. We discussed where to put the graph serialization. The current thinking is to define a general interface which can be backed by a remote store, like Kafka, artifactory or http server. For Kafka, it's straightforward but we will have the size limit or have to cut it up ourselves. For the other two, we need to investigate whether we can easily upload jars to our artifactory and localize them with Yarn. Any opinions on this? 
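"Cutting up" a serialized graph to fit Kafka's per-message size limit, as mentioned above, is a simple framing problem. A hedged sketch follows; the chunk size and the absence of any headers/ordering metadata are simplifications, since the real storage interface is still under discussion in the thread.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of splitting a serialized graph into fixed-size chunks so each
// fits under a per-message size limit, then reassembling it on read.
class PayloadChunker {
    static List<byte[]> split(byte[] payload, int maxChunk) {
        List<byte[]> chunks = new ArrayList<>();
        for (int off = 0; off < payload.length; off += maxChunk) {
            int len = Math.min(maxChunk, payload.length - off);
            byte[] chunk = new byte[len];
            System.arraycopy(payload, off, chunk, 0, len);
            chunks.add(chunk);
        }
        return chunks;
    }

    static byte[] join(List<byte[]> chunks) {
        int total = 0;
        for (byte[] c : chunks) total += c.length;
        byte[] out = new byte[total];
        int off = 0;
        for (byte[] c : chunks) {
            System.arraycopy(c, 0, out, off, c.length);
            off += c.length;
        }
        return out;
    }
}
```

A production version would also need a chunk index and total count per message so a reader can detect missing or reordered chunks.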
Thanks, Xinyu On Fri, Apr 28, 2017 at 11:34 AM, Chris Pettitt < cpett...@linkedin.com.invalid> wrote: > Your proposal for #1 looks good. > > I'm not quite sure how to reconcile the proposals for #1 and #2. In #1 you add > the stream spec straight onto the runner while in #2 you do it in a > callback. If it is either-or, #1 looks a lot better for my purposes. > > For #4 what mechanism are you using to distribute the JARs? Can you use the > same mechanism to distribute the serialized graph? > > On Fri, Apr 28, 2017 at 12:14 AM, xinyu liu wrote: > > > btw, I will get to SAMZA-1246 as soon as possible. > > > > Thanks, > > Xinyu > > > > On Thu, Apr 27, 2017 at 9:11 PM, xinyu liu > wrote: > > > > > Let me try to capture the updated requirements: > > > > > > 1. Set up input streams outside StreamGraph, and treat graph building > as > > a > > > library (*Fluent, Beam*). > > > > > > 2. Improve ease of use for ApplicationRunner to avoid complex > > > configurations such as zkCoordinator, zkCoordinationService. > > (*Standalone*). > > > Provide some programmatic way to tweak them in the API. > > > > > > 3. Clean up ApplicationRunner into a single interface (*Fluent*). We > can > > > have one or more implementations but it's hidden from the users. > > > > > > 4. Separate StreamGraph from runtime environment so it can be > serialized > > (*Beam, > > > Yarn*) > > > > > > 5. Better life cycle management of application, parity with > > > StreamProcessor (*Standalone, Beam*). Stats should include exception in > > > case of failure (tracked in SAMZA-1246). > > > > > > 6. Support injecting user-defined objects into ApplicationRunner. > > > > > > Prateek and I iterated on the ApplicationRunner API based on these > > > requirements. To support #1, we can set up input streams on the runner > > > level, which returns the MessageStream and allows graph building > > > afterwards. 
The code looks like below: > > > > > > ApplicationRunner runner = ApplicationRunner.local(); > > > runner.input(streamSpec) > > > .map(..) > > > .window(...) > > > runner.run(); > > > > > > StreamSpec is the building block for setting up streams here. It can be > > > set up in different ways: > > > > > > - Direct creation of stream spec, like runner.input(new > StreamSpec(id, > > > system, stream)) > > > - Load from streamId from env or config, like > > runner.input(runner.env(). > > > getStreamSpec(id)) > > > - Canned Spec which generates the StreamSpec with id, system and > stream > > > to minimize the configuration. For example, CollectionSpec.create(new > > > ArrayList[]{1,2,3,4}), which will auto generate the system and stream > in > > > the spec. > > > > > > To support #2, we need to be able to set up StreamSpec-related objects > > and > > > factories programmatically in env. Suppose we have the following before > > > runner.input(...): > > > > > > runner.setup(env /* a writable interface of env*/ -> { > > > env.setStreamSpec(streamId, streamSpec); > > >
Re: [DISCUSS] SEP-2: ApplicationRunner Design
Your proposal for #1 looks good. I'm not quite sure how to reconcile the proposals for #1 and #2. In #1 you add the stream spec straight onto the runner while in #2 you do it in a callback. If it is either-or, #1 looks a lot better for my purposes. For #4 what mechanism are you using to distribute the JARs? Can you use the same mechanism to distribute the serialized graph? On Fri, Apr 28, 2017 at 12:14 AM, xinyu liu wrote: > btw, I will get to SAMZA-1246 as soon as possible. > > Thanks, > Xinyu > > On Thu, Apr 27, 2017 at 9:11 PM, xinyu liu wrote: > > > Let me try to capture the updated requirements: > > > > 1. Set up input streams outside StreamGraph, and treat graph building as > a > > library (*Fluent, Beam*). > > > > 2. Improve ease of use for ApplicationRunner to avoid complex > > configurations such as zkCoordinator, zkCoordinationService. > (*Standalone*). > > Provide some programmatic way to tweak them in the API. > > > > 3. Clean up ApplicationRunner into a single interface (*Fluent*). We can > > have one or more implementations but it's hidden from the users. > > > > 4. Separate StreamGraph from runtime environment so it can be serialized > (*Beam, > > Yarn*) > > > > 5. Better life cycle management of application, parity with > > StreamProcessor (*Standalone, Beam*). Stats should include exception in > > case of failure (tracked in SAMZA-1246). > > > > 6. Support injecting user-defined objects into ApplicationRunner. > > > > Prateek and I iterated on the ApplicationRunner API based on these > > requirements. To support #1, we can set up input streams on the runner > > level, which returns the MessageStream and allows graph building > > afterwards. The code looks like below: > > > > ApplicationRunner runner = ApplicationRunner.local(); > > runner.input(streamSpec) > > .map(..) > > .window(...) > > runner.run(); > > > > StreamSpec is the building block for setting up streams here. 
It can be > > set up in different ways: > > > > - Direct creation of stream spec, like runner.input(new StreamSpec(id, > > system, stream)) > > - Load from streamId from env or config, like > runner.input(runner.env(). > > getStreamSpec(id)) > > - Canned Spec which generates the StreamSpec with id, system and stream > > to minimize the configuration. For example, CollectionSpec.create(new > > ArrayList[]{1,2,3,4}), which will auto generate the system and stream in > > the spec. > > > > To support #2, we need to be able to set up StreamSpec-related objects > and > > factories programmatically in env. Suppose we have the following before > > runner.input(...): > > > > runner.setup(env /* a writable interface of env*/ -> { > > env.setStreamSpec(streamId, streamSpec); > > env.setSystem(systemName, systemFactory); > > }) > > > > runner.setup(->) also provides setup for stores and other runtime stuff > > needed for the execution. The setup should be able to be serialized to > config. > For #6, I haven't figured out a good way to inject user-defined objects > > here yet. > > > > With this API, we should be able to also support #4. For remote > > runner.run(), the operator user classes/lambdas in the StreamGraph need to > > be serialized. As today, the existing option is to serialize to a stream, > > either the coordinator stream or the pipeline control stream, which will > > have the size limit per message. Do you see RPC as an option? > > > > For this version of the API, seems we don't need the StreamApplication > wrapper > > as well as exposing the StreamGraph. Do you think we are on the right > path? > > > > Thanks, > > Xinyu > > > > > > On Thu, Apr 27, 2017 at 6:09 AM, Chris Pettitt < > > cpett...@linkedin.com.invalid> wrote: > > > >> That should have been: > >> > >> For #1, Beam doesn't have a hard requirement... > >> > >> On Thu, Apr 27, 2017 at 9:07 AM, Chris Pettitt > >> wrote: > >> > >> > For #1, I doesn't have a hard requirement for any change from Samza. 
A > >> > very nice to have would be to allow the input systems to be set up at > >> the > >> > same time as the rest of the StreamGraph. An even nicer to have would > >> be to > >> > do away with the callback based approach and treat graph building as a > >> > library, a la Beam and Flink. > >> > > >> > For the moment I've worked around the two pass requirement (once for > >> > config, once for StreamGraph) by introducing an IR layer between Beam > >> and > >> > the Samza Fluent translation. The IR layer is convenient independent > of > >> > this problem because it makes it easier to switch between the Fluent > and > >> > low-level APIs. > >> > > >> > > >> > For #4, if we had parity with StreamProcessor for lifecycle we'd be in > >> > great shape. One additional issue with the status call that I may not > >> have > >> > mentioned is that it provides you no way to get at the cause of > failure. > >> > The StreamProcessor API does allow this via the
Re: [DISCUSS] SEP-2: ApplicationRunner Design
btw, I will get to SAMZA-1246 as soon as possible. Thanks, Xinyu On Thu, Apr 27, 2017 at 9:11 PM, xinyu liu wrote: > Let me try to capture the updated requirements: > > 1. Set up input streams outside StreamGraph, and treat graph building as a > library (*Fluent, Beam*). > > 2. Improve ease of use for ApplicationRunner to avoid complex > configurations such as zkCoordinator, zkCoordinationService. (*Standalone*). > Provide some programmatic way to tweak them in the API. > > 3. Clean up ApplicationRunner into a single interface (*Fluent*). We can > have one or more implementations but it's hidden from the users. > > 4. Separate StreamGraph from runtime environment so it can be serialized > (*Beam, > Yarn*) > > 5. Better life cycle management of application, parity with > StreamProcessor (*Standalone, Beam*). Stats should include exception in > case of failure (tracked in SAMZA-1246). > > 6. Support injecting user-defined objects into ApplicationRunner. > > Prateek and I iterated on the ApplicationRunner API based on these > requirements. To support #1, we can set up input streams on the runner > level, which returns the MessageStream and allows graph building > afterwards. The code looks like below: > > ApplicationRunner runner = ApplicationRunner.local(); > runner.input(streamSpec) > .map(..) > .window(...) > runner.run(); > > StreamSpec is the building block for setting up streams here. It can be > set up in different ways: > > - Direct creation of stream spec, like runner.input(new StreamSpec(id, > system, stream)) > - Load from streamId from env or config, like runner.input(runner.env(). > getStreamSpec(id)) > - Canned Spec which generates the StreamSpec with id, system and stream > to minimize the configuration. For example, CollectionSpec.create(new > ArrayList[]{1,2,3,4}), which will auto generate the system and stream in > the spec. > > To support #2, we need to be able to set up StreamSpec-related objects and > factories programmatically in env. 
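The three StreamSpec creation styles listed above (direct construction, lookup from env/config, and canned specs like CollectionSpec) can be modeled with a small value class. This is an illustrative sketch only; the names and the auto-generation scheme for canned specs are assumptions, not the actual Samza classes.

```java
import java.util.List;

// Hypothetical StreamSpec value class sketching the creation styles
// from the proposal: direct construction plus a "canned" factory that
// auto-generates the system and stream to minimize configuration.
class StreamSpec {
    final String id;
    final String system;
    final String stream;

    // Direct creation: caller supplies id, system, and stream.
    StreamSpec(String id, String system, String stream) {
        this.id = id;
        this.system = system;
        this.stream = stream;
    }

    // Canned spec: derive everything from the data, so tests like
    // CollectionSpec.create(...) need no explicit configuration.
    static StreamSpec fromCollection(List<?> data) {
        String id = "collection-" + Integer.toHexString(data.hashCode());
        return new StreamSpec(id, "in-memory", id);
    }
}
```

The env-lookup style would then just be a map from streamId to such a spec, populated from config or from the proposed writable env.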
Suppose we have the following before > runner.input(...): > > runner.setup(env /* a writable interface of env*/ -> { > env.setStreamSpec(streamId, streamSpec); > env.setSystem(systemName, systemFactory); > }) > > runner.setup(->) also provides setup for stores and other runtime stuff > needed for the execution. The setup should be able to be serialized to config. > For #6, I haven't figured out a good way to inject user-defined objects > here yet. > > With this API, we should be able to also support #4. For remote > runner.run(), the operator user classes/lambdas in the StreamGraph need to > be serialized. As today, the existing option is to serialize to a stream, > either the coordinator stream or the pipeline control stream, which will > have the size limit per message. Do you see RPC as an option? > > For this version of the API, seems we don't need the StreamApplication wrapper > as well as exposing the StreamGraph. Do you think we are on the right path? > > Thanks, > Xinyu > > > On Thu, Apr 27, 2017 at 6:09 AM, Chris Pettitt < > cpett...@linkedin.com.invalid> wrote: > >> That should have been: >> >> For #1, Beam doesn't have a hard requirement... >> >> On Thu, Apr 27, 2017 at 9:07 AM, Chris Pettitt >> wrote: >> >> > For #1, I doesn't have a hard requirement for any change from Samza. A >> > very nice to have would be to allow the input systems to be set up at >> the >> > same time as the rest of the StreamGraph. An even nicer to have would >> be to >> > do away with the callback based approach and treat graph building as a >> > library, a la Beam and Flink. >> > >> > For the moment I've worked around the two pass requirement (once for >> > config, once for StreamGraph) by introducing an IR layer between Beam >> and >> > the Samza Fluent translation. The IR layer is convenient independent of >> > this problem because it makes it easier to switch between the Fluent and >> > low-level APIs. 
>> > >> > >> > For #4, if we had parity with StreamProcessor for lifecycle we'd be in >> > great shape. One additional issue with the status call that I may not >> have >> > mentioned is that it provides you no way to get at the cause of failure. >> > The StreamProcessor API does allow this via the callback. >> > >> > >> > Re. #2 and #3, I'm a big fan of getting rid of the extra configuration >> > indirection you currently have to jump through (this is also related to >> > system consumer configuration from #1). It makes it much easier to >> discover >> > what the configurable parameters are too, if we provide some >> programmatic >> > way to tweak them in the API - which can turn into config under the >> hood. >> > >> > On Wed, Apr 26, 2017 at 9:20 PM, xinyu liu >> wrote: >> > >> >> Let me give a shot to summarize the requirements for ApplicationRunner >> we >> >> have discussed so far: >> >> >> >> - Support environment for passing in user-defined
Re: [DISCUSS] SEP-2: ApplicationRunner Design
Let me try to capture the updated requirements: 1. Set up input streams outside StreamGraph, and treat graph building as a library (*Fluent, Beam*). 2. Improve ease of use for ApplicationRunner to avoid complex configurations such as zkCoordinator, zkCoordinationService. (*Standalone*). Provide some programmatic way to tweak them in the API. 3. Clean up ApplicationRunner into a single interface (*Fluent*). We can have one or more implementations but it's hidden from the users. 4. Separate StreamGraph from runtime environment so it can be serialized (*Beam, Yarn*) 5. Better life cycle management of application, parity with StreamProcessor (*Standalone, Beam*). Stats should include exception in case of failure (tracked in SAMZA-1246). 6. Support injecting user-defined objects into ApplicationRunner. Prateek and I iterated on the ApplicationRunner API based on these requirements. To support #1, we can set up input streams on the runner level, which returns the MessageStream and allows graph building afterwards. The code looks like below: ApplicationRunner runner = ApplicationRunner.local(); runner.input(streamSpec) .map(..) .window(...) runner.run(); StreamSpec is the building block for setting up streams here. It can be set up in different ways: - Direct creation of stream spec, like runner.input(new StreamSpec(id, system, stream)) - Load from streamId from env or config, like runner.input(runner.env().getStreamSpec(id)) - Canned Spec which generates the StreamSpec with id, system and stream to minimize the configuration. For example, CollectionSpec.create(new ArrayList[]{1,2,3,4}), which will auto generate the system and stream in the spec. To support #2, we need to be able to set up StreamSpec-related objects and factories programmatically in env. 
Suppose we have the following before runner.input(...): runner.setup(env /* a writable interface of env*/ -> { env.setStreamSpec(streamId, streamSpec); env.setSystem(systemName, systemFactory); }) runner.setup(->) also provides setup for stores and other runtime stuff needed for the execution. The setup should be able to be serialized to config. For #6, I haven't figured out a good way to inject user-defined objects here yet. With this API, we should be able to also support #4. For remote runner.run(), the operator user classes/lambdas in the StreamGraph need to be serialized. As today, the existing option is to serialize to a stream, either the coordinator stream or the pipeline control stream, which will have the size limit per message. Do you see RPC as an option? For this version of the API, seems we don't need the StreamApplication wrapper as well as exposing the StreamGraph. Do you think we are on the right path? Thanks, Xinyu On Thu, Apr 27, 2017 at 6:09 AM, Chris Pettitt < cpett...@linkedin.com.invalid> wrote: > That should have been: > > For #1, Beam doesn't have a hard requirement... > > On Thu, Apr 27, 2017 at 9:07 AM, Chris Pettitt> wrote: > > > For #1, I doesn't have a hard requirement for any change from Samza. A > > very nice to have would be to allow the input systems to be set up at the > > same time as the rest of the StreamGraph. An even nicer to have would be > to > > do away with the callback based approach and treat graph building as a > > library, a la Beam and Flink. > > > > For the moment I've worked around the two pass requirement (once for > > config, once for StreamGraph) by introducing an IR layer between Beam and > > the Samza Fluent translation. The IR layer is convenient independent of > > this problem because it makes it easier to switch between the Fluent and > > low-level APIs. > > > > > > For #4, if we had parity with StreamProcessor for lifecycle we'd be in > > great shape. 
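Serializing the "operator user classes/lambdas in the StreamGraph", as discussed above, is possible in plain Java when the operator's functional interface extends Serializable. The sketch below shows that mechanism; the MapFn interface and helper names are hypothetical, not the Samza operator types.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.function.Function;

// Plain-Java sketch: a lambda becomes serializable when its target
// functional interface extends Serializable, which is one way to ship
// a StreamGraph's user code to a remote cluster.
class GraphSerialization {
    // Hypothetical serializable operator type.
    interface MapFn<T, R> extends Function<T, R>, Serializable {}

    static byte[] serialize(Object o) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(o);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    @SuppressWarnings("unchecked")
    static <T> T deserialize(byte[] data) {
        try (ObjectInputStream in =
                 new ObjectInputStream(new ByteArrayInputStream(data))) {
            return (T) in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

Note the caveat this implies for the remote case: deserialization needs the capturing class on the remote classpath, which is exactly why jar distribution (#4) and graph serialization have to be solved together.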
One additional issue with the status call that I may not > have > > mentioned is that it provides you no way to get at the cause of failure. > > The StreamProcessor API does allow this via the callback. > > > > > > Re. #2 and #3, I'm a big fan of getting rid of the extra configuration > > indirection you currently have to jump through (this is also related to > > system consumer configuration from #1). It makes it much easier to > discover > > what the configurable parameters are too, if we provide some programmatic > > way to tweak them in the API - which can turn into config under the hood. > > > > On Wed, Apr 26, 2017 at 9:20 PM, xinyu liu > wrote: > > > >> Let me give a shot to summarize the requirements for ApplicationRunner > we > >> have discussed so far: > >> > >> - Support environment for passing in user-defined objects (streams > >> potentially) into ApplicationRunner (*Beam*) > >> > >> - Improve ease of use for ApplicationRunner to avoid complex > >> configurations > >> such as zkCoordinator, zkCoordinationService. (*Standalone*) > >> > >> - Clean up ApplicationRunner into a single interface (*Fluent*). We can > >> have one or more
Re: [DISCUSS] SEP-2: ApplicationRunner Design
That should have been: For #1, Beam doesn't have a hard requirement... On Thu, Apr 27, 2017 at 9:07 AM, Chris Pettitt wrote: > For #1, I doesn't have a hard requirement for any change from Samza. A > very nice to have would be to allow the input systems to be set up at the > same time as the rest of the StreamGraph. An even nicer to have would be to > do away with the callback based approach and treat graph building as a > library, a la Beam and Flink. > > For the moment I've worked around the two pass requirement (once for > config, once for StreamGraph) by introducing an IR layer between Beam and > the Samza Fluent translation. The IR layer is convenient independent of > this problem because it makes it easier to switch between the Fluent and > low-level APIs. > > > For #4, if we had parity with StreamProcessor for lifecycle we'd be in > great shape. One additional issue with the status call that I may not have > mentioned is that it provides you no way to get at the cause of failure. > The StreamProcessor API does allow this via the callback. > > > Re. #2 and #3, I'm a big fan of getting rid of the extra configuration > indirection you currently have to jump through (this is also related to > system consumer configuration from #1). It makes it much easier to discover > what the configurable parameters are too, if we provide some programmatic > way to tweak them in the API - which can turn into config under the hood. > > On Wed, Apr 26, 2017 at 9:20 PM, xinyu liu wrote: > >> Let me give a shot to summarize the requirements for ApplicationRunner we >> have discussed so far: >> >> - Support environment for passing in user-defined objects (streams >> potentially) into ApplicationRunner (*Beam*) >> >> - Improve ease of use for ApplicationRunner to avoid complex >> configurations >> such as zkCoordinator, zkCoordinationService. (*Standalone*) >> >> - Clean up ApplicationRunner into a single interface (*Fluent*). 
We can >> have one or more implementations but it's hidden from the users. >> >> - Separate StreamGraph from environment so it can be serializable (*Beam, >> Yarn*) >> >> - Better life cycle management of application, including >> start/stop/stats (*Standalone, >> Beam*) >> >> >> One way to address 2 and 3 is to provide pre-packaged runner using static >> factory methods, and the return type will be the ApplicationRunner >> interface. So we can have: >> >> ApplicationRunner runner = ApplicationRunner.zk() / >> ApplicationRunner.local() >> / ApplicationRunner.remote() / ApplicationRunner.test(). >> >> Internally we will package the right configs and run-time environment with >> the runner. For example, ApplicationRunner.zk() will define all the >> configs >> needed for zk coordination. >> >> To support 1 and 4, can we pass in a lambda function in the runner, and >> then we can run the stream graph? Like the following: >> >> ApplicationRunner.zk().env(config -> environment).run(streamGraph); >> >> Then we need a way to pass the environment into the StreamGraph. This can >> be done by either adding an extra parameter to each operator, or have a >> getEnv() function in the MessageStream, which seems to be pretty hacky. >> >> What do you think? >> >> Thanks, >> Xinyu >> >> >> >> >> >> On Sun, Apr 23, 2017 at 11:01 PM, Prateek Maheshwari < >> pmaheshw...@linkedin.com.invalid> wrote: >> >> > Thanks for putting this together Yi! >> > >> > I agree with Jake, it does seem like there are a few too many moving >> parts >> > here. That said, the problem being solved is pretty broad, so let me >> try to >> > summarize my current understanding of the requirements. Please correct >> me >> > if I'm wrong or missing something. >> > >> > ApplicationRunner and JobRunner first, ignoring test environment for the >> > moment. >> > ApplicationRunner: >> > 1. Create execution plan: Same in Standalone and Yarn >> > 2. 
Create intermediate streams: Same logic but different leader election >> > (ZK-based or pre-configured in standalone, AM in Yarn). >> > 3. Run jobs: In JVM in standalone. Submit to the cluster in Yarn. >> > >> > JobRunner: >> > 1. Run the StreamProcessors: Same process in Standalone & Test. Remote >> host >> > in Yarn. >> > >> > To get a single ApplicationRunner implementation, like Jake suggested, >> we >> > need to make leader election and JobRunner implementation pluggable. >> > There's still the question of whether ApplicationRunner#run API should >> be >> > blocking or non-blocking. It has to be non-blocking in YARN. We want it >> to >> > be blocking in standalone, but seems like the main reason is ease of use >> > when launched from main(). I'd prefer making it consistently non-blocking >> > instead, esp. since in embedded standalone mode (where the processor is >> > running in another container) a blocking API would not be user-friendly >> > either. If not, we can add both run and runBlocking. >> > >> > Coming to RuntimeEnvironment, which is the least
Re: [DISCUSS] SEP-2: ApplicationRunner Design
For #1, I doesn't have a hard requirement for any change from Samza. A very nice to have would be to allow the input systems to be set up at the same time as the rest of the StreamGraph. An even nicer to have would be to do away with the callback based approach and treat graph building as a library, a la Beam and Flink. For the moment I've worked around the two pass requirement (once for config, once for StreamGraph) by introducing an IR layer between Beam and the Samza Fluent translation. The IR layer is convenient independent of this problem because it makes it easier to switch between the Fluent and low-level APIs. For #4, if we had parity with StreamProcessor for lifecycle we'd be in great shape. One additional issue with the status call that I may not have mentioned is that it provides you no way to get at the cause of failure. The StreamProcessor API does allow this via the callback. Re. #2 and #3, I'm a big fan of getting rid of the extra configuration indirection you currently have to jump through (this is also related to system consumer configuration from #1). It makes it much easier to discover what the configurable parameters are too, if we provide some programmatic way to tweak them in the API - which can turn into config under the hood. On Wed, Apr 26, 2017 at 9:20 PM, xinyu liu wrote: > Let me give a shot to summarize the requirements for ApplicationRunner we > have discussed so far: > > - Support environment for passing in user-defined objects (streams > potentially) into ApplicationRunner (*Beam*) > > - Improve ease of use for ApplicationRunner to avoid complex configurations > such as zkCoordinator, zkCoordinationService. (*Standalone*) > > - Clean up ApplicationRunner into a single interface (*Fluent*). We can > have one or more implementations but it's hidden from the users. 
> > - Separate StreamGraph from environment so it can be serializable (*Beam, > Yarn*) > > - Better life cycle management of application, including > start/stop/stats (*Standalone, > Beam*) > > > One way to address 2 and 3 is to provide pre-packaged runners using static > factory methods, and the return type will be the ApplicationRunner > interface. So we can have: > > ApplicationRunner runner = ApplicationRunner.zk() / > ApplicationRunner.local() > / ApplicationRunner.remote() / ApplicationRunner.test(). > > Internally we will package the right configs and run-time environment with > the runner. For example, ApplicationRunner.zk() will define all the configs > needed for zk coordination. > > To support 1 and 4, can we pass in a lambda function in the runner, and > then we can run the stream graph? Like the following: > > ApplicationRunner.zk().env(config -> environment).run(streamGraph); > > Then we need a way to pass the environment into the StreamGraph. This can > be done by either adding an extra parameter to each operator, or having a > getEnv() function in the MessageStream, which seems to be pretty hacky. > > What do you think? > > Thanks, > Xinyu > > > > > > On Sun, Apr 23, 2017 at 11:01 PM, Prateek Maheshwari < > pmaheshw...@linkedin.com.invalid> wrote: > > > Thanks for putting this together Yi! > > > > I agree with Jake, it does seem like there are a few too many moving > parts > > here. That said, the problem being solved is pretty broad, so let me try > to > > summarize my current understanding of the requirements. Please correct me > > if I'm wrong or missing something. > > > > ApplicationRunner and JobRunner first, ignoring test environment for the > > moment. > > ApplicationRunner: > > 1. Create execution plan: Same in Standalone and Yarn > > 2. Create intermediate streams: Same logic but different leader election > > (ZK-based or pre-configured in standalone, AM in Yarn). > > 3. Run jobs: In JVM in standalone. Submit to the cluster in Yarn. 
> > > > JobRunner: > > 1. Run the StreamProcessors: Same process in Standalone & Test. Remote > host > > in Yarn. > > > > To get a single ApplicationRunner implementation, like Jake suggested, we > > need to make leader election and JobRunner implementation pluggable. > > There's still the question of whether ApplicationRunner#run API should be > > blocking or non-blocking. It has to be non-blocking in YARN. We want it > to > > be blocking in standalone, but seems like the main reason is ease of use > > when launched from main(). I'd prefer making it consistently non-blocking > > instead, esp. since in embedded standalone mode (where the processor is > > running in another container) a blocking API would not be user-friendly > > either. If not, we can add both run and runBlocking. > > > > Coming to RuntimeEnvironment, which is the least clear to me so far: > > 1. I don't think RuntimeEnvironment should be responsible for providing > > StreamSpecs for streamIds - they can be obtained with a config/util > class. > > The StreamProcessor should only know about logical streamIds and the > > streamId <-> actual stream mapping should happen
Re: [DISCUSS] SEP-2: ApplicationRunner Design
Let me give a shot to summarize the requirements for ApplicationRunner we have discussed so far: - Support environment for passing in user-defined objects (streams potentially) into ApplicationRunner (*Beam*) - Improve ease of use for ApplicationRunner to avoid complex configurations such as zkCoordinator, zkCoordinationService. (*Standalone*) - Clean up ApplicationRunner into a single interface (*Fluent*). We can have one or more implementations but it's hidden from the users. - Separate StreamGraph from environment so it can be serializable (*Beam, Yarn*) - Better life cycle management of application, including start/stop/stats (*Standalone, Beam*) One way to address 2 and 3 is to provide pre-packaged runners using static factory methods, and the return type will be the ApplicationRunner interface. So we can have: ApplicationRunner runner = ApplicationRunner.zk() / ApplicationRunner.local() / ApplicationRunner.remote() / ApplicationRunner.test(). Internally we will package the right configs and run-time environment with the runner. For example, ApplicationRunner.zk() will define all the configs needed for zk coordination. To support 1 and 4, can we pass in a lambda function in the runner, and then we can run the stream graph? Like the following: ApplicationRunner.zk().env(config -> environment).run(streamGraph); Then we need a way to pass the environment into the StreamGraph. This can be done by either adding an extra parameter to each operator, or having a getEnv() function in the MessageStream, which seems to be pretty hacky. What do you think? Thanks, Xinyu On Sun, Apr 23, 2017 at 11:01 PM, Prateek Maheshwari < pmaheshw...@linkedin.com.invalid> wrote: > Thanks for putting this together Yi! > > I agree with Jake, it does seem like there are a few too many moving parts > here. That said, the problem being solved is pretty broad, so let me try to > summarize my current understanding of the requirements. Please correct me > if I'm wrong or missing something. 
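Xinyu's factory-method proposal above can be sketched in code. This is a minimal illustration only: the factory names (zk(), local(), remote()) and the env()/run() chaining come from the email, but the PackagedRunner class and everything inside it are invented here, not the Samza API.

```java
import java.util.function.Function;

// Placeholder types standing in for the real Samza classes.
class Config {}
class Environment {}
class StreamGraph {}

// Sketch of the proposed unified ApplicationRunner: static factories hide
// which implementation (and which pre-packaged configs) are used.
abstract class ApplicationRunner {
    public static ApplicationRunner zk()     { return new PackagedRunner("zk"); }
    public static ApplicationRunner local()  { return new PackagedRunner("local"); }
    public static ApplicationRunner remote() { return new PackagedRunner("remote"); }

    public abstract ApplicationRunner env(Function<Config, Environment> envFactory);
    public abstract void run(StreamGraph graph);
}

// One hidden implementation; the mode decides which configs get packaged.
class PackagedRunner extends ApplicationRunner {
    private final String mode;
    private Function<Config, Environment> envFactory = cfg -> new Environment();

    PackagedRunner(String mode) { this.mode = mode; }

    @Override
    public ApplicationRunner env(Function<Config, Environment> f) {
        this.envFactory = f;
        return this;
    }

    @Override
    public void run(StreamGraph graph) {
        // A real runner would assemble mode-specific configs here (e.g. zk
        // coordination settings), build the environment, and launch the graph.
        Environment env = envFactory.apply(new Config());
    }
}
```

With this shape, the usage from the email reads as `ApplicationRunner.zk().env(config -> environment).run(streamGraph);`.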
> > ApplicationRunner and JobRunner first, ignoring test environment for the > moment. > ApplicationRunner: > 1. Create execution plan: Same in Standalone and Yarn > 2. Create intermediate streams: Same logic but different leader election > (ZK-based or pre-configured in standalone, AM in Yarn). > 3. Run jobs: In JVM in standalone. Submit to the cluster in Yarn. > > JobRunner: > 1. Run the StreamProcessors: Same process in Standalone & Test. Remote host > in Yarn. > > To get a single ApplicationRunner implementation, like Jake suggested, we > need to make leader election and JobRunner implementation pluggable. > There's still the question of whether ApplicationRunner#run API should be > blocking or non-blocking. It has to be non-blocking in YARN. We want it to > be blocking in standalone, but seems like the main reason is ease of use > when launched from main(). I'd prefer making it consistently non-blocking > instead, esp. since in embedded standalone mode (where the processor is > running in another container) a blocking API would not be user-friendly > either. If not, we can add both run and runBlocking. > > Coming to RuntimeEnvironment, which is the least clear to me so far: > 1. I don't think RuntimeEnvironment should be responsible for providing > StreamSpecs for streamIds - they can be obtained with a config/util class. > The StreamProcessor should only know about logical streamIds and the > streamId <-> actual stream mapping should happen within the > SystemProducer/Consumer/Admins provided by the RuntimeEnvironment. > 2. There are also other components that the user might be interested in > providing implementations of in embedded Standalone mode (i.e., not just in > tests) - MetricsRegistry and JMXServer come to mind. > 3. Most importantly, it's not clear to me who creates and manages the > RuntimeEnvironment. 
It seems like it should be the ApplicationRunner or the > user because of (2) above and because StreamManager also needs access to > SystemAdmins for creating intermediate streams which users might want to > mock. But it also needs to be passed down to the StreamProcessor - how > would this work on Yarn? > > I think we should figure out how to integrate RuntimeEnvironment with > ApplicationRunner before we can make a call on one vs. multiple > ApplicationRunner implementations. If we do keep LocalApplicationRunner and > RemoteApplicationRunner (and TestApplicationRunner) separate, agree with Jake > that we should remove the JobRunners and roll them up into the respective > ApplicationRunners. > > - Prateek > > On Thu, Apr 20, 2017 at 10:06 AM, Jacob Maes wrote: > > > Thanks for the SEP! > > > > +1 on introducing these new components > > -1 on the current definition of their roles (see Design feedback below) > > > > *Design* > > > >- If LocalJobRunner and RemoteJobRunner handle the different methods > of >launching a Job, what additional value do the different types of >
Re: [DISCUSS] SEP-2: ApplicationRunner Design
Thanks for putting this together Yi! I agree with Jake, it does seem like there are a few too many moving parts here. That said, the problem being solved is pretty broad, so let me try to summarize my current understanding of the requirements. Please correct me if I'm wrong or missing something. ApplicationRunner and JobRunner first, ignoring test environment for the moment. ApplicationRunner: 1. Create execution plan: Same in Standalone and Yarn 2. Create intermediate streams: Same logic but different leader election (ZK-based or pre-configured in standalone, AM in Yarn). 3. Run jobs: In JVM in standalone. Submit to the cluster in Yarn. JobRunner: 1. Run the StreamProcessors: Same process in Standalone & Test. Remote host in Yarn. To get a single ApplicationRunner implementation, like Jake suggested, we need to make leader election and JobRunner implementation pluggable. There's still the question of whether ApplicationRunner#run API should be blocking or non-blocking. It has to be non-blocking in YARN. We want it to be blocking in standalone, but seems like the main reason is ease of use when launched from main(). I'd prefer making it consistently non-blocking instead, esp. since in embedded standalone mode (where the processor is running in another container) a blocking API would not be user-friendly either. If not, we can add both run and runBlocking. Coming to RuntimeEnvironment, which is the least clear to me so far: 1. I don't think RuntimeEnvironment should be responsible for providing StreamSpecs for streamIds - they can be obtained with a config/util class. The StreamProcessor should only know about logical streamIds and the streamId <-> actual stream mapping should happen within the SystemProducer/Consumer/Admins provided by the RuntimeEnvironment. 2. There are also other components that the user might be interested in providing implementations of in embedded Standalone mode (i.e., not just in tests) - MetricsRegistry and JMXServer come to mind. 3. 
Most importantly, it's not clear to me who creates and manages the RuntimeEnvironment. It seems like it should be the ApplicationRunner or the user because of (2) above and because StreamManager also needs access to SystemAdmins for creating intermediate streams which users might want to mock. But it also needs to be passed down to the StreamProcessor - how would this work on Yarn? I think we should figure out how to integrate RuntimeEnvironment with ApplicationRunner before we can make a call on one vs. multiple ApplicationRunner implementations. If we do keep LocalApplicationRunner and RemoteApplicationRunner (and TestApplicationRunner) separate, agree with Jake that we should remove the JobRunners and roll them up into the respective ApplicationRunners. - Prateek On Thu, Apr 20, 2017 at 10:06 AM, Jacob Maes wrote: > Thanks for the SEP! > > +1 on introducing these new components > -1 on the current definition of their roles (see Design feedback below) > > *Design* > >- If LocalJobRunner and RemoteJobRunner handle the different methods of >launching a Job, what additional value do the different types of >ApplicationRunner and RuntimeEnvironment provide? It seems like a red > flag >that all 3 would need to change from environment to environment. It >indicates that they don't have proper modularity. The > call-sequence-figures >support this; LocalApplicationRunner and RemoteApplicationRunner make > the >same calls and the diagram only varies after jobRunner.start() >- As far as I can tell, the only difference between Local and Remote >ApplicationRunner is that one is blocking and the other is > non-blocking. 
If >that's all they're for then either the names should be changed to > reflect >this, or they should be combined into one ApplicationRunner and just > expose >separate methods for run() and runBlocking() >- There isn't much detail on why the main() methods for Local/Remote >have such different implementations, how they receive the Application >(direct vs config), and concretely how the deployment scripts, if any, >should interact with them. > > > *Style* > >- nit: None of the 11 uses of the word "actual" in the doc are > *actually* >needed. :-) >- nit: Colors of the runtime blocks in the diagrams are unconventional >and a little distracting. Reminds me of nai won bao. Now I'm hungry. :-) >- Prefer the name "ExecutionEnvironment" over "RuntimeEnvironment". The >term "execution environment" is used >- The code comparisons for the ApplicationRunners are not apples-apples. >The local runner example is an application that USES the local runner. > The >remote runner example is just the runner code itself. So, it's not >readily apparent that we're comparing the main() methods and not the >application itself. > > > On Mon, Apr 17, 2017 at 5:02 PM, Yi Pan wrote: > > > Made some updates to clarify the
Re: [DISCUSS] SEP-2: ApplicationRunner Design
I'm playing with ApplicationRunner, so I'll probably have more feedback. For now, in addition to async run we also need async notification of completion or failure. Also, ApplicationStatus should be able to give me the cause of failure (e.g. via an Exception), not just a failure state. On Thu, Apr 20, 2017 at 3:52 PM, Chris Pettitt wrote: > It might be worth taking a look at how Beam does test streams. The API is > more powerful than just passing in a queue, e.g.: > > TestStream<String> source = TestStream.create(StringUtf8Coder.of()) > .addElements(TimestampedValue.of("this", start)) > .addElements(TimestampedValue.of("that", start)) > .addElements(TimestampedValue.of("future", > start.plus(Duration.standardMinutes(1)))) > .advanceProcessingTime(Duration.standardMinutes(3)) > .advanceWatermarkTo(start.plus(Duration.standardSeconds(30))) > .advanceWatermarkTo(start.plus(Duration.standardMinutes(1))) > .advanceWatermarkToInfinity(); > > --- > > BTW, have we given up on the idea of a simpler input system, e.g. one that > assumes all input messages are keyed? It seems it would be possible to > support legacy "system streams" via an adapter that mapped K, V -> V' and > could open the possibility of inputs in whatever form users want, e.g. > (again from Beam): > > final Create.Values<String> values = Create.of("test", "one", "two", "three"); > > final TextIO.Read.Bound<String> from = > TextIO.Read.from("src/main/resources/words.txt"); > > final KafkaIO.Read<Integer, Long> reader = KafkaIO.<Integer, Long>read() > > .withBootstrapServers("myServer1:9092,myServer2:9092") > > .withTopics(topics) > > .withConsumerFactoryFn(new ConsumerFactoryFn( > > topics, 10, numElements, OffsetResetStrategy.EARLIEST)) > > .withKeyCoder(BigEndianIntegerCoder.of()) > > .withValueCoder(BigEndianLongCoder.of()) > > .withMaxNumRecords(numElements); > Ideally, such a simple input system specification would be useable in > production as well as test. 
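The opening paragraph above asks for a non-blocking run() with async notification of completion or failure, plus access to the failure cause rather than only a status enum. One way to sketch that (the interface and method names here are illustrative assumptions, not the Samza API) is to return a CompletableFuture, which carries both normal completion and the causing exception, and to derive the blocking variant as a thin wrapper:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

// Hypothetical sketch of an async runner API.
interface AsyncRunner {
    // Completes normally on success; completes exceptionally with the actual
    // cause of failure, so callers are not limited to a failure state.
    CompletableFuture<Void> run();

    // A blocking variant needs no separate implementation: it just waits on
    // the async result, rethrowing the failure cause via ExecutionException.
    default void runBlocking() throws InterruptedException, ExecutionException {
        run().get();
    }
}
```

A caller wanting async notification would use `runner.run().whenComplete((v, err) -> ...)`, inspecting `err` for the failure cause.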
At that point I don't know if we need a separate > TestApplicationRunner except perhaps as a hint to what we've been calling an > Environment? > > --- > > Aren't we supposed to be able to run applications without blocking (e.g. > for embedded cases)? The API suggests that run is going to be a blocking > call? > > - Chris > > > On Thu, Apr 20, 2017 at 1:06 PM, Jacob Maes wrote: > >> Thanks for the SEP! >> >> +1 on introducing these new components >> -1 on the current definition of their roles (see Design feedback below) >> >> *Design* >> >>- If LocalJobRunner and RemoteJobRunner handle the different methods of >>launching a Job, what additional value do the different types of >>ApplicationRunner and RuntimeEnvironment provide? It seems like a red >> flag >>that all 3 would need to change from environment to environment. It >>indicates that they don't have proper modularity. The >> call-sequence-figures >>support this; LocalApplicationRunner and RemoteApplicationRunner make >> the >>same calls and the diagram only varies after jobRunner.start() >>- As far as I can tell, the only difference between Local and Remote >>ApplicationRunner is that one is blocking and the other is >> non-blocking. If >>that's all they're for then either the names should be changed to >> reflect >>this, or they should be combined into one ApplicationRunner and just >> expose >>separate methods for run() and runBlocking() >>- There isn't much detail on why the main() methods for Local/Remote >>have such different implementations, how they receive the Application >>(direct vs config), and concretely how the deployment scripts, if any, >>should interact with them. >> >> >> *Style* >> >>- nit: None of the 11 uses of the word "actual" in the doc are >> *actually* >>needed. :-) >>- nit: Colors of the runtime blocks in the diagrams are unconventional >>and a little distracting. Reminds me of nai won bao. Now I'm hungry. >> :-) >>- Prefer the name "ExecutionEnvironment" over "RuntimeEnvironment". 
The >>term "execution environment" is used >>- The code comparisons for the ApplicationRunners are not >> apples-apples. >>The local runner example is an application that USES the local runner. >> The >>remote runner example is just the runner code itself. So, it's not >>readily apparent that we're comparing the main() methods and not the >>application itself. >> >> >> On Mon, Apr 17, 2017 at 5:02 PM, Yi Pan wrote: >> >> > Made some updates to clarify the role and functions of >> RuntimeEnvironment >> > in SEP-2. >> > >> > On Fri, Apr 14, 2017 at 9:30 AM, Yi Pan wrote: >> > >> > > Hi, everyone, >> > > >> > > In light of new features such as fluent API and standalone that >> introduce >> > > new deployment / application launch models in Samza, I created a new >> >
Re: [DISCUSS] SEP-2: ApplicationRunner Design
Hey Yi, Thanks a lot for your work on this document. I know it must have been crazy trying to put together everything in a single doc :) Here are my comments. Sorry about the delay :( 1. It will be useful to set some background for the benefit of the community members who haven't been following design docs in the JIRAs. Can you briefly explain the definition of StreamApplication and how it translates to jobs through the stack? 2. "Problem" section doesn't seem to describe any problem that ApplicationRunner is solving :) Imo, ApplicationRunner basically provides a unified programming pattern for the user to execute StreamApplications defined using fluent-api or task-level API. I think the problem and motivation section can use a little bit of re-wording. 3. In the "Overview of ApplicationRunner" section: * How the components within ApplicationRunner interact isn't very obvious from the overview image. For example, ExecutionPlanner translates a "StreamApplication" into an "ExecutionPlan", which is essentially a specification of the DAG. (Please correct me, if I am wrong here!). The ExecutionPlan is used by the JobRunner to launch Samza jobs. * The roles of ExecutionPlanner and JobRunner are fairly well-defined. StreamManager seems like a util class that helps class-load systems and create streams. The ExecutionPlan will be consumed by JobRunner and JobRunner will use StreamManager to create intermediate streams, prior to launching jobs. It doesn't sound like a StreamManager is a "component" of the ApplicationRunner. * What is the role of the RuntimeEnvironment? That has not been explained. Maybe explaining that will fill the gap in understanding for the readers. I see that you have tried to explain the flow of control in the code using the sequence diagram. Perhaps, if we can articulate the roles/responsibilities of the RuntimeEnvironment, there will not be a need for the control flow diagram. 4. How is the runtime environment defined by the user? Is it configurable? 
Answering these questions in the doc will be useful. 5. In the "Interaction between RuntimeEnvironment and ApplicationRunners" section: * Samza container is interacting with the RuntimeEnvironment. Does that make the RuntimeEnvironment a shared component between the LocalApplicationRunner and the SamzaContainer? It doesn't seem to be the case for RemoteApplicationRunner. So, I am confused as to why it is different. 6. In general, what does "app.class" config represent? It seems straightforward when a "StreamApplication" is defined. Is it applicable when using the low-level task api? 7. Interface definitions: * Perhaps when you implement this, can you specifically call out if each method is blocking or not in the javadoc? 8. Minor nit-picking: * "ApplicationRunners in Different Execution Environments" -> should it be RuntimeEnvironments, as that is the terminology used in the rest of the document. * In the "How this works in standalone deployment" section: * "Deploy the application to standalone hosts" and *Run run-local-app.sh on each node to start/stop the local application* are probably just a single step - Deploy the application to standalone hosts using run-local-app.sh?? General question: It seems like, even with extensive changes to the interfaces/programming model, we are still class loading the components for most parts. In such a world, we are not close to integrating with frameworks that already have a lifecycle model and can provide instantiated objects directly. For example, in the Samza as a library use-case, it makes sense for the user to provide a JmxServer or a taskFactory or a custom metricReporter for the StreamProcessor. One of the motivations for this case was that most applications are already running within a servlet/jetty container model with its own lifecycle. If ApplicationRunner(s) is the unified interface, doesn't that prohibit Samza from being integrated with such frameworks? Thanks! 
Navina On Thu, Apr 20, 2017 at 10:06 AM, Jacob Maes wrote: > Thanks for the SEP! > > +1 on introducing these new components > -1 on the current definition of their roles (see Design feedback below) > > *Design* > >- If LocalJobRunner and RemoteJobRunner handle the different methods of >launching a Job, what additional value do the different types of >ApplicationRunner and RuntimeEnvironment provide? It seems like a red > flag >that all 3 would need to change from environment to environment. It >indicates that they don't have proper modularity. The > call-sequence-figures >support this; LocalApplicationRunner and RemoteApplicationRunner make > the >same calls and the diagram only varies after jobRunner.start() >- As far as I can tell, the only difference between Local and Remote >ApplicationRunner is that one is blocking and the other is > non-blocking. If >that's all they're for then either the names should be changed to > reflect >this, or they should be
Re: [DISCUSS] SEP-2: ApplicationRunner Design
It might be worth taking a look at how Beam does test streams. The API is more powerful than just passing in a queue, e.g.: TestStream<String> source = TestStream.create(StringUtf8Coder.of()) .addElements(TimestampedValue.of("this", start)) .addElements(TimestampedValue.of("that", start)) .addElements(TimestampedValue.of("future", start.plus(Duration.standardMinutes(1)))) .advanceProcessingTime(Duration.standardMinutes(3)) .advanceWatermarkTo(start.plus(Duration.standardSeconds(30))) .advanceWatermarkTo(start.plus(Duration.standardMinutes(1))) .advanceWatermarkToInfinity(); --- BTW, have we given up on the idea of a simpler input system, e.g. one that assumes all input messages are keyed? It seems it would be possible to support legacy "system streams" via an adapter that mapped K, V -> V' and could open the possibility of inputs in whatever form users want, e.g. (again from Beam): final Create.Values<String> values = Create.of("test", "one", "two", "three"); final TextIO.Read.Bound<String> from = TextIO.Read.from("src/main/resources/words.txt"); final KafkaIO.Read<Integer, Long> reader = KafkaIO.<Integer, Long>read() .withBootstrapServers("myServer1:9092,myServer2:9092") .withTopics(topics) .withConsumerFactoryFn(new ConsumerFactoryFn( topics, 10, numElements, OffsetResetStrategy.EARLIEST)) .withKeyCoder(BigEndianIntegerCoder.of()) .withValueCoder(BigEndianLongCoder.of()) .withMaxNumRecords(numElements); Ideally, such a simple input system specification would be useable in production as well as test. At that point I don't know if we need a separate TestApplicationRunner except perhaps as a hint to what we've been calling an Environment? --- Aren't we supposed to be able to run applications without blocking (e.g. for embedded cases)? The API suggests that run is going to be a blocking call? - Chris On Thu, Apr 20, 2017 at 1:06 PM, Jacob Maes wrote: > Thanks for the SEP! 
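The adapter idea in the message above — a keyed input system as the primitive, with legacy value-only "system streams" supported by mapping (K, V) -> V' — could look roughly like the following. All class and method names here are invented for illustration; this is not a Samza or Beam API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;

// Minimal keyed message, standing in for a real system's envelope type.
final class KV<K, V> {
    final K key;
    final V value;
    KV(K key, V value) { this.key = key; this.value = value; }
}

// The hypothetical primitive: an input whose messages are always keyed.
interface KeyedInput<K, V> {
    List<KV<K, V>> poll();
}

// Adapts a keyed input to a legacy value-only stream by applying (K, V) -> V'.
final class LegacyStreamAdapter<K, V, V2> {
    private final KeyedInput<K, V> input;
    private final BiFunction<K, V, V2> mapper;

    LegacyStreamAdapter(KeyedInput<K, V> input, BiFunction<K, V, V2> mapper) {
        this.input = input;
        this.mapper = mapper;
    }

    List<V2> poll() {
        List<V2> out = new ArrayList<>();
        for (KV<K, V> kv : input.poll()) {
            out.add(mapper.apply(kv.key, kv.value));
        }
        return out;
    }
}
```

The point of the sketch is that keyed input can be the single primitive: an unkeyed legacy stream is just one adapter away, while the keyed form remains available to operators (e.g. keyed windows) that need it.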
> > +1 on introducing these new components > -1 on the current definition of their roles (see Design feedback below) > > *Design* > >- If LocalJobRunner and RemoteJobRunner handle the different methods of >launching a Job, what additional value do the different types of >ApplicationRunner and RuntimeEnvironment provide? It seems like a red > flag >that all 3 would need to change from environment to environment. It >indicates that they don't have proper modularity. The > call-sequence-figures >support this; LocalApplicationRunner and RemoteApplicationRunner make > the >same calls and the diagram only varies after jobRunner.start() >- As far as I can tell, the only difference between Local and Remote >ApplicationRunner is that one is blocking and the other is > non-blocking. If >that's all they're for then either the names should be changed to > reflect >this, or they should be combined into one ApplicationRunner and just > expose >separate methods for run() and runBlocking() >- There isn't much detail on why the main() methods for Local/Remote >have such different implementations, how they receive the Application >(direct vs config), and concretely how the deployment scripts, if any, >should interact with them. > > > *Style* > >- nit: None of the 11 uses of the word "actual" in the doc are > *actually* >needed. :-) >- nit: Colors of the runtime blocks in the diagrams are unconventional >and a little distracting. Reminds me of nai won bao. Now I'm hungry. :-) >- Prefer the name "ExecutionEnvironment" over "RuntimeEnvironment". The >term "execution environment" is used >- The code comparisons for the ApplicationRunners are not apples-apples. >The local runner example is an application that USES the local runner. > The >remote runner example is just the runner code itself. So, it's not >readily apparent that we're comparing the main() methods and not the >application itself. 
> > > On Mon, Apr 17, 2017 at 5:02 PM, Yi Pan wrote: > > > Made some updates to clarify the role and functions of RuntimeEnvironment > > in SEP-2. > > > > On Fri, Apr 14, 2017 at 9:30 AM, Yi Pan wrote: > > > > > Hi, everyone, > > > > > > In light of new features such as fluent API and standalone that > introduce > > > new deployment / application launch models in Samza, I created a new > > SEP-2 > > > to address the new use cases. SEP-2 link: https://cwiki.apache. > > > org/confluence/display/SAMZA/SEP-2%3A+ApplicationRunner+Design > > > > > > Please take a look and give feedback! > > > > > > Thanks! > > > > > > -Yi > > > > > >
Re: [DISCUSS] SEP-2: ApplicationRunner Design
Thanks for the SEP! +1 on introducing these new components -1 on the current definition of their roles (see Design feedback below) *Design* - If LocalJobRunner and RemoteJobRunner handle the different methods of launching a Job, what additional value do the different types of ApplicationRunner and RuntimeEnvironment provide? It seems like a red flag that all 3 would need to change from environment to environment. It indicates that they don't have proper modularity. The call-sequence-figures support this; LocalApplicationRunner and RemoteApplicationRunner make the same calls and the diagram only varies after jobRunner.start() - As far as I can tell, the only difference between Local and Remote ApplicationRunner is that one is blocking and the other is non-blocking. If that's all they're for then either the names should be changed to reflect this, or they should be combined into one ApplicationRunner and just expose separate methods for run() and runBlocking() - There isn't much detail on why the main() methods for Local/Remote have such different implementations, how they receive the Application (direct vs config), and concretely how the deployment scripts, if any, should interact with them. *Style* - nit: None of the 11 uses of the word "actual" in the doc are *actually* needed. :-) - nit: Colors of the runtime blocks in the diagrams are unconventional and a little distracting. Reminds me of nai won bao. Now I'm hungry. :-) - Prefer the name "ExecutionEnvironment" over "RuntimeEnvironment". The term "execution environment" is used - The code comparisons for the ApplicationRunners are not apples-apples. The local runner example is an application that USES the local runner. The remote runner example is just the runner code itself. So, it's not readily apparent that we're comparing the main() methods and not the application itself. On Mon, Apr 17, 2017 at 5:02 PM, Yi Pan wrote: > Made some updates to clarify the role and functions of RuntimeEnvironment > in SEP-2. 
> > On Fri, Apr 14, 2017 at 9:30 AM, Yi Pan wrote: > > > Hi, everyone, > > > > In light of new features such as fluent API and standalone that introduce > > new deployment / application launch models in Samza, I created a new > SEP-2 > > to address the new use cases. SEP-2 link: https://cwiki.apache. > > org/confluence/display/SAMZA/SEP-2%3A+ApplicationRunner+Design > > > > Please take a look and give feedback! > > > > Thanks! > > > > -Yi > > >
Re: [DISCUSS] SEP-2: ApplicationRunner Design
Made some updates to clarify the role and functions of RuntimeEnvironment in SEP-2. On Fri, Apr 14, 2017 at 9:30 AM, Yi Pan wrote: > Hi, everyone, > > In light of new features such as fluent API and standalone that introduce > new deployment / application launch models in Samza, I created a new SEP-2 > to address the new use cases. SEP-2 link: https://cwiki.apache. > org/confluence/display/SAMZA/SEP-2%3A+ApplicationRunner+Design > > Please take a look and give feedback! > > Thanks! > > -Yi >
[DISCUSS] SEP-2: ApplicationRunner Design
Hi, everyone, In light of new features such as fluent API and standalone that introduce new deployment / application launch models in Samza, I created a new SEP-2 to address the new use cases. SEP-2 link: https://cwiki.apache.org/confluence/display/SAMZA/SEP-2%3A+ApplicationRunner+Design Please take a look and give feedback! Thanks! -Yi