Re: [DISCUSS] SEP-2: ApplicationRunner Design

2017-06-21 Thread Chris Pettitt
Now that I look at the whole list again, I see we could adequately
address #2 by first addressing #3. If, instead of building aggregation
directly into the window operator, we had aggregation operators like
sum, then the lambdas would be more intuitive. It would probably be even
nicer if the first argument were a plain value instead of a lambda, or
if such an alternative were available.
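To make the suggestion concrete, here is a minimal self-contained sketch; none of these names are real Samza API. It shows how a prebuilt aggregation operator such as 'sum' could hide the two raw lambdas (the initial-value Supplier and the fold function) behind a single named operation.

```java
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.Supplier;

// Sketch only: 'Folds', 'fold', and 'sum' are hypothetical names.
final class Folds {
  // The raw form discussed in the thread: an initial-value Supplier plus a
  // fold function, applied to the contents of one window pane.
  static <M, A> A fold(List<M> pane, Supplier<A> init, BiFunction<M, A, A> folder) {
    A acc = init.get();
    for (M m : pane) {
      acc = folder.apply(m, acc);
    }
    return acc;
  }

  // The prebuilt aggregation operator: callers never see the two lambdas.
  static long sum(List<Long> pane) {
    return fold(pane, () -> 0L, (m, acc) -> acc + m);
  }
}
```

With an operator like this, user code would read `window.sum()` rather than supplying `() -> 0L` and `(m, acc) -> acc + m` inline.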


Re: [DISCUSS] SEP-2: ApplicationRunner Design

2017-06-20 Thread Prateek Maheshwari
1. +1 for not requiring explicit type information unless it's unavoidable.
This one seems easy to fix, but there are other places we should address
too (OutputStream, Window operator).

We should probably discuss the Window API separately from this discussion,
but to Chris' point:

2. Re: the 2 lambdas, the first is an initial-value Java 'Supplier', the second
is a Samza 'FoldFunction'. It might be clearer to create a new class (e.g.,
'Aggregator') with 'getInitialValue' and 'aggregate' methods that users
should implement instead of these 2 lambdas. They'll lose the ability to
provide these functions as lambdas, but I think overall it'll help
readability and understandability. We can then provide default
implementations of this class for common aggregations: accumulation (add
to a collection), sum, max, min, average, percentiles, etc. Is this what you
meant, Chris?
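A rough sketch of that contract might look like the following; the interface name and method signatures are illustrative only, not actual Samza API.

```java
// Hypothetical 'Aggregator' contract as described above.
interface Aggregator<M, A> {
  A getInitialValue();
  A aggregate(M message, A accumulator);

  // Convenience: fold a whole pane of messages through the aggregator.
  default A aggregateAll(Iterable<M> messages) {
    A acc = getInitialValue();
    for (M m : messages) {
      acc = aggregate(m, acc);
    }
    return acc;
  }
}

// One of the suggested default implementations: max of integers.
final class MaxAggregator implements Aggregator<Integer, Integer> {
  public Integer getInitialValue() { return Integer.MIN_VALUE; }
  public Integer aggregate(Integer message, Integer acc) {
    return Math.max(message, acc);
  }
}
```

The window operator would then take a single `Aggregator` argument instead of two loosely related lambdas.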

3. I'm also strongly in favor of making the window APIs composable so that
we don't have so many variants and parameters. Otherwise, as we add more
parameters for event time support and type information for serdes, it'll
get really difficult to both discover the appropriate window variant and
understand a window specification once written.
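One way to picture the composability in point 3: a single window type whose options (key, event time, aggregation) are independent chainable setters rather than a separate factory variant per combination. The sketch below is purely illustrative; every name is hypothetical.

```java
import java.time.Duration;

// Minimal composable window specification sketch (hypothetical names).
final class WindowSpec {
  Duration size;
  String keyField;         // null => global (un-keyed) window
  String eventTimeField;   // null => processing time
  String aggregation = "collect";

  static WindowSpec tumbling(Duration size) {
    WindowSpec w = new WindowSpec();
    w.size = size;
    return w;
  }

  WindowSpec keyedBy(String field) { this.keyField = field; return this; }
  WindowSpec eventTime(String field) { this.eventTimeField = field; return this; }
  WindowSpec aggregate(String op) { this.aggregation = op; return this; }
}
```

Adding event-time support later is then one more chained call, not a new set of factory methods.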

- Prateek


Re: [DISCUSS] SEP-2: ApplicationRunner Design

2017-06-20 Thread Chris Pettitt
Feedback for PageViewCounterStreamSpecExample:

https://github.com/nickpan47/samza/blob/new-api-v2/samza-core/src/test/java/org/apache/samza/example/PageViewCounterStreamSpecExample.java#L65:

When we set up the input we had the message type, but it looks like we
are not propagating it via StreamIO.Input, thus requiring a cast here.
The fix seems to be to generify StreamIO.Input.
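Roughly, a generified input would carry the message type declared at setup through to the operator chain, so no cast from Object is needed downstream. The classes below are a self-contained sketch, not the actual StreamIO API.

```java
import java.util.function.Function;

// Hypothetical generified input: the type parameter M carries the message type.
final class Input<M> {
  final String streamId;
  Input(String streamId) { this.streamId = streamId; }

  // A typed operation: the compiler knows M, so fn can use its fields directly.
  <O> O apply(M message, Function<M, O> fn) {
    return fn.apply(message);
  }
}

// Stand-in for the example's event type.
final class PageViewEvent {
  final String memberId;
  PageViewEvent(String memberId) { this.memberId = memberId; }
}
```

With `Input<PageViewEvent>`, a lambda like `pv -> pv.memberId` type-checks without any cast.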

https://github.com/nickpan47/samza/blob/new-api-v2/samza-core/src/test/java/org/apache/samza/example/PageViewCounterStreamSpecExample.java#L67:

The two lambdas here are not very intuitive. I'm assuming, based on
some previous discussion, that these are setting up a fold function? I
would suggest making this more explicit, probably with a fold function
type.

https://github.com/nickpan47/samza/blob/new-api-v2/samza-core/src/test/java/org/apache/samza/example/PageViewCounterStreamSpecExample.java#L94:

The generic type parameters for WindowPane suggest that WindowPanes
must be keyed. This is a reasonable assumption for
"keyedTumblingWindow" but might not make sense for other cases, like a
global combine operation. Beam separates the two ideas into: 1) a
typed WindowedValue, basically the primitive for values propagating
through the graph, which contains a value and windowing information, and 2)
KV for keyed values.

FWIW, the larger windowing and grouping concepts are also separated in
Beam, and it looks like this is the case with Flink as well. In examples
from both, the user specifies the windowing first and then separately
specifies the aggregation operation (e.g., group by key, combine, sum,
etc.). This saves us from the combinatorial explosion of keyed / global,
unwindowed (batch) / session / fixed / tumbling / etc., and GBK / sum /
count / etc.
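The separation being described can be sketched in plain Java: windowing buckets events into panes, and the aggregation is an independent second choice applied to the same panes. Everything below is hypothetical illustration, not Samza, Beam, or Flink API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;

final class MiniWindows {
  // Step 1, windowing: assign each {timestampMs, value} pair to a tumbling
  // window, keyed by the window's start timestamp.
  static Map<Long, List<Long>> tumbling(List<long[]> events, long sizeMs) {
    Map<Long, List<Long>> panes = new TreeMap<>();
    for (long[] e : events) {
      long start = (e[0] / sizeMs) * sizeMs;
      panes.computeIfAbsent(start, k -> new ArrayList<>()).add(e[1]);
    }
    return panes;
  }

  // Step 2, aggregation: any reduction (sum, count, ...) applies unchanged
  // to the panes produced by step 1.
  static Map<Long, Long> aggregate(Map<Long, List<Long>> panes,
                                   Function<List<Long>, Long> agg) {
    Map<Long, Long> out = new TreeMap<>();
    panes.forEach((start, values) -> out.put(start, agg.apply(values)));
    return out;
  }
}
```

Because the two steps are separate, one windowing choice composes with any aggregation, which is exactly what avoids the combinatorial explosion of variants.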

- Chris


Re: [DISCUSS] SEP-2: ApplicationRunner Design

2017-06-20 Thread Chris Pettitt
Yi,

What examples should we be looking at for new-api-v2?

1. 
samza/samza-core/src/test/java/org/apache/samza/example/PageViewCounterStreamSpecExample.java

others?

- Chris


Re: [DISCUSS] SEP-2: ApplicationRunner Design

2017-06-19 Thread Yi Pan
Hi, all,

Here are the promised code examples for the revised API, and the related
change to how we specify serdes in the API:

- User example for the new API change:
https://github.com/nickpan47/samza/tree/new-api-v2

- Prateek’s PR for the proposed schema registry change:
https://github.com/nickpan47/samza/pull/2/files

Please feel free to comment and provide feedback!


Thanks!


-Yi


Re: [DISCUSS] SEP-2: ApplicationRunner Design

2017-06-06 Thread Yi Pan
Hi, all,

Thanks for all the inputs! I finally got some time to go through the
discussion thread and digest most of the points made above. Here is my
personal summary:

Consensus on requirements:

   1. ApplicationRunner needs async APIs.
   2. ApplicationRunner can be hidden from user (except maybe in config)
   3. StreamApplication is the direct wrapper for the programming interface
   (i.e., removing StreamGraph from the user API and allowing users to call
   input() and output() from the StreamApplication) in main()
   4. There has to be a serialization format for the StreamApplication
   itself, such that the tasks can just deserialize and recreate the user logic
   included in StreamApplication in multiple TaskContexts.
   5. JobRunner seems to be a very thin layer on top of StreamProcessor or
   YarnJob, and it is always a LocalJob in a LocalApplicationRunner and a
   RemoteJob in a RemoteApplicationRunner. There is a desire to remove it.
   6. StreamApplication needs to have some methods to allow user-injected
   global objects for the whole application, such as JmxServer,
   MetricsReporter, etc.


Some additional discussion points:

   1. In StreamApplication#input()/output(), what should the input /
   output parameter be? The StreamSpec? Or the actual I/O implementation object
   that provides messages (i.e., similar to a socket reader/file reader object)? In
   the latter case, we will need to define an abstraction layer of StreamReader
   and StreamWriter in the user-facing API that supports read/write of
   partitioned streams on top of the SystemConsumer/SystemProducer/SystemAdmin
   objects. Also, the number of I/O streams via the StreamReader/StreamWriter
   cannot be predetermined (i.e., it depends on input stream partitions and
   the groupers). Hence, I am leaning toward exposing StreamSpec in the API
   and letting users build the StreamSpec via SpecBuilder. The actual I/O objects
   will be instantiated when SystemConsumer/SystemProducer are instantiated,
   with the number of physical partitions in each container.
   2. There is a need to support task-level programs via the same launch
   model as well.
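The first discussion point can be pictured as a declarative StreamSpec built via a builder, with the physical consumers/producers created later at instantiation time. The sketch below is hypothetical; field and method names are illustrative, not the actual StreamSpec/SpecBuilder API.

```java
// Hypothetical declarative stream description: no I/O objects are created here.
final class StreamSpec {
  final String system;
  final String physicalName;
  final int partitions; // -1 => resolved at runtime from the system

  private StreamSpec(String system, String physicalName, int partitions) {
    this.system = system;
    this.physicalName = physicalName;
    this.partitions = partitions;
  }

  static final class SpecBuilder {
    private String system;
    private String physicalName;
    private int partitions = -1;

    SpecBuilder system(String system) { this.system = system; return this; }
    SpecBuilder physicalName(String name) { this.physicalName = name; return this; }
    SpecBuilder partitions(int n) { this.partitions = n; return this; }

    StreamSpec build() { return new StreamSpec(system, physicalName, partitions); }
  }
}
```

Keeping the spec purely descriptive is what allows the partition count to stay unresolved until SystemConsumer/SystemProducer are instantiated per container.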


Some ideas to implement the above requirements:

   1. StreamGraph#write() should be used internally to generate and persist
   the serialized form of the user logic. Then, StreamGraph#read() should give
   back a deserialized version of the user logic. This implies that the user
   functions defined in the APIs are mandated to be serializable.
   2. StreamApplication should include a SpecBuilder that provides the
   instantiation of MessageStreams/Stores, which is passed to
   StreamApplication#input() / StreamApplication#output().
   3. StreamApplication should also include an internal ApplicationRunner
   instance (config-driven, class-loaded) to be able to switch between local
   vs. remote execution.
   4. The implementation of LocalApplicationRunner should directly instantiate
   and manage StreamProcessor instances for each job, removing the
   LocalJobRunner.
   5. The implementation of RemoteApplicationRunner should instantiate a remote
   JobFactory, create the remote job, and submit it for each job, removing
   the current JobRunner interface.
   6. We also need a StreamTaskApplication class that allows users to create
   task-level applications, by mandating a constructor parameter of type
   StreamTaskFactory.
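The serializability requirement in idea 1 can be sketched with plain Java serialization: user functions implement Serializable, the runner writes them out, and each task reads them back. 'SerializableFunction' and 'GraphRoundTrip' are hypothetical helpers, not Samza classes.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.function.Function;

// User lambdas targeting this interface become serializable.
interface SerializableFunction<I, O> extends Function<I, O>, Serializable {}

final class GraphRoundTrip {
  // Stand-in for StreamGraph#write(): persist the user logic as bytes.
  static byte[] write(Object userLogic) {
    try {
      ByteArrayOutputStream bos = new ByteArrayOutputStream();
      try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
        oos.writeObject(userLogic);
      }
      return bos.toByteArray();
    } catch (IOException e) {
      throw new RuntimeException(e);
    }
  }

  // Stand-in for StreamGraph#read(): recover the user logic in a task.
  @SuppressWarnings("unchecked")
  static <T> T read(byte[] bytes) {
    try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
      return (T) ois.readObject();
    } catch (IOException | ClassNotFoundException e) {
      throw new RuntimeException(e);
    }
  }
}
```

Note the deserializing side still needs the user's classes on its classpath, which ties this idea to the jar-distribution discussion elsewhere in the thread.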


One more opinion around the status and waitForFinish(): I would think
that waitForFinish() just waits for the local runtime to complete, not
for the remote job to complete.

I will be working on a revision of SEP-2 and some example user code
for now and will share it soon.

Thanks!

-Yi


Re: [DISCUSS] SEP-2: ApplicationRunner Design

2017-05-03 Thread Chris Pettitt
Hi Xinyu,

I took a second look at the registerStore API. Would it be possible to call
registerStore directly on the app, similar to what we're doing with
app.input (possibly with the restriction that registerStore must be called
before we add an operator that uses the store)? Otherwise we'll end up
having to do two passes on the graph again, similar to the way we had to
do a pass to init the stream config and then hook up the graph.

Thanks,
Chris
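The register-before-use idea can be sketched as follows; 'AppSketch' and its methods are hypothetical, purely to show why a single pass suffices when stores are registered up front.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical single-pass sketch: stores are registered on the app before
// any operator references them, so graph construction resolves each store
// immediately instead of re-walking the graph in a second pass.
final class AppSketch {
  private final Map<String, String> stores = new HashMap<>(); // name -> factory

  AppSketch registerStore(String name, String factoryClassName) {
    stores.put(name, factoryClassName);
    return this;
  }

  // Adding a stateful operator fails fast if its store was not registered yet,
  // which is the restriction mentioned above.
  void addStatefulOperator(String storeName) {
    if (!stores.containsKey(storeName)) {
      throw new IllegalStateException("Store not registered: " + storeName);
    }
  }

  boolean hasStore(String name) { return stores.containsKey(name); }
}
```

The trade-off is an ordering constraint on the user (register first), in exchange for avoiding the two-pass graph construction described above.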


On Fri, Apr 28, 2017 at 8:55 PM, xinyu liu  wrote:

> Right, option #2 seems redundant for defining streams after further
> discussion here. StreamSpec itself is flexible enough to achieve both
> static and programmatic specification of the stream. Agreed it's not
> convenient for now (pretty obvious after looking at your bsr
> beam.runners.samza.wrapper), and we should provide similar predefined
> convenience wrappers for users to create the StreamSpec. In your case,
> something like BoundedStreamSpec.file(), which will generate the system
> and serialize the data as you did.
>
> We're still thinking the callback proposed in #2 can be useful for
> requirement #6: injecting other user objects at run time, such as stores
> and metrics. To simplify users' understanding further, I think we might
> hide the ApplicationRunner and expose the StreamApplication instead, which
> will make requirement #3 not user-facing. So the API becomes something like:
>
>   StreamApplication app = StreamApplication.local(config)
> .init (env -> {
>env.registerStore("my-store", new MyStoreFactory());
>env.registerMetricsReporter("my-reporter", new
> MyMetricsReporterFactory());
> })
> .withLifeCycleListener(myListener);
>
>   app.input(BoundedStreamSpec.create("/sample/input.txt"))
> .map(...)
> .window(...)
>
>   app.run();
>
> For requirement #5, I add a .withLifeCycleListener() in the API, which can
> trigger the callbacks with life cycle events.
>
> For #4: distribution of the jars will work as it does today, using Yarn
> localization with a remote store like Artifactory or an HTTP server. We
> discussed where to put the graph serialization. The current thinking is to
> define a general interface which can be backed by a remote store, like Kafka,
> Artifactory, or an HTTP server. For Kafka, it's straightforward, but we will
> hit the size limit or have to chunk it ourselves. For the other two, we need to
> investigate whether we can easily upload jars to our Artifactory and
> localize them with Yarn. Any opinions on this?
>
> Thanks,
> Xinyu
>
> On Fri, Apr 28, 2017 at 11:34 AM, Chris Pettitt <
> cpett...@linkedin.com.invalid> wrote:
>
> > Your proposal for #1 looks good.
> >
> > I'm not quite how to reconcile the proposals for #1 and #2. In #1 you add
> > the stream spec straight onto the runner while in #2 you do it in a
> > callback. If it is either-or, #1 looks a lot better for my purposes.
> >
> > For #4 what mechanism are you using to distribute the JARs? Can you use
> the
> > same mechanism to distribute the serialized graph?
> >
> > On Fri, Apr 28, 2017 at 12:14 AM, xinyu liu 
> wrote:
> >
> > > btw, I will get to SAMZA-1246 as soon as possible.
> > >
> > > Thanks,
> > > Xinyu
> > >
> > > On Thu, Apr 27, 2017 at 9:11 PM, xinyu liu 
> > wrote:
> > >
> > > > Let me try to capture the updated requirements:
> > > >
> > > > 1. Set up input streams outside StreamGraph, and treat graph building
> > as
> > > a
> > > > library (*Fluent, Beam*).
> > > >
> > > > 2. Improve ease of use for ApplicationRunner to avoid complex
> > > > configurations such as zkCoordinator, zkCoordinationService.
> > > (*Standalone*).
> > > > Provide some programmatic way to tweak them in the API.
> > > >
> > > > 3. Clean up ApplicationRunner into a single interface (*Fluent*). We
> > can
> > > > have one or more implementations but it's hidden from the users.
> > > >
> > > > 4. Separate StreamGraph from runtime environment so it can be
> > serialized
> > > (*Beam,
> > > > Yarn*)
> > > >
> > > > 5. Better life cycle management of application, parity with
> > > > StreamProcessor (*Standalone, Beam*). Stats should include exception
> in
> > > > case of failure (tracked in SAMZA-1246).
> > > >
> > > > 6. Support injecting user-defined objects into ApplicationRunner.
> > > >
> > > > Prateek and I iterate on the ApplilcationRunner API based on these
> > > > requirements. To support #1, we can set up input streams on the
> runner
> > > > level, which returns the MessageStream and allows graph building
> > > > afterwards. The code looks like below:
> > > >
> > > >   ApplicationRunner runner = ApplicationRunner.local();
> > > >   runner.input(streamSpec)
> > > > .map(..)
> > > > .window(...)
> > > >   runner.run();
> > > >
> > > > StreamSpec is the building block for setting up streams here. It can
> be
> > > > set up in different ways:
> > > >
> > > >   - Direct creation of stream spec, like runner.input(new

Re: [DISCUSS] SEP-2: ApplicationRunner Design

2017-05-01 Thread xinyu liu
Looked again at Chris's beam-samza-runner implementation. It seems
LocalApplicationRunner.run() should be asynchronous too. The current
implementation actually uses a latch to wait for the StreamProcessors
to finish, which seems unnecessary. We can provide a waitUntilFinish()
counterpart for the user. I created
https://issues.apache.org/jira/browse/SAMZA-1252 to track it.
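For illustration, a latch-backed runner where run() is non-blocking and waitUntilFinish() is the blocking counterpart could look roughly like this (all names are hypothetical, not the actual Samza API):

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: run() returns immediately; a latch backs waitUntilFinish().
public class LocalRunnerSketch {
    private final CountDownLatch shutdownLatch = new CountDownLatch(1);

    // Non-blocking: start processors on a background thread and return.
    public void run() {
        Thread processor = new Thread(() -> {
            // ... process messages; count down once the processors finish
            shutdownLatch.countDown();
        });
        processor.start();
    }

    // Blocking counterpart for users who want main() to wait.
    public boolean waitUntilFinish(long timeout, TimeUnit unit) {
        try {
            return shutdownLatch.await(timeout, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        LocalRunnerSketch runner = new LocalRunnerSketch();
        runner.run(); // returns immediately
        System.out.println(runner.waitUntilFinish(5, TimeUnit.SECONDS) ? "finished" : "timed out");
    }
}
```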

Thanks,
Xinyu


Re: [DISCUSS] SEP-2: ApplicationRunner Design

2017-04-28 Thread xinyu liu
Right, option #2 seems redundant for defining streams after further
discussion here. StreamSpec itself is flexible enough to achieve both
static and programmatic specification of the stream. Agreed it's not
convenient for now (pretty obvious after looking at your bsr
beam.runners.samza.wrapper), and we should provide similar predefined
convenience wrappers for users to create the StreamSpec. In your case,
something like BoundedStreamSpec.file(), which will generate the system
and serialize the data as you did.
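As a rough illustration of what such a canned wrapper could look like (the class, method, and system names below are hypothetical, mirroring the BoundedStreamSpec.file() idea rather than an actual Samza API):

```java
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical canned wrapper: derive the stream id and system name
// from the file path so the user needs no explicit configuration.
public class BoundedStreamSpecSketch {
    final String id;
    final String system;
    final String physicalName;

    private BoundedStreamSpecSketch(String id, String system, String physicalName) {
        this.id = id;
        this.system = system;
        this.physicalName = physicalName;
    }

    static BoundedStreamSpecSketch file(String path) {
        Path p = Paths.get(path);
        // Sanitize the file name into a stream id; "bounded-file" is a made-up system name.
        String id = p.getFileName().toString().replaceAll("\\W", "-");
        return new BoundedStreamSpecSketch(id, "bounded-file", path);
    }

    public static void main(String[] args) {
        BoundedStreamSpecSketch spec = file("/sample/input.txt");
        System.out.println(spec.id + " @ " + spec.system); // prints "input-txt @ bounded-file"
    }
}
```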

We're still thinking the callback proposed in #2 can be useful for
requirement #6: injecting other user objects at run time, such as stores
and metrics. To further simplify things for users, I think we might
hide the ApplicationRunner and expose the StreamApplication instead, which
will make requirement #3 not user facing. So the API becomes:

  StreamApplication app = StreamApplication.local(config)
    .init(env -> {
      env.registerStore("my-store", new MyStoreFactory());
      env.registerMetricsReporter("my-reporter", new MyMetricsReporterFactory());
    })
    .withLifeCycleListener(myListener);

  app.input(BoundedStreamSpec.create("/sample/input.txt"))
    .map(...)
    .window(...);

  app.run();

For requirement #5, I added a .withLifeCycleListener() to the API, which can
trigger callbacks on life cycle events.
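A minimal sketch of what the listener passed to .withLifeCycleListener() could look like, including a failure callback that surfaces the cause (the interface and callback names are illustrative, not the actual API):

```java
// Hypothetical shape of a life cycle listener; names are illustrative.
public class LifeCycleSketch {
    interface LifeCycleListener {
        default void onStart() {}
        default void onShutdown() {}
        // Surfaces the cause of failure, addressing the gap noted in SAMZA-1246.
        default void onFailure(Throwable cause) {}
    }

    static String lastEvent = "none";

    // A runner would invoke the callbacks around the processor life cycle.
    static void runWith(LifeCycleListener listener, Runnable body) {
        listener.onStart();
        try {
            body.run();
            listener.onShutdown();
        } catch (RuntimeException e) {
            listener.onFailure(e);
        }
    }

    public static void main(String[] args) {
        LifeCycleListener listener = new LifeCycleListener() {
            @Override public void onShutdown() { lastEvent = "shutdown"; }
            @Override public void onFailure(Throwable t) { lastEvent = "failed: " + t.getMessage(); }
        };
        runWith(listener, () -> {});
        System.out.println(lastEvent); // prints "shutdown"
    }
}
```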

For #4: distribution of the jars will work as it does today, using Yarn
localization with a remote store like Artifactory or an HTTP server. We
discussed where to put the graph serialization. The current thinking is to
define a general interface which can be backed by a remote store, like Kafka,
Artifactory, or an HTTP server. For Kafka it's straightforward, but we will
either hit the per-message size limit or have to chunk the payload ourselves.
For the other two, we need to investigate whether we can easily upload jars
to our Artifactory and localize them with Yarn. Any opinions on this?
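The "general interface backed by a remote store" idea might look roughly like this; the in-memory implementation below merely stands in for a Kafka/Artifactory/HTTP backend and shows where a per-message size limit would be enforced (all names are hypothetical):

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of a pluggable store for the serialized StreamGraph.
public class GraphStoreSketch {
    interface SerializedGraphStore {
        void put(String appId, byte[] serializedGraph);
        byte[] get(String appId);
    }

    // In-memory stand-in; a Kafka-backed version would need to respect the
    // broker's max message size (or chunk the payload itself).
    static class InMemoryStore implements SerializedGraphStore {
        private final Map<String, byte[]> store = new HashMap<>();
        private final int maxBytes;

        InMemoryStore(int maxBytes) { this.maxBytes = maxBytes; }

        @Override public void put(String appId, byte[] graph) {
            if (graph.length > maxBytes) {
                throw new IllegalArgumentException("serialized graph exceeds size limit");
            }
            store.put(appId, graph);
        }

        @Override public byte[] get(String appId) { return store.get(appId); }
    }

    public static void main(String[] args) {
        SerializedGraphStore store = new InMemoryStore(1024 * 1024);
        store.put("my-app", new byte[]{1, 2, 3});
        System.out.println(store.get("my-app").length); // prints 3
    }
}
```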

Thanks,
Xinyu


Re: [DISCUSS] SEP-2: ApplicationRunner Design

2017-04-28 Thread Chris Pettitt
Your proposal for #1 looks good.

I'm not quite sure how to reconcile the proposals for #1 and #2. In #1 you add
the stream spec straight onto the runner while in #2 you do it in a
callback. If it is either-or, #1 looks a lot better for my purposes.

For #4 what mechanism are you using to distribute the JARs? Can you use the
same mechanism to distribute the serialized graph?


Re: [DISCUSS] SEP-2: ApplicationRunner Design

2017-04-27 Thread xinyu liu
btw, I will get to SAMZA-1246 as soon as possible.

Thanks,
Xinyu


Re: [DISCUSS] SEP-2: ApplicationRunner Design

2017-04-27 Thread xinyu liu
Let me try to capture the updated requirements:

1. Set up input streams outside StreamGraph, and treat graph building as a
library (*Fluent, Beam*).

2. Improve ease of use for ApplicationRunner to avoid complex
configurations such as zkCoordinator, zkCoordinationService. (*Standalone*).
Provide some programmatic way to tweak them in the API.

3. Clean up ApplicationRunner into a single interface (*Fluent*). We can
have one or more implementations but it's hidden from the users.

4. Separate StreamGraph from runtime environment so it can be
serialized (*Beam,
Yarn*)

5. Better life cycle management of the application, parity with StreamProcessor
(*Standalone, Beam*). Status should include the exception in case of failure
(tracked in SAMZA-1246).

6. Support injecting user-defined objects into ApplicationRunner.

Prateek and I iterated on the ApplicationRunner API based on these
requirements. To support #1, we can set up input streams at the runner
level, which returns the MessageStream and allows graph building
afterwards. The code looks like this:

  ApplicationRunner runner = ApplicationRunner.local();
  runner.input(streamSpec)
    .map(...)
    .window(...);
  runner.run();

StreamSpec is the building block for setting up streams here. It can be set
up in different ways:

  - Direct creation of a stream spec, like runner.input(new StreamSpec(id,
system, stream))
  - Loading a streamId from env or config, like
runner.input(runner.env().getStreamSpec(id))
  - A canned spec which generates the StreamSpec with id, system and stream
to minimize the configuration. For example, CollectionSpec.create(new
ArrayList[]{1,2,3,4}) will auto-generate the system and stream in
the spec.
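A sketch of the canned-spec idea, assuming an in-memory system whose id, system and stream are auto-generated (the class name, generics, and counter scheme are illustrative, not the proposed API):

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical canned spec: wraps an in-memory collection as a stream
// and auto-generates the id/system so no configuration is required.
public class CollectionSpecSketch<T> {
    private static final AtomicInteger COUNTER = new AtomicInteger();

    final String id;
    final String system = "in-memory"; // made-up system name
    final List<T> elements;

    private CollectionSpecSketch(String id, List<T> elements) {
        this.id = id;
        this.elements = elements;
    }

    @SafeVarargs
    static <T> CollectionSpecSketch<T> create(T... elements) {
        // Auto-generate a unique stream id per spec.
        return new CollectionSpecSketch<>("collection-" + COUNTER.incrementAndGet(),
            Arrays.asList(elements));
    }

    public static void main(String[] args) {
        CollectionSpecSketch<Integer> spec = create(1, 2, 3, 4);
        System.out.println(spec.id + ": " + spec.elements); // e.g. "collection-1: [1, 2, 3, 4]"
    }
}
```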

To support #2, we need to be able to set up StreamSpec-related objects and
factories programmatically in env. Suppose we have the following before
runner.input(...):

  runner.setup(env /* a writable interface of env */ -> {
    env.setStreamSpec(streamId, streamSpec);
    env.setSystem(systemName, systemFactory);
  })

runner.setup(...) also provides setup for stores and other runtime objects
needed for the execution. The setup should be serializable to config.
For #6, I haven't figured out a good way to inject user-defined objects
here yet.
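One possible shape for the writable env, shown here recording registrations as plain key/value pairs so the setup can be serialized to config (the class names and config key names are made up for illustration; they only resemble Samza-style keys):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

// Sketch of runner.setup(env -> ...): the writable env records every
// registration as a key/value pair, so the setup round-trips to config.
public class EnvSetupSketch {
    static class WritableEnv {
        final Map<String, String> entries = new HashMap<>();

        void setSystem(String name, String factoryClass) {
            entries.put("systems." + name + ".factory", factoryClass);
        }

        void setStreamSpec(String streamId, String system, String physicalName) {
            entries.put("streams." + streamId + ".system", system);
            entries.put("streams." + streamId + ".physical.name", physicalName);
        }

        Map<String, String> toConfig() { return new HashMap<>(entries); }
    }

    // Applies the user's setup block and serializes the result to config.
    static Map<String, String> setup(Consumer<WritableEnv> block) {
        WritableEnv env = new WritableEnv();
        block.accept(env);
        return env.toConfig();
    }

    public static void main(String[] args) {
        Map<String, String> config = setup(env -> {
            env.setSystem("kafka", "org.example.KafkaSystemFactory");
            env.setStreamSpec("page-views", "kafka", "PageViewEvent");
        });
        System.out.println(config.get("streams.page-views.system")); // prints "kafka"
    }
}
```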

With this API, we should be able to also support #4. For remote
runner.run(), the operator user classes/lambdas in the StreamGraph need to
be serialized. As of today, the existing option is to serialize to a stream,
either the coordinator stream or the pipeline control stream, which has a
size limit per message. Do you see RPC as an option?
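To make the size concern concrete: operator lambdas serialize via standard Java serialization only when their target type extends Serializable, and the resulting payload must fit the per-message limit. A small sketch (the interface name and limit value are illustrative):

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.function.Function;

// Sketch: a lambda is serializable only if its target type is Serializable.
public class GraphSerializationSketch {
    interface SerializableFunction<A, B> extends Function<A, B>, Serializable {}

    static byte[] serialize(Serializable s) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(s);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
        return bytes.toByteArray();
    }

    public static void main(String[] args) {
        SerializableFunction<Integer, Integer> mapFn = x -> x + 1;
        byte[] payload = serialize(mapFn);
        int maxMessageBytes = 1_000_000; // illustrative per-message limit
        System.out.println(payload.length > 0 && payload.length < maxMessageBytes);
        // prints "true"
    }
}
```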

For this version of the API, it seems we don't need the StreamApplication
wrapper or to expose the StreamGraph. Do you think we are on the right path?

Thanks,
Xinyu



Re: [DISCUSS] SEP-2: ApplicationRunner Design

2017-04-27 Thread Chris Pettitt
That should have been:

For #1, Beam doesn't have a hard requirement...

On Thu, Apr 27, 2017 at 9:07 AM, Chris Pettitt 
wrote:

> For #1, I doesn't have a hard requirement for any change from Samza. A
> very nice to have would be to allow the input systems to be set up at the
> same time as the rest of the StreamGraph. An even nicer to have would be to
> do away with the callback based approach and treat graph building as a
> library, a la Beam and Flink.
>
> For the moment I've worked around the two pass requirement (once for
> config, once for StreamGraph) by introducing an IR layer between Beam and
> the Samza Fluent translation. The IR layer is convenient independent of
> this problem because it makes it easier to switch between the Fluent and
> low-level APIs.
>
>
> For #4, if we had parity with StreamProcessor for lifecycle we'd be in
> great shape. One additional issue with the status call that I may not have
> mentioned is that it provides you no way to get at the cause of failure.
> The StreamProcessor API does allow this via the callback.
>
>
> Re. #2 and #3, I'm a big fan of getting rid of the extra configuration
> indirection you currently have to jump through (this is also related to
> system consumer configuration from #1. It makes it much easier to discover
> what the configurable parameters are too, if we provide some programmatic
> way to tweak them in the API - which can turn into config under the hood.
>
> On Wed, Apr 26, 2017 at 9:20 PM, xinyu liu  wrote:
>
>> Let me give a shot to summarize the requirements for ApplicationRunner we
>> have discussed so far:
>>
>> - Support environment for passing in user-defined objects (streams
>> potentially) into ApplicationRunner (*Beam*)
>>
>> - Improve ease of use for ApplicationRunner to avoid complex
>> configurations
>> such as zkCoordinator, zkCoordinationService. (*Standalone*)
>>
>> - Clean up ApplicationRunner into a single interface (*Fluent*). We can
>> have one or more implementations but it's hidden from the users.
>>
>> - Separate StreamGraph from environment so it can be serializable (*Beam,
>> Yarn*)
>>
>> - Better life cycle management of application, including
>> start/stop/stats (*Standalone,
>> Beam*)
>>
>>
>> One way to address 2 and 3 is to provide pre-packaged runner using static
>> factory methods, and the return type will be the ApplicationRunner
>> interface. So we can have:
>>
>>   ApplicationRunner runner = ApplicationRunner.zk() /
>> ApplicationRunner.local()
>> / ApplicationRunner.remote() / ApplicationRunner.test().
>>
>> Internally we will package the right configs and run-time environment with
>> the runner. For example, ApplicationRunner.zk() will define all the
>> configs
>> needed for zk coordination.
>>
>> To support 1 and 4, can we pass in a lambda function in the runner, and
>> then we can run the stream graph? Like the following:
>>
>>   ApplicationRunner.zk().env(config -> environment).run(streamGraph);
>>
>> Then we need a way to pass the environment into the StreamGraph. This can
>> be done by either adding an extra parameter to each operator, or have a
>> getEnv() function in the MessageStream, which seems to be pretty hacky.
>>
>> What do you think?
>>
>> Thanks,
>> Xinyu
>>
>>
>>
>>
>>
>> On Sun, Apr 23, 2017 at 11:01 PM, Prateek Maheshwari <
>> pmaheshw...@linkedin.com.invalid> wrote:
>>
>> > Thanks for putting this together Yi!
>> >
>> > I agree with Jake, it does seem like there are a few too many moving
>> parts
>> > here. That said, the problem being solved is pretty broad, so let me
>> try to
>> > summarize my current understanding of the requirements. Please correct
>> me
>> > if I'm wrong or missing something.
>> >
>> > ApplicationRunner and JobRunner first, ignoring test environment for the
>> > moment.
>> > ApplicationRunner:
>> > 1. Create execution plan: Same in Standalone and Yarn
>> > 2. Create intermediate streams: Same logic but different leader election
>> > (ZK-based or pre-configured in standalone, AM in Yarn).
>> > 3. Run jobs: In JVM in standalone. Submit to the cluster in Yarn.
>> >
>> > JobRunner:
>> > 1. Run the StreamProcessors: Same process in Standalone & Test. Remote
>> host
>> > in Yarn.
>> >
>> > To get a single ApplicationRunner implementation, like Jake suggested,
>> we
>> > need to make leader election and JobRunner implementation pluggable.
>> > There's still the question of whether ApplicationRunner#run API should
>> be
>> > blocking or non-blocking. It has to be non-blocking in YARN. We want it
>> to
>> > be blocking in standalone, but seems like the main reason is ease of use
> >> > when launched from main(). I'd prefer making it consistently non-blocking
>> > instead, esp. since in embedded standalone mode (where the processor is
>> > running in another container) a blocking API would not be user-friendly
>> > either. If not, we can add both run and runBlocking.
>> >
>> > Coming to RuntimeEnvironment, which is the least 

Re: [DISCUSS] SEP-2: ApplicationRunner Design

2017-04-27 Thread Chris Pettitt
For #1, I don't have a hard requirement for any change from Samza. A very
nice to have would be to allow the input systems to be set up at the same
time as the rest of the StreamGraph. An even nicer to have would be to do
away with the callback-based approach and treat graph building as a
library, a la Beam and Flink.

For the moment I've worked around the two-pass requirement (once for
config, once for StreamGraph) by introducing an IR layer between Beam and
the Samza Fluent translation. The IR layer is convenient independent of
this problem because it makes it easier to switch between the Fluent and
low-level APIs.


For #4, if we had parity with StreamProcessor for lifecycle we'd be in
great shape. One additional issue with the status call that I may not have
mentioned is that it provides you no way to get at the cause of failure.
The StreamProcessor API does allow this via the callback.


Re. #2 and #3, I'm a big fan of getting rid of the extra configuration
indirection you currently have to jump through (this is also related to
system consumer configuration from #1). It makes it much easier to discover
what the configurable parameters are too, if we provide some programmatic
way to tweak them in the API, which can turn into config under the hood.

On Wed, Apr 26, 2017 at 9:20 PM, xinyu liu  wrote:

> Let me give a shot to summarize the requirements for ApplicationRunner we
> have discussed so far:
>
> - Support environment for passing in user-defined objects (streams
> potentially) into ApplicationRunner (*Beam*)
>
> - Improve ease of use for ApplicationRunner to avoid complex configurations
> such as zkCoordinator, zkCoordinationService. (*Standalone*)
>
> - Clean up ApplicationRunner into a single interface (*Fluent*). We can
> have one or more implementations but it's hidden from the users.
>
> - Separate StreamGraph from environment so it can be serializable (*Beam,
> Yarn*)
>
> - Better life cycle management of application, including start/stop/stats
> (*Standalone, Beam*)
>
>
> One way to address 2 and 3 is to provide pre-packaged runners using static
> factory methods, and the return type will be the ApplicationRunner
> interface. So we can have:
>
>   ApplicationRunner runner = ApplicationRunner.zk() /
> ApplicationRunner.local()
> / ApplicationRunner.remote() / ApplicationRunner.test().
>
> Internally we will package the right configs and run-time environment with
> the runner. For example, ApplicationRunner.zk() will define all the configs
> needed for zk coordination.
>
> To support 1 and 4, can we pass in a lambda function in the runner, and
> then we can run the stream graph? Like the following:
>
>   ApplicationRunner.zk().env(config -> environment).run(streamGraph);
>
> Then we need a way to pass the environment into the StreamGraph. This can
> be done by either adding an extra parameter to each operator, or have a
> getEnv() function in the MessageStream, which seems to be pretty hacky.
>
> What do you think?
>
> Thanks,
> Xinyu
>
>
>
>
>
> On Sun, Apr 23, 2017 at 11:01 PM, Prateek Maheshwari <
> pmaheshw...@linkedin.com.invalid> wrote:
>
> > Thanks for putting this together Yi!
> >
> > I agree with Jake, it does seem like there are a few too many moving
> parts
> > here. That said, the problem being solved is pretty broad, so let me try
> to
> > summarize my current understanding of the requirements. Please correct me
> > if I'm wrong or missing something.
> >
> > ApplicationRunner and JobRunner first, ignoring test environment for the
> > moment.
> > ApplicationRunner:
> > 1. Create execution plan: Same in Standalone and Yarn
> > 2. Create intermediate streams: Same logic but different leader election
> > (ZK-based or pre-configured in standalone, AM in Yarn).
> > 3. Run jobs: In JVM in standalone. Submit to the cluster in Yarn.
> >
> > JobRunner:
> > 1. Run the StreamProcessors: Same process in Standalone & Test. Remote
> host
> > in Yarn.
> >
> > To get a single ApplicationRunner implementation, like Jake suggested, we
> > need to make leader election and JobRunner implementation pluggable.
> > There's still the question of whether ApplicationRunner#run API should be
> > blocking or non-blocking. It has to be non-blocking in YARN. We want it
> to
> > be blocking in standalone, but seems like the main reason is ease of use
> > when launched from main(). I'd prefer making it consistently non-blocking
> > instead, esp. since in embedded standalone mode (where the processor is
> > running in another container) a blocking API would not be user-friendly
> > either. If not, we can add both run and runBlocking.
> >
> > Coming to RuntimeEnvironment, which is the least clear to me so far:
> > 1. I don't think RuntimeEnvironment should be responsible for providing
> > StreamSpecs for streamIds - they can be obtained with a config/util
> class.
> > The StreamProcessor should only know about logical streamIds and the
> > streamId <-> actual stream mapping should happen 

Re: [DISCUSS] SEP-2: ApplicationRunner Design

2017-04-26 Thread xinyu liu
Let me give a shot to summarize the requirements for ApplicationRunner we
have discussed so far:

- Support environment for passing in user-defined objects (streams
potentially) into ApplicationRunner (*Beam*)

- Improve ease of use for ApplicationRunner to avoid complex configurations
such as zkCoordinator, zkCoordinationService. (*Standalone*)

- Clean up ApplicationRunner into a single interface (*Fluent*). We can
have one or more implementations but it's hidden from the users.

- Separate StreamGraph from environment so it can be serializable (*Beam,
Yarn*)

- Better life cycle management of application, including start/stop/stats
(*Standalone, Beam*)


One way to address 2 and 3 is to provide pre-packaged runners using static
factory methods, and the return type will be the ApplicationRunner
interface. So we can have:

  ApplicationRunner runner = ApplicationRunner.zk() / ApplicationRunner.local()
/ ApplicationRunner.remote() / ApplicationRunner.test().

Internally we will package the right configs and run-time environment with
the runner. For example, ApplicationRunner.zk() will define all the configs
needed for zk coordination.

To support 1 and 4, can we pass in a lambda function in the runner, and
then we can run the stream graph? Like the following:

  ApplicationRunner.zk().env(config -> environment).run(streamGraph);

Then we need a way to pass the environment into the StreamGraph. This can
be done by either adding an extra parameter to each operator, or have a
getEnv() function in the MessageStream, which seems to be pretty hacky.
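To make the shape of that proposal concrete, here is a rough sketch of the fluent factory API, assuming static factories that package the coordination configs and an env(...) hook that maps config to user-defined runtime objects. Every class and method name here is an illustrative stand-in, not the actual Samza interface:

```java
import java.util.function.Function;

public class RunnerSketch {
    interface RuntimeEnvironment {}   // user-supplied runtime objects
    interface StreamGraph {}          // the processing graph to run
    static class Config {}

    static class ApplicationRunner {
        private final String coordination;
        private Function<Config, RuntimeEnvironment> envFactory;

        private ApplicationRunner(String coordination) {
            this.coordination = coordination;
        }

        // Static factories hide the coordination configs from the user.
        static ApplicationRunner zk()     { return new ApplicationRunner("zk"); }
        static ApplicationRunner local()  { return new ApplicationRunner("local"); }
        static ApplicationRunner remote() { return new ApplicationRunner("remote"); }

        // env(...) lets the caller supply user-defined runtime objects.
        ApplicationRunner env(Function<Config, RuntimeEnvironment> f) {
            this.envFactory = f;
            return this;
        }

        // Stand-in for planning and launching; returns a label for demonstration.
        String run(StreamGraph graph) {
            return coordination + (envFactory == null ? "" : "+env");
        }
    }
}
```

The fluent call from the mail then reads `ApplicationRunner.zk().env(config -> environment).run(streamGraph)`.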

What do you think?

Thanks,
Xinyu





On Sun, Apr 23, 2017 at 11:01 PM, Prateek Maheshwari <
pmaheshw...@linkedin.com.invalid> wrote:

> Thanks for putting this together Yi!
>
> I agree with Jake, it does seem like there are a few too many moving parts
> here. That said, the problem being solved is pretty broad, so let me try to
> summarize my current understanding of the requirements. Please correct me
> if I'm wrong or missing something.
>
> ApplicationRunner and JobRunner first, ignoring test environment for the
> moment.
> ApplicationRunner:
> 1. Create execution plan: Same in Standalone and Yarn
> 2. Create intermediate streams: Same logic but different leader election
> (ZK-based or pre-configured in standalone, AM in Yarn).
> 3. Run jobs: In JVM in standalone. Submit to the cluster in Yarn.
>
> JobRunner:
> 1. Run the StreamProcessors: Same process in Standalone & Test. Remote host
> in Yarn.
>
> To get a single ApplicationRunner implementation, like Jake suggested, we
> need to make leader election and JobRunner implementation pluggable.
> There's still the question of whether ApplicationRunner#run API should be
> blocking or non-blocking. It has to be non-blocking in YARN. We want it to
> be blocking in standalone, but seems like the main reason is ease of use
> when launched from main(). I'd prefer making it consistently non-blocking
> instead, esp. since in embedded standalone mode (where the processor is
> running in another container) a blocking API would not be user-friendly
> either. If not, we can add both run and runBlocking.
>
> Coming to RuntimeEnvironment, which is the least clear to me so far:
> 1. I don't think RuntimeEnvironment should be responsible for providing
> StreamSpecs for streamIds - they can be obtained with a config/util class.
> The StreamProcessor should only know about logical streamIds and the
> streamId <-> actual stream mapping should happen within the
> SystemProducer/Consumer/Admins provided by the RuntimeEnvironment.
> 2. There's also other components that the user might be interested in
> providing implementations of in embedded Standalone mode (i.e., not just in
> tests) - MetricsRegistry and JMXServer come to mind.
> 3. Most importantly, it's not clear to me who creates and manages the
> RuntimeEnvironment. It seems like it should be the ApplicationRunner or the
> user because of (2) above and because StreamManager also needs access to
> SystemAdmins for creating intermediate streams which users might want to
> mock. But it also needs to be passed down to the StreamProcessor - how
> would this work on Yarn?
>
> I think we should figure out how to integrate RuntimeEnvironment with
> ApplicationRunner before we can make a call on one vs. multiple
> ApplicationRunner implementations. If we do keep LocalApplicationRunner and
> RemoteApplication (and TestApplicationRunner) separate, agree with Jake
> that we should remove the JobRunners and roll them up into the respective
> ApplicationRunners.
>
> - Prateek
>
> On Thu, Apr 20, 2017 at 10:06 AM, Jacob Maes  wrote:
>
> > Thanks for the SEP!
> >
> > +1 on introducing these new components
> > -1 on the current definition of their roles (see Design feedback below)
> >
> > *Design*
> >
> >- If LocalJobRunner and RemoteJobRunner handle the different methods
> of
> >launching a Job, what additional value do the different types of
> >

Re: [DISCUSS] SEP-2: ApplicationRunner Design

2017-04-24 Thread Prateek Maheshwari
Thanks for putting this together Yi!

I agree with Jake, it does seem like there are a few too many moving parts
here. That said, the problem being solved is pretty broad, so let me try to
summarize my current understanding of the requirements. Please correct me
if I'm wrong or missing something.

ApplicationRunner and JobRunner first, ignoring test environment for the
moment.
ApplicationRunner:
1. Create execution plan: Same in Standalone and Yarn
2. Create intermediate streams: Same logic but different leader election
(ZK-based or pre-configured in standalone, AM in Yarn).
3. Run jobs: In JVM in standalone. Submit to the cluster in Yarn.

JobRunner:
1. Run the StreamProcessors: Same process in Standalone & Test. Remote host
in Yarn.

To get a single ApplicationRunner implementation, like Jake suggested, we
need to make leader election and JobRunner implementation pluggable.
There's still the question of whether ApplicationRunner#run API should be
blocking or non-blocking. It has to be non-blocking in YARN. We want it to
be blocking in standalone, but seems like the main reason is ease of use
when launched from main(). I'd prefer making it consistently non-blocking
instead, esp. since in embedded standalone mode (where the processor is
running in another container) a blocking API would not be user-friendly
either. If not, we can add both run and runBlocking.
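A minimal sketch of how a non-blocking run() and a blocking runBlocking() convenience could coexist, assuming a simple status enum and a completion latch. The names are illustrative, not the real ApplicationRunner types:

```java
import java.util.concurrent.CountDownLatch;

public class NonBlockingRunnerSketch {
    enum ApplicationStatus { New, Running, SuccessfulFinish }

    static class Runner {
        private volatile ApplicationStatus status = ApplicationStatus.New;
        private final CountDownLatch finished = new CountDownLatch(1);

        // Non-blocking: start the application and return immediately.
        void run(Runnable app) {
            status = ApplicationStatus.Running;
            new Thread(() -> {
                app.run();
                status = ApplicationStatus.SuccessfulFinish;
                finished.countDown();
            }).start();
        }

        // Blocking convenience for apps launched directly from main().
        void runBlocking(Runnable app) {
            run(app);
            try {
                finished.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }

        ApplicationStatus status() { return status; }
    }
}
```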

Coming to RuntimeEnvironment, which is the least clear to me so far:
1. I don't think RuntimeEnvironment should be responsible for providing
StreamSpecs for streamIds - they can be obtained with a config/util class.
The StreamProcessor should only know about logical streamIds and the
streamId <-> actual stream mapping should happen within the
SystemProducer/Consumer/Admins provided by the RuntimeEnvironment.
2. There's also other components that the user might be interested in
providing implementations of in embedded Standalone mode (i.e., not just in
tests) - MetricsRegistry and JMXServer come to mind.
3. Most importantly, it's not clear to me who creates and manages the
RuntimeEnvironment. It seems like it should be the ApplicationRunner or the
user because of (2) above and because StreamManager also needs access to
SystemAdmins for creating intermediate streams which users might want to
mock. But it also needs to be passed down to the StreamProcessor - how
would this work on Yarn?
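To make points 1-3 concrete, here is one hypothetical shape for RuntimeEnvironment: it supplies the pluggable system clients (so streamId-to-physical-stream mapping stays inside them), plus user-replaceable components like a metrics registry for embedded mode. Every name is illustrative; this is not the SEP-2 interface:

```java
public class RuntimeEnvironmentSketch {
    interface SystemProducer {}
    interface SystemConsumer {}
    interface SystemAdmin {}
    interface MetricsRegistry {}

    interface RuntimeEnvironment {
        SystemProducer getProducer(String systemName);
        SystemConsumer getConsumer(String systemName);
        SystemAdmin getAdmin(String systemName);   // StreamManager needs this too
        MetricsRegistry getMetricsRegistry();      // user-replaceable when embedded
    }

    // Trivial in-memory environment, e.g. for tests or embedded standalone mode.
    static RuntimeEnvironment inMemory() {
        return new RuntimeEnvironment() {
            public SystemProducer getProducer(String s) { return new SystemProducer() {}; }
            public SystemConsumer getConsumer(String s) { return new SystemConsumer() {}; }
            public SystemAdmin getAdmin(String s) { return new SystemAdmin() {}; }
            public MetricsRegistry getMetricsRegistry() { return new MetricsRegistry() {}; }
        };
    }
}
```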

I think we should figure out how to integrate RuntimeEnvironment with
ApplicationRunner before we can make a call on one vs. multiple
ApplicationRunner implementations. If we do keep LocalApplicationRunner and
RemoteApplication (and TestApplicationRunner) separate, agree with Jake
that we should remove the JobRunners and roll them up into the respective
ApplicationRunners.

- Prateek

On Thu, Apr 20, 2017 at 10:06 AM, Jacob Maes  wrote:

> Thanks for the SEP!
>
> +1 on introducing these new components
> -1 on the current definition of their roles (see Design feedback below)
>
> *Design*
>
>- If LocalJobRunner and RemoteJobRunner handle the different methods of
>launching a Job, what additional value do the different types of
>ApplicationRunner and RuntimeEnvironment provide? It seems like a red
> flag
>that all 3 would need to change from environment to environment. It
>indicates that they don't have proper modularity. The
> call-sequence-figures
>support this; LocalApplicationRunner and RemoteApplicationRunner make
> the
>same calls and the diagram only varies after jobRunner.start()
>- As far as I can tell, the only difference between Local and Remote
>ApplicationRunner is that one is blocking and the other is
> non-blocking. If
>that's all they're for then either the names should be changed to
> reflect
>this, or they should be combined into one ApplicationRunner and just
> expose
>separate methods for run() and runBlocking()
>- There isn't much detail on why the main() methods for Local/Remote
>have such different implementations, how they receive the Application
>(direct vs config), and concretely how the deployment scripts, if any,
>should interact with them.
>
>
> *Style*
>
>- nit: None of the 11 uses of the word "actual" in the doc are
> *actually*
>needed. :-)
>- nit: Colors of the runtime blocks in the diagrams are unconventional
>and a little distracting. Reminds me of nai won bao. Now I'm hungry. :-)
>- Prefer the name "ExecutionEnvironment" over "RuntimeEnvironment". The
>term "execution environment" is used
>    - The code comparisons for the ApplicationRunners are not apples-to-apples.
>    The local runner example is an application that USES the local runner. The
>    remote runner example is just the runner code itself. So, it's not
>    readily apparent that we're comparing the main() methods and not the
>    application itself.
>application itself.
>
>
> On Mon, Apr 17, 2017 at 5:02 PM, Yi Pan  wrote:
>
> > Made some updates to clarify the 

Re: [DISCUSS] SEP-2: ApplicationRunner Design

2017-04-21 Thread Chris Pettitt
I'm playing with ApplicationRunner, so I'll probably have more feedback.
For now, in addition to async run we also need async notification of
completion or failure. Also, ApplicationStatus should be able to give me
the cause of failure (e.g. via an Exception), not just a failure state.
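A hypothetical shape for such a notification API, with the failure cause surfaced as a Throwable rather than a bare state (all names here are illustrative, not an actual Samza interface):

```java
public class LifecycleSketch {
    // Async completion/failure callbacks; onFailure carries the cause.
    interface ApplicationListener {
        void onComplete();
        void onFailure(Throwable cause);
    }

    static class Runner {
        void run(Runnable app, ApplicationListener listener) {
            try {
                app.run();
                listener.onComplete();
            } catch (Throwable t) {
                listener.onFailure(t);   // caller can inspect the cause
            }
        }
    }
}
```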

On Thu, Apr 20, 2017 at 3:52 PM, Chris Pettitt 
wrote:

> It might be worth taking a look at how Beam does test streams. The API is
> more powerful than just passing in a queue, e.g.:
>
> TestStream<String> source = TestStream.create(StringUtf8Coder.of())
> .addElements(TimestampedValue.of("this", start))
> .addElements(TimestampedValue.of("that", start))
> .addElements(TimestampedValue.of("future",
> start.plus(Duration.standardMinutes(1))))
> .advanceProcessingTime(Duration.standardMinutes(3))
> .advanceWatermarkTo(start.plus(Duration.standardSeconds(30)))
> .advanceWatermarkTo(start.plus(Duration.standardMinutes(1)))
> .advanceWatermarkToInfinity();
>
> ---
>
> BTW, have we given up on the idea of a simpler input system, e.g. one that
> assumes all input messages are keyed? It seems it would be possible to
> support legacy "system streams" via an adapter that mapped K, V -> V' and
> could open the possibility of inputs in whatever form users want, e.g.
> (again from Beam):
>
> final Create.Values values = Create.of("test", "one", "two", "three");
>
> final TextIO.Read.Bound from = 
> TextIO.Read.from("src/main/resources/words.txt");
>
> final KafkaIO.Read reader = KafkaIO.read()
>
> .withBootstrapServers("myServer1:9092,myServer2:9092")
>
> .withTopics(topics)
>
> .withConsumerFactoryFn(new ConsumerFactoryFn(
>
> topics, 10, numElements, OffsetResetStrategy.EARLIEST))
>
> .withKeyCoder(BigEndianIntegerCoder.of())
>
> .withValueCoder(BigEndianLongCoder.of())
>
> .withMaxNumRecords(numElements);
> Ideally, such a simple input system specification would be usable in 
> production as well as test. At that point I don't know if we need a separate 
> TestApplicationRunner except perhaps as a hint to what we've been calling an 
> Environment?
>
> ---
>
> Aren't we supposed to be able to run applications without blocking (e.g.
> for embedded cases)? The API suggests that run is going to be a blocking
> call?
>
> - Chris
>
>
> On Thu, Apr 20, 2017 at 1:06 PM, Jacob Maes  wrote:
>
>> Thanks for the SEP!
>>
>> +1 on introducing these new components
>> -1 on the current definition of their roles (see Design feedback below)
>>
>> *Design*
>>
>>- If LocalJobRunner and RemoteJobRunner handle the different methods of
>>launching a Job, what additional value do the different types of
>>ApplicationRunner and RuntimeEnvironment provide? It seems like a red
>> flag
>>that all 3 would need to change from environment to environment. It
>>indicates that they don't have proper modularity. The
>> call-sequence-figures
>>support this; LocalApplicationRunner and RemoteApplicationRunner make
>> the
>>same calls and the diagram only varies after jobRunner.start()
>>- As far as I can tell, the only difference between Local and Remote
>>ApplicationRunner is that one is blocking and the other is
>> non-blocking. If
>>that's all they're for then either the names should be changed to
>> reflect
>>this, or they should be combined into one ApplicationRunner and just
>> expose
>>separate methods for run() and runBlocking()
>>- There isn't much detail on why the main() methods for Local/Remote
>>have such different implementations, how they receive the Application
>>(direct vs config), and concretely how the deployment scripts, if any,
>>should interact with them.
>>
>>
>> *Style*
>>
>>- nit: None of the 11 uses of the word "actual" in the doc are
>> *actually*
>>needed. :-)
>>- nit: Colors of the runtime blocks in the diagrams are unconventional
>>and a little distracting. Reminds me of nai won bao. Now I'm hungry.
>> :-)
>>- Prefer the name "ExecutionEnvironment" over "RuntimeEnvironment". The
>>term "execution environment" is used
>>    - The code comparisons for the ApplicationRunners are not apples-to-apples.
>>    The local runner example is an application that USES the local runner. The
>>    remote runner example is just the runner code itself. So, it's not
>>readily apparent that we're comparing the main() methods and not the
>>application itself.
>>
>>
>> On Mon, Apr 17, 2017 at 5:02 PM, Yi Pan  wrote:
>>
>> > Made some updates to clarify the role and functions of
>> RuntimeEnvironment
>> > in SEP-2.
>> >
>> > On Fri, Apr 14, 2017 at 9:30 AM, Yi Pan  wrote:
>> >
>> > > Hi, everyone,
>> > >
>> > > In light of new features such as fluent API and standalone that
>> introduce
>> > > new deployment / application launch models in Samza, I created a new
>> > 

Re: [DISCUSS] SEP-2: ApplicationRunner Design

2017-04-21 Thread Navina Ramesh
Hey Yi,
Thanks a lot for your work on this document. I know it must have been
crazy trying to put together everything in a single doc :)

Here are my comments. Sorry about the delay :(

1. It will be useful to set some background for the benefit of the
community members who haven't been following design docs in the JIRAs. Can
you briefly explain the definition of StreamApplication and how it
translates to jobs through the stack?

2. "Problem" section doesn't seem to describe any problem that
ApplicationRunner is solving :) Imo, ApplicationRunner basically provides a
unified programming pattern for the user to execute StreamApplications
defined using fluent-api or task-level API. I think the problem and
motivation section can use a little bit of re-wording.

3. In the "Overview of ApplicationRunner" section:
* How the components within ApplicationRunner interact isn't very obvious
from the overview image. For example, ExecutionPlanner translates a
"StreamApplication" into an "ExecutionPlan" which is essentially a
specification of the DAG. (Please correct me, if I am wrong here!). The
ExecutionPlan is used by the JobRunner to launch Samza jobs.
* The roles of ExecutionPlanner and JobRunner are fairly well-defined.
StreamManager seems like a util class that helps class-load systems and
create streams. The ExecutionPlan will be consumed by JobRunner and
JobRunner will use StreamManager to create intermediate streams, prior to
launching jobs. It doesn't sound like a StreamManager is a "component" of
the ApplicationRunner.
* What is the role of the RuntimeEnvironment? That has not been explained.
Maybe explaining that will fill the gap in understanding for the readers. I
see that you have tried to explain the flow of control in the code using
the sequence diagram. Perhaps, if we can articulate the
roles/responsibilities of the RuntimeEnvironment, there will not be a need
for the control flow diagram.

4. How is runtime environment defined by the user? Is it configurable ?
Answering these questions in the doc will be useful

5. In the "Interaction between RuntimeEnvironment and ApplicationRunners"
section:
* Samza container is interacting with the RuntimeEnvironment. Does that
make the RuntimeEnvironment as a shared component between the
LocalApplicationRunner and the SamzaContainer? It doesn't seem to be the
case for RemoteApplicationRunner. So, I am confused as to why it is
different.

6. In general, what does "app.class" config represent?  It seems
straightforward when a "StreamApplication" is defined. Is it applicable
when using low-level task api?

7. Interface definitions:
* Perhaps when you implement this, can you specifically call out whether each
method is blocking or not in the javadoc?

8. Minor nit-picking:
* "ApplicationRunners in Different Execution Environments" -> should it be
RuntimeEnvironments as that is the terminology used in the rest of the
document.
* In the "How this works in standalone deployment" section:
* "Deploy the application to standalone hosts" and *Run run-local-app.sh on
each node to start/stop the local application* are probably just a single
step - Deploy the application to standalone hosts using run-local-app.sh?


General question:
It seems like, even with extensive changes to the interfaces/programming
model, we are still class-loading the components for the most part. In such a
world, we are not close to integrating with frameworks that already have a
lifecycle model and can provide instantiated objects directly. For example,
in the Samza as a library use-case, it makes sense for the user to provide
a JmxServer or a taskFactory or a custom metricReporter for the
StreamProcessor. One of the motivations for this case was that most
applications are already running within a servlet/jetty container model
with its own lifecycle. If ApplicationRunner(s) is the unified interface,
doesn't that prohibit Samza from being integrated with such frameworks?

Thanks!
Navina

On Thu, Apr 20, 2017 at 10:06 AM, Jacob Maes  wrote:

> Thanks for the SEP!
>
> +1 on introducing these new components
> -1 on the current definition of their roles (see Design feedback below)
>
> *Design*
>
>- If LocalJobRunner and RemoteJobRunner handle the different methods of
>launching a Job, what additional value do the different types of
>ApplicationRunner and RuntimeEnvironment provide? It seems like a red
> flag
>that all 3 would need to change from environment to environment. It
>indicates that they don't have proper modularity. The
> call-sequence-figures
>support this; LocalApplicationRunner and RemoteApplicationRunner make
> the
>same calls and the diagram only varies after jobRunner.start()
>- As far as I can tell, the only difference between Local and Remote
>ApplicationRunner is that one is blocking and the other is
> non-blocking. If
>that's all they're for then either the names should be changed to
> reflect
>this, or they should be 

Re: [DISCUSS] SEP-2: ApplicationRunner Design

2017-04-20 Thread Chris Pettitt
It might be worth taking a look at how Beam does test streams. The API is
more powerful than just passing in a queue, e.g.:

TestStream<String> source = TestStream.create(StringUtf8Coder.of())
.addElements(TimestampedValue.of("this", start))
.addElements(TimestampedValue.of("that", start))
.addElements(TimestampedValue.of("future",
start.plus(Duration.standardMinutes(1))))
.advanceProcessingTime(Duration.standardMinutes(3))
.advanceWatermarkTo(start.plus(Duration.standardSeconds(30)))
.advanceWatermarkTo(start.plus(Duration.standardMinutes(1)))
.advanceWatermarkToInfinity();

---

BTW, have we given up on the idea of a simpler input system, e.g. one that
assumes all input messages are keyed? It seems it would be possible to
support legacy "system streams" via an adapter that mapped K, V -> V' and
could open the possibility of inputs in whatever form users want, e.g.
(again from Beam):

final Create.Values values = Create.of("test", "one", "two", "three");

final TextIO.Read.Bound from = TextIO.Read.from("src/main/resources/words.txt");

final KafkaIO.Read reader = KafkaIO.read()

.withBootstrapServers("myServer1:9092,myServer2:9092")

.withTopics(topics)

.withConsumerFactoryFn(new ConsumerFactoryFn(

topics, 10, numElements, OffsetResetStrategy.EARLIEST))

.withKeyCoder(BigEndianIntegerCoder.of())

.withValueCoder(BigEndianLongCoder.of())

.withMaxNumRecords(numElements);
Ideally, such a simple input system specification would be usable in
production as well as test. At that point I don't know if we need a
separate TestApplicationRunner except perhaps as a hint to what we've
been calling an Environment?
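As a sketch, the adapter idea above (viewing a legacy (K, V) "system stream" as a stream of V' via a user-supplied mapper) could be as small as the following; the names and the list-based stand-in for a stream are illustrative only:

```java
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.stream.Collectors;

public class KeyedAdapterSketch {
    // Apply a (K, V) -> V' mapper to each legacy keyed message.
    static <K, V, R> List<R> adapt(List<Map.Entry<K, V>> legacy,
                                   BiFunction<K, V, R> mapper) {
        return legacy.stream()
                     .map(e -> mapper.apply(e.getKey(), e.getValue()))
                     .collect(Collectors.toList());
    }
}
```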

---

Aren't we supposed to be able to run applications without blocking (e.g.
for embedded cases)? The API suggests that run is going to be a blocking
call?

- Chris


On Thu, Apr 20, 2017 at 1:06 PM, Jacob Maes  wrote:

> Thanks for the SEP!
>
> +1 on introducing these new components
> -1 on the current definition of their roles (see Design feedback below)
>
> *Design*
>
>- If LocalJobRunner and RemoteJobRunner handle the different methods of
>launching a Job, what additional value do the different types of
>ApplicationRunner and RuntimeEnvironment provide? It seems like a red
> flag
>that all 3 would need to change from environment to environment. It
>indicates that they don't have proper modularity. The
> call-sequence-figures
>support this; LocalApplicationRunner and RemoteApplicationRunner make
> the
>same calls and the diagram only varies after jobRunner.start()
>- As far as I can tell, the only difference between Local and Remote
>ApplicationRunner is that one is blocking and the other is
> non-blocking. If
>that's all they're for then either the names should be changed to
> reflect
>this, or they should be combined into one ApplicationRunner and just
> expose
>separate methods for run() and runBlocking()
>- There isn't much detail on why the main() methods for Local/Remote
>have such different implementations, how they receive the Application
>(direct vs config), and concretely how the deployment scripts, if any,
>should interact with them.
>
>
> *Style*
>
>- nit: None of the 11 uses of the word "actual" in the doc are
> *actually*
>needed. :-)
>- nit: Colors of the runtime blocks in the diagrams are unconventional
>and a little distracting. Reminds me of nai won bao. Now I'm hungry. :-)
>- Prefer the name "ExecutionEnvironment" over "RuntimeEnvironment". The
>term "execution environment" is used
>    - The code comparisons for the ApplicationRunners are not apples-to-apples.
>    The local runner example is an application that USES the local runner. The
>    remote runner example is just the runner code itself. So, it's not
>readily apparent that we're comparing the main() methods and not the
>application itself.
>
>
> On Mon, Apr 17, 2017 at 5:02 PM, Yi Pan  wrote:
>
> > Made some updates to clarify the role and functions of RuntimeEnvironment
> > in SEP-2.
> >
> > On Fri, Apr 14, 2017 at 9:30 AM, Yi Pan  wrote:
> >
> > > Hi, everyone,
> > >
> > > In light of new features such as fluent API and standalone that
> introduce
> > > new deployment / application launch models in Samza, I created a new
> > SEP-2
> > > to address the new use cases. SEP-2 link: https://cwiki.apache.
> > > org/confluence/display/SAMZA/SEP-2%3A+ApplicationRunner+Design
> > >
> > > Please take a look and give feedback!
> > >
> > > Thanks!
> > >
> > > -Yi
> > >
> >
>


Re: [DISCUSS] SEP-2: ApplicationRunner Design

2017-04-20 Thread Jacob Maes
Thanks for the SEP!

+1 on introducing these new components
-1 on the current definition of their roles (see Design feedback below)

*Design*

   - If LocalJobRunner and RemoteJobRunner handle the different methods of
   launching a Job, what additional value do the different types of
   ApplicationRunner and RuntimeEnvironment provide? It seems like a red flag
   that all 3 would need to change from environment to environment. It
   indicates that they don't have proper modularity. The call-sequence-figures
   support this; LocalApplicationRunner and RemoteApplicationRunner make the
   same calls and the diagram only varies after jobRunner.start()
   - As far as I can tell, the only difference between Local and Remote
   ApplicationRunner is that one is blocking and the other is non-blocking. If
   that's all they're for then either the names should be changed to reflect
   this, or they should be combined into one ApplicationRunner and just expose
   separate methods for run() and runBlocking()
   - There isn't much detail on why the main() methods for Local/Remote
   have such different implementations, how they receive the Application
   (direct vs config), and concretely how the deployment scripts, if any,
   should interact with them.


*Style*

   - nit: None of the 11 uses of the word "actual" in the doc are *actually*
   needed. :-)
   - nit: Colors of the runtime blocks in the diagrams are unconventional
   and a little distracting. Reminds me of nai won bao. Now I'm hungry. :-)
   - Prefer the name "ExecutionEnvironment" over "RuntimeEnvironment". The
   term "execution environment" is used
   - The code comparisons for the ApplicationRunners are not apples-to-apples.
   The local runner example is an application that USES the local runner. The
   remote runner example is just the runner code itself. So, it's not
   readily apparent that we're comparing the main() methods and not the
   application itself.


On Mon, Apr 17, 2017 at 5:02 PM, Yi Pan  wrote:

> Made some updates to clarify the role and functions of RuntimeEnvironment
> in SEP-2.
>
> On Fri, Apr 14, 2017 at 9:30 AM, Yi Pan  wrote:
>
> > Hi, everyone,
> >
> > In light of new features such as fluent API and standalone that introduce
> > new deployment / application launch models in Samza, I created a new
> SEP-2
> > to address the new use cases. SEP-2 link: https://cwiki.apache.
> > org/confluence/display/SAMZA/SEP-2%3A+ApplicationRunner+Design
> >
> > Please take a look and give feedback!
> >
> > Thanks!
> >
> > -Yi
> >
>


Re: [DISCUSS] SEP-2: ApplicationRunner Design

2017-04-17 Thread Yi Pan
Made some updates to clarify the role and functions of RuntimeEnvironment
in SEP-2.

On Fri, Apr 14, 2017 at 9:30 AM, Yi Pan  wrote:

> Hi, everyone,
>
> In light of new features such as fluent API and standalone that introduce
> new deployment / application launch models in Samza, I created a new SEP-2
> to address the new use cases. SEP-2 link: https://cwiki.apache.
> org/confluence/display/SAMZA/SEP-2%3A+ApplicationRunner+Design
>
> Please take a look and give feedback!
>
> Thanks!
>
> -Yi
>


[DISCUSS] SEP-2: ApplicationRunner Design

2017-04-14 Thread Yi Pan
Hi, everyone,

In light of new features such as fluent API and standalone that introduce
new deployment / application launch models in Samza, I created a new SEP-2
to address the new use cases. SEP-2 link:
https://cwiki.apache.org/confluence/display/SAMZA/SEP-2%3A+ApplicationRunner+Design

Please take a look and give feedback!

Thanks!

-Yi