Re: [DISCUSS] SEP-2: ApplicationRunner Design

2017-04-28 Thread xinyu liu
Right, after further discussion here, option #2 seems redundant for defining
streams. StreamSpec itself is flexible enough to support both static and
programmatic specification of the stream. Agreed that it's not convenient for
now (pretty obvious after looking at your bsr beam.runners.samza.wrapper), and
we should provide similar predefined convenience wrappers for users to create
the StreamSpec. In your case, that would be something like
BoundedStreamSpec.file(), which would generate the system and serialize the
data as you did.
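To make the canned-spec idea more concrete, here is a minimal sketch of what a BoundedStreamSpec.file()-style helper might do: take only a path and auto-generate the stream id and the system name. SimpleStreamSpec and BoundedStreamSpecs are hypothetical stand-ins, not Samza classes. The real StreamSpec carries more fields, and both registering a SystemFactory for the generated system and serializing the data are left out.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, simplified stand-in for StreamSpec: only id, system name and physical name.
final class SimpleStreamSpec {
  final String id;
  final String systemName;
  final String physicalName;

  SimpleStreamSpec(String id, String systemName, String physicalName) {
    this.id = id;
    this.systemName = systemName;
    this.physicalName = physicalName;
  }

  @Override
  public String toString() {
    return id + " -> " + systemName + ":" + physicalName;
  }
}

// Hypothetical canned-spec helper: the caller supplies only a file path; the id and
// system name are generated, so no config entries are needed for them.
final class BoundedStreamSpecs {
  private static final AtomicInteger COUNTER = new AtomicInteger();

  private BoundedStreamSpecs() {}

  static SimpleStreamSpec file(String path) {
    int n = COUNTER.getAndIncrement();
    // A real helper would also register a SystemFactory under this generated system name.
    return new SimpleStreamSpec("file-input-" + n, "file-system-" + n, path);
  }
}
```

Generating one system per file keeps each spec independent. Sharing one file system across all such specs would be an equally reasonable choice.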

We still think the callback proposed in #2 can be useful for requirement #6:
injecting other user objects at runtime, such as stores and metrics. To
simplify things further for users, I think we might hide the ApplicationRunner
and expose the StreamApplication instead, which would make requirement #3 not
user-facing. The API would then look like this:

  StreamApplication app = StreamApplication.local(config)
      .init(env -> {
        env.registerStore("my-store", new MyStoreFactory());
        env.registerMetricsReporter("my-reporter", new MyMetricsReporterFactory());
      })
      .withLifeCycleListener(myListener);

  app.input(BoundedStreamSpec.create("/sample/input.txt"))
      .map(...)
      .window(...);

  app.run();

For requirement #5, I added .withLifeCycleListener() to the API, which
triggers callbacks on lifecycle events.
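Here is a minimal sketch of how init(...) and withLifeCycleListener(...) could fit together. It assumes a listener with onStart/onShutdown/onFailure callbacks. LifeCycleListener, Env and SketchApp are hypothetical names used only for illustration; they are not the proposed Samza interfaces. logic(...) stands in for the graph that app.input(...) would build.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical lifecycle callbacks (illustrative names only).
interface LifeCycleListener {
  void onStart();
  void onShutdown();
  void onFailure(Throwable cause);
}

// Hypothetical writable environment handed to init(...): user objects are registered
// by name as factories, so they can be created later at runtime.
final class Env {
  final Map<String, Supplier<?>> storeFactories = new HashMap<>();

  void registerStore(String name, Supplier<?> factory) {
    storeFactories.put(name, factory);
  }
}

final class SketchApp {
  private final Env env = new Env();
  // No-op default so run() works even if no listener was supplied.
  private LifeCycleListener listener = new LifeCycleListener() {
    @Override public void onStart() {}
    @Override public void onShutdown() {}
    @Override public void onFailure(Throwable cause) {}
  };
  private Runnable logic = () -> {};

  SketchApp init(Consumer<Env> setup) {
    setup.accept(env);
    return this;
  }

  SketchApp withLifeCycleListener(LifeCycleListener l) {
    this.listener = l;
    return this;
  }

  // Stand-in for the processing graph built via app.input(...).map(...).
  SketchApp logic(Runnable r) {
    this.logic = r;
    return this;
  }

  Env env() {
    return env;
  }

  // Runs synchronously and reports the outcome, including any failure cause, to the listener.
  void run() {
    listener.onStart();
    try {
      logic.run();
      listener.onShutdown();
    } catch (RuntimeException e) {
      listener.onFailure(e);
    }
  }
}
```

Registering factories rather than instances keeps the init(...) step declarative. That matters if the environment setup later has to be serialized to config.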

For #4: jars will be distributed the way they are today, using Yarn
localization with a remote store such as Artifactory or an HTTP server. We
discussed where to put the serialized graph. The current thinking is to
define a general interface that can be backed by a remote store, such as
Kafka, Artifactory or an HTTP server. Kafka is straightforward, but we would
hit its message size limit or have to split the graph ourselves. For the other
two, we need to investigate whether we can easily upload jars to our
Artifactory and localize them with Yarn. Any opinions on this?
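One way to picture the "general interface backed by a remote store" is the sketch below. It defines a hypothetical GraphStore with put/get for the serialized graph, plus an in-memory implementation that mimics a size-limited backend such as a Kafka topic. That implementation splits the payload into fixed-size chunks, which is the "cut it ourselves" option. All names are assumptions. A real Kafka-backed version would also need to check the ordering and completeness of the chunks it reads back.

```java
import java.io.ByteArrayOutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical pluggable store for the serialized graph; Kafka-, Artifactory- or
// HTTP-backed implementations would sit behind the same two methods.
interface GraphStore {
  void put(String key, byte[] serializedGraph);
  byte[] get(String key);
}

// In-memory stand-in for a backend with a per-message size limit: the payload is
// split into chunks of at most maxMessageBytes and reassembled on read.
final class ChunkingGraphStore implements GraphStore {
  private final int maxMessageBytes;
  private final Map<String, List<byte[]>> messages = new HashMap<>();

  ChunkingGraphStore(int maxMessageBytes) {
    if (maxMessageBytes <= 0) {
      throw new IllegalArgumentException("maxMessageBytes must be positive");
    }
    this.maxMessageBytes = maxMessageBytes;
  }

  @Override
  public void put(String key, byte[] serializedGraph) {
    List<byte[]> chunks = new ArrayList<>();
    for (int off = 0; off < serializedGraph.length; off += maxMessageBytes) {
      int end = Math.min(off + maxMessageBytes, serializedGraph.length);
      chunks.add(Arrays.copyOfRange(serializedGraph, off, end));
    }
    messages.put(key, chunks);
  }

  @Override
  public byte[] get(String key) {
    List<byte[]> chunks = messages.get(key);
    if (chunks == null) {
      return null; // nothing stored under this key
    }
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    for (byte[] chunk : chunks) {
      out.write(chunk, 0, chunk.length);
    }
    return out.toByteArray();
  }

  // Number of "messages" the graph occupies, i.e. how often the size limit forced a split.
  int chunkCount(String key) {
    List<byte[]> chunks = messages.get(key);
    return chunks == null ? 0 : chunks.size();
  }
}
```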

Thanks,
Xinyu

On Fri, Apr 28, 2017 at 11:34 AM, Chris Pettitt <cpett...@linkedin.com.invalid> wrote:

> Your proposal for #1 looks good.
>
> I'm not quite sure how to reconcile the proposals for #1 and #2. In #1 you
> add the stream spec straight onto the runner, while in #2 you do it in a
> callback. If it is either-or, #1 looks a lot better for my purposes.
>
> For #4, what mechanism are you using to distribute the JARs? Can you use
> the same mechanism to distribute the serialized graph?
>
> On Fri, Apr 28, 2017 at 12:14 AM, xinyu liu  wrote:
>
> > btw, I will get to SAMZA-1246 as soon as possible.
> >
> > Thanks,
> > Xinyu
> >
> > On Thu, Apr 27, 2017 at 9:11 PM, xinyu liu wrote:
> >
> > > Let me try to capture the updated requirements:
> > >
> > > 1. Set up input streams outside StreamGraph, and treat graph building
> > > as a library (*Fluent, Beam*).
> > >
> > > 2. Improve ease of use for ApplicationRunner to avoid complex
> > > configurations such as zkCoordinator, zkCoordinationService
> > > (*Standalone*). Provide some programmatic way to tweak them in the API.
> > >
> > > 3. Clean up ApplicationRunner into a single interface (*Fluent*). We
> > > can have one or more implementations, but they are hidden from the
> > > users.
> > >
> > > 4. Separate StreamGraph from the runtime environment so it can be
> > > serialized (*Beam, Yarn*).
> > >
> > > 5. Better lifecycle management of the application, with parity with
> > > StreamProcessor (*Standalone, Beam*). Status should include the
> > > exception in case of failure (tracked in SAMZA-1246).
> > >
> > > 6. Support injecting user-defined objects into ApplicationRunner.
> > >
> > > Prateek and I iterated on the ApplicationRunner API based on these
> > > requirements. To support #1, we can set up input streams at the runner
> > > level, which returns the MessageStream and allows graph building
> > > afterwards. The code looks like this:
> > >
> > >   ApplicationRunner runner = ApplicationRunner.local();
> > >   runner.input(streamSpec)
> > >       .map(...)
> > >       .window(...);
> > >   runner.run();
> > >
> > > StreamSpec is the building block for setting up streams here. It can
> > > be set up in different ways:
> > >
> > >   - Direct creation of the stream spec, like
> > >     runner.input(new StreamSpec(id, system, stream))
> > >   - Loading it by streamId from env or config, like
> > >     runner.input(runner.env().getStreamSpec(id))
> > >   - A canned spec that generates the StreamSpec with id, system and
> > >     stream to minimize configuration. For example,
> > >     CollectionSpec.create(Arrays.asList(1, 2, 3, 4)) would
> > >     auto-generate the system and stream in the spec.
> > >
> > > To support #2, we need to be able to set up StreamSpec-related
> > > objects and factories programmatically in env. Suppose we have the
> > > following before runner.input(...):
> > >
> > >   runner.setup(env /* a writable interface of env */ -> {
> > >     env.setStreamSpec(streamId, streamSpec);
> > >     env.setSystem(systemName, systemFactory);
> > >   });
> >

Re: Review Request 58866: fixed SAMZA-1248. use processor id for stand alone barrier

2017-04-28 Thread Navina Ramesh via Review Board

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/58866/#review173409
---




samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
Line 65 (original), 65 (patched)


Why is newJobModel useful? Please add some comments, as it is not very obvious.


- Navina Ramesh


On April 28, 2017, 11:54 p.m., Boris Shkolnik wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/58866/
> ---
> 
> (Updated April 28, 2017, 11:54 p.m.)
> 
> 
> Review request for samza.
> 
> 
> Bugs: SAMZA-1248
> https://issues.apache.org/jira/browse/SAMZA-1248
> 
> 
> Repository: samza
> 
> 
> Description
> ---
> 
> use processor id for stand alone barrier
> 
> 
> Diffs
> -
> 
>   samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java 
> 2535654cee37feeb472517b8673a7bb12b3cc1fc 
>   samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java 
> fee840511fbc19da2e19525a97fcfb5812a70a53 
>   samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java 
> b8dc2953ead2fb11fa22db5ec30b19a74a779830 
> 
> 
> Diff: https://reviews.apache.org/r/58866/diff/1/
> 
> 
> Testing
> ---
> 
> 
> Thanks,
> 
> Boris Shkolnik
> 
>



Re: Review Request 58866: fixed SAMZA-1248. use processor id for stand alone barrier

2017-04-28 Thread Boris Shkolnik

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/58866/
---

(Updated April 28, 2017, 11:54 p.m.)


Review request for samza.


Bugs: SAMZA-1248
https://issues.apache.org/jira/browse/SAMZA-1248


Repository: samza


Description (updated)
---

use processor id for stand alone barrier


Diffs
-

  samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java 
2535654cee37feeb472517b8673a7bb12b3cc1fc 
  samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java 
fee840511fbc19da2e19525a97fcfb5812a70a53 
  samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java 
b8dc2953ead2fb11fa22db5ec30b19a74a779830 


Diff: https://reviews.apache.org/r/58866/diff/1/


Testing
---


Thanks,

Boris Shkolnik



Review Request 58866: fixed SAMZA-1248. use processor id for stand alone barrier

2017-04-28 Thread Boris Shkolnik

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/58866/
---

Review request for samza.


Bugs: SAMZA-1248
https://issues.apache.org/jira/browse/SAMZA-1248


Repository: samza


Description
---

removed unused variable


Diffs
-

  samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java 
2535654cee37feeb472517b8673a7bb12b3cc1fc 
  samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java 
fee840511fbc19da2e19525a97fcfb5812a70a53 
  samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java 
b8dc2953ead2fb11fa22db5ec30b19a74a779830 


Diff: https://reviews.apache.org/r/58866/diff/1/


Testing
---


Thanks,

Boris Shkolnik



Re: Review Request 58851: SAMZA-1212 - Refactor interaction between StreamProcessor, JobCoordinator and SamzaContainer

2017-04-28 Thread Navina Ramesh via Review Board

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/58851/
---

(Updated April 28, 2017, 6:50 p.m.)


Review request for samza and Prateek Maheshwari.


Bugs: SAMZA-1212
https://issues.apache.org/jira/browse/SAMZA-1212


Repository: samza


Description
---

(Same as PR - https://github.com/apache/samza/pull/148)
See SAMZA-1212 for motivation toward this refactoring.

Changes here are:
- Removed the blocking awaitStart method from StreamProcessor, JobCoordinator and SamzaContainer
- Introduced the SamzaContainerListener and JobCoordinatorListener interfaces, implemented by StreamProcessor
- Introduced SamzaContainerStatus to handle failures and lifecycle using the Listener interfaces
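The move from a blocking awaitStart to listener callbacks can be illustrated with a toy container. This is a simplified, hypothetical sketch: ContainerListener and ToyContainer are not the SamzaContainerListener or SamzaContainer types from this patch, whose method signatures may differ. In the sketch, start() returns immediately, and the caller learns about start, stop or failure (including the cause) through the listener.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical, simplified listener (not the actual Samza interface).
interface ContainerListener {
  void onContainerStart();
  void onContainerStop();
  void onContainerFailed(Throwable cause);
}

// Toy container: start() does not block; progress is reported via the listener
// instead of making the caller wait in an awaitStart()-style method.
final class ToyContainer {
  private final ExecutorService executor = Executors.newSingleThreadExecutor();
  private final Runnable work;
  private final ContainerListener listener;

  ToyContainer(Runnable work, ContainerListener listener) {
    this.work = work;
    this.listener = listener;
  }

  void start() {
    executor.execute(() -> {
      listener.onContainerStart();
      try {
        work.run();
        listener.onContainerStop();
      } catch (RuntimeException e) {
        // The failure cause is handed to the owner (e.g. a StreamProcessor) rather than lost.
        listener.onContainerFailed(e);
      }
    });
    // Accept no further tasks; the already-submitted one still runs to completion.
    executor.shutdown();
  }
}
```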


Diffs
-

  samza-core/src/main/java/org/apache/samza/SamzaContainerStatus.java
PRE-CREATION
  samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinator.java
af2ef6a0338a0f0ab015e615a5dc213941095801
  samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinatorFactory.java
7f7e1ede822cf16b78e6e753ebc083a17ebf2aca
  samza-core/src/main/java/org/apache/samza/processor/JobCoordinatorListener.java
PRE-CREATION
  samza-core/src/main/java/org/apache/samza/processor/SamzaContainerController.java
4af413a14aaa3976f45b0646a3feb745ea3f0e97
  samza-core/src/main/java/org/apache/samza/processor/SamzaContainerListener.java
PRE-CREATION
  samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
191059443e3d65869207a5f1e11526f97833f468
  samza-core/src/main/java/org/apache/samza/processor/StreamProcessorLifecycleListener.java
7bca074a4d83bb9bc2434b6769ecf39c5694e2f9
  samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java
80350dfc02b577faf0dce00cf5695c23d202ad9c
  samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinator.java
0d74fb82590ba6f183905c9b0328b16d88adc0ab
  samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinatorFactory.java
0faeca917aa5fb12acef9fb539d81a01255a0441
  samza-core/src/main/java/org/apache/samza/zk/ZkBarrierForVersionUpgrade.java
0afd840dc2083dc78b853423f27776d6b5a2538f
  samza-core/src/main/java/org/apache/samza/zk/ZkControllerImpl.java
61f78762a3a1a50687ec00f783685f53d17bd645
  samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
2535654cee37feeb472517b8673a7bb12b3cc1fc
  samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java
a44565c083dc73b0f5d56174d82e9ae62136cf02
  samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
8481c92b5666710edd8381526f824daed4dd27c5
  samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
dcef3af45bf5fe139be7744276adaddac3fb3505
  samza-core/src/test/java/org/apache/samza/processor/TestStreamProcessor.java
PRE-CREATION
  samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
010ff7e85ff1c5e507f3e9fa7d6c196b58d929ab
  samza-core/src/test/scala/org/apache/samza/processor/StreamProcessorTestUtils.scala
PRE-CREATION
  samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminJava.java
a786468722cc49b4b6c3c67d89a6b09f1be4c939
  samza-test/src/test/java/org/apache/samza/test/processor/TestStreamProcessor.java
f37a224f64eec162e60e3a891b257175dbf4ec3c
  samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala
29fb6d3f6e07f356d4a25556221fa76ecdc7bf77


Diff: https://reviews.apache.org/r/58851/diff/1/


Testing
---

unit tests and ./gradlew clean build


Thanks,

Navina Ramesh



Review Request 58851: SAMZA-1212 - Refactor interaction between StreamProcessor, JobCoordinator and SamzaContainer

2017-04-28 Thread Navina Ramesh via Review Board

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/58851/
---

Review request for samza and Prateek Maheshwari.


Repository: samza


Description
---

(Same as PR - https://github.com/apache/samza/pull/148)
See SAMZA-1212 for motivation toward this refactoring.

Changes here are:
- Removed the blocking awaitStart method from StreamProcessor, JobCoordinator and SamzaContainer
- Introduced the SamzaContainerListener and JobCoordinatorListener interfaces, implemented by StreamProcessor
- Introduced SamzaContainerStatus to handle failures and lifecycle using the Listener interfaces


Diffs
-

  samza-core/src/main/java/org/apache/samza/SamzaContainerStatus.java
PRE-CREATION
  samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinator.java
af2ef6a0338a0f0ab015e615a5dc213941095801
  samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinatorFactory.java
7f7e1ede822cf16b78e6e753ebc083a17ebf2aca
  samza-core/src/main/java/org/apache/samza/processor/JobCoordinatorListener.java
PRE-CREATION
  samza-core/src/main/java/org/apache/samza/processor/SamzaContainerController.java
4af413a14aaa3976f45b0646a3feb745ea3f0e97
  samza-core/src/main/java/org/apache/samza/processor/SamzaContainerListener.java
PRE-CREATION
  samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
191059443e3d65869207a5f1e11526f97833f468
  samza-core/src/main/java/org/apache/samza/processor/StreamProcessorLifecycleListener.java
7bca074a4d83bb9bc2434b6769ecf39c5694e2f9
  samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java
80350dfc02b577faf0dce00cf5695c23d202ad9c
  samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinator.java
0d74fb82590ba6f183905c9b0328b16d88adc0ab
  samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinatorFactory.java
0faeca917aa5fb12acef9fb539d81a01255a0441
  samza-core/src/main/java/org/apache/samza/zk/ZkBarrierForVersionUpgrade.java
0afd840dc2083dc78b853423f27776d6b5a2538f
  samza-core/src/main/java/org/apache/samza/zk/ZkControllerImpl.java
61f78762a3a1a50687ec00f783685f53d17bd645
  samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
2535654cee37feeb472517b8673a7bb12b3cc1fc
  samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java
a44565c083dc73b0f5d56174d82e9ae62136cf02
  samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
8481c92b5666710edd8381526f824daed4dd27c5
  samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
dcef3af45bf5fe139be7744276adaddac3fb3505
  samza-core/src/test/java/org/apache/samza/processor/TestStreamProcessor.java
PRE-CREATION
  samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
010ff7e85ff1c5e507f3e9fa7d6c196b58d929ab
  samza-core/src/test/scala/org/apache/samza/processor/StreamProcessorTestUtils.scala
PRE-CREATION
  samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminJava.java
a786468722cc49b4b6c3c67d89a6b09f1be4c939
  samza-test/src/test/java/org/apache/samza/test/processor/TestStreamProcessor.java
f37a224f64eec162e60e3a891b257175dbf4ec3c
  samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala
29fb6d3f6e07f356d4a25556221fa76ecdc7bf77


Diff: https://reviews.apache.org/r/58851/diff/1/


Testing
---

unit tests and ./gradlew clean build


Thanks,

Navina Ramesh



Re: [DISCUSS] SEP-2: ApplicationRunner Design

2017-04-28 Thread Chris Pettitt
Your proposal for #1 looks good.

I'm not quite sure how to reconcile the proposals for #1 and #2. In #1 you
add the stream spec straight onto the runner, while in #2 you do it in a
callback. If it is either-or, #1 looks a lot better for my purposes.

For #4, what mechanism are you using to distribute the JARs? Can you use the
same mechanism to distribute the serialized graph?

On Fri, Apr 28, 2017 at 12:14 AM, xinyu liu  wrote:

> btw, I will get to SAMZA-1246 as soon as possible.
>
> Thanks,
> Xinyu
>
> On Thu, Apr 27, 2017 at 9:11 PM, xinyu liu  wrote:
>
> > Let me try to capture the updated requirements:
> >
> > 1. Set up input streams outside StreamGraph, and treat graph building as
> > a library (*Fluent, Beam*).
> >
> > 2. Improve ease of use for ApplicationRunner to avoid complex
> > configurations such as zkCoordinator, zkCoordinationService
> > (*Standalone*). Provide some programmatic way to tweak them in the API.
> >
> > 3. Clean up ApplicationRunner into a single interface (*Fluent*). We can
> > have one or more implementations, but they are hidden from the users.
> >
> > 4. Separate StreamGraph from the runtime environment so it can be
> > serialized (*Beam, Yarn*).
> >
> > 5. Better lifecycle management of the application, with parity with
> > StreamProcessor (*Standalone, Beam*). Status should include the
> > exception in case of failure (tracked in SAMZA-1246).
> >
> > 6. Support injecting user-defined objects into ApplicationRunner.
> >
> > Prateek and I iterated on the ApplicationRunner API based on these
> > requirements. To support #1, we can set up input streams at the runner
> > level, which returns the MessageStream and allows graph building
> > afterwards. The code looks like this:
> >
> >   ApplicationRunner runner = ApplicationRunner.local();
> >   runner.input(streamSpec)
> >       .map(...)
> >       .window(...);
> >   runner.run();
> >
> > StreamSpec is the building block for setting up streams here. It can be
> > set up in different ways:
> >
> >   - Direct creation of the stream spec, like
> >     runner.input(new StreamSpec(id, system, stream))
> >   - Loading it by streamId from env or config, like
> >     runner.input(runner.env().getStreamSpec(id))
> >   - A canned spec that generates the StreamSpec with id, system and
> >     stream to minimize configuration. For example,
> >     CollectionSpec.create(Arrays.asList(1, 2, 3, 4)) would auto-generate
> >     the system and stream in the spec.
> >
> > To support #2, we need to be able to set up StreamSpec-related objects
> > and factories programmatically in env. Suppose we have the following
> > before runner.input(...):
> >
> >   runner.setup(env /* a writable interface of env */ -> {
> >     env.setStreamSpec(streamId, streamSpec);
> >     env.setSystem(systemName, systemFactory);
> >   });
> >
> > runner.setup(->) also provides setup for stores and other runtime
> > objects needed for the execution. The setup should be serializable to
> > config. For #6, I haven't figured out a good way to inject user-defined
> > objects here yet.
> >
> > With this API, we should also be able to support #4. For remote
> > runner.run(), the user operator classes/lambdas in the StreamGraph need
> > to be serialized. Today, the existing option is to serialize to a
> > stream, either the coordinator stream or the pipeline control stream,
> > which has a per-message size limit. Do you see RPC as an option?
> >
> > For this version of the API, it seems we need neither the
> > StreamApplication wrapper nor to expose the StreamGraph. Do you think we
> > are on the right path?
> >
> > Thanks,
> > Xinyu
> >
> >
> > On Thu, Apr 27, 2017 at 6:09 AM, Chris Pettitt <cpett...@linkedin.com.invalid> wrote:
> >
> >> That should have been:
> >>
> >> For #1, Beam doesn't have a hard requirement...
> >>
> >> On Thu, Apr 27, 2017 at 9:07 AM, Chris Pettitt wrote:
> >>
> >> > For #1, I doesn't have a hard requirement for any change from Samza. A
> >> > very nice-to-have would be to allow the input systems to be set up at
> >> > the same time as the rest of the StreamGraph. An even nicer-to-have
> >> > would be to do away with the callback-based approach and treat graph
> >> > building as a library, a la Beam and Flink.
> >> >
> >> > For the moment I've worked around the two-pass requirement (once for
> >> > config, once for StreamGraph) by introducing an IR layer between Beam
> >> > and the Samza Fluent translation. The IR layer is convenient
> >> > independently of this problem because it makes it easier to switch
> >> > between the Fluent and low-level APIs.
> >> >
> >> >
> >> > For #4, if we had parity with StreamProcessor for lifecycle, we'd be
> >> > in great shape. One additional issue with the status call that I may
> >> > not have mentioned is that it provides no way to get at the cause of
> >> > failure. The StreamProcessor API does allow this via the callback.
> >> >
> >> >
> >> > Re. #2 and #3, I'm a big fan of getting rid of 

Re: [DISCUSS] SEP-2: ApplicationRunner Design

2017-04-28 Thread Prateek Maheshwari
Thanks for summarizing and looping me back into the discussion, Xinyu.
Apparently I've been missing some emails from this list at my work address,
and some of my replies from my personal email aren't being delivered either.

Agree that we should replace the callback-based graph building in
StreamApplication with a library-like model. The callback-based approach
made sense for the low-level fluent API, but the library-based approach is
much cleaner for a high-level API.

Agree that we should serialize the logical graph and user-defined functions
and distribute them to the containers for execution. This will also give us
an intermediate representation to convert Beam/SQL plans to. I'd also prefer
a clear REST/RPC API between the leader and follower JobCoordinators for
exchanging configs/models/graphs, instead of putting them in configs.
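For the user-defined functions, one possible approach is plain Java serialization of lambdas whose target type is Serializable. Here is a minimal sketch of it. SerializableFunction and UdfShipping are hypothetical helpers, not Samza APIs. A serialized lambda refers back to the class that defined it, so the same application JAR has to be on the container's classpath. That ties graph distribution to the JAR distribution question in #4.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.function.Function;

// Hypothetical function type: because it extends Serializable, lambdas targeting it
// are compiled as serializable lambdas.
interface SerializableFunction<T, R> extends Function<T, R>, Serializable {}

final class UdfShipping {
  private UdfShipping() {}

  // Turns a user function into bytes that could be stored or sent to a container.
  static byte[] serialize(Object udf) throws IOException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(udf);
    }
    return bytes.toByteArray();
  }

  // Rebuilds the function; the defining class must be loadable on this side.
  @SuppressWarnings("unchecked")
  static <T, R> SerializableFunction<T, R> deserialize(byte[] data)
      throws IOException, ClassNotFoundException {
    try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
      return (SerializableFunction<T, R>) in.readObject();
    }
  }
}
```

Anything the lambda captures must itself be serializable. This is one reason a separate, serializable environment description (rather than captured runtime objects) fits well with this approach.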

Agree that we should try to make the ApplicationRunner lifecycle similar to
StreamProcessor's (async + callbacks). Xinyu and Jake have a better idea
about whether this can be reasonably done for both local and remote
execution.
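Here is a minimal sketch of the "async + callbacks" shape with a status that keeps the failure cause, assuming a CompletableFuture-based run(). AppState, AppStatus and AsyncRunnerSketch are hypothetical names. The only point is that the cause travels with the status instead of being dropped, which is the gap Chris pointed out with the current status call.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

enum AppState { NEW, RUNNING, SUCCEEDED, FAILED }

// Hypothetical status object: cause is non-null only when state == FAILED.
final class AppStatus {
  final AppState state;
  final Throwable cause;

  AppStatus(AppState state, Throwable cause) {
    this.state = state;
    this.cause = cause;
  }
}

final class AsyncRunnerSketch {
  private volatile AppStatus status = new AppStatus(AppState.NEW, null);

  // Starts the application asynchronously; the returned future completes with the final status.
  CompletableFuture<AppStatus> run(Runnable application) {
    status = new AppStatus(AppState.RUNNING, null);
    return CompletableFuture.runAsync(application).handle((ignored, error) -> {
      // An exception thrown inside runAsync reaches handle() wrapped in a CompletionException.
      Throwable cause = (error instanceof CompletionException && error.getCause() != null)
          ? error.getCause()
          : error;
      AppStatus finalStatus = (cause == null)
          ? new AppStatus(AppState.SUCCEEDED, null)
          : new AppStatus(AppState.FAILED, cause);
      status = finalStatus;
      return finalStatus;
    });
  }

  // Non-blocking status query, usable while the application is still running.
  AppStatus status() {
    return status;
  }
}
```

The same object could be passed to a lifecycle listener's failure callback, so callers can choose between polling and being notified.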

Agree that we should separate the runtime environment creation from the
logical graph description. This will help with graph serialization, testing
and lifecycle management of user-provided objects. Also agree that
environment creation logic should be serializable.

There are several pluggable components (and their configs) in Samza, so a
clean and user-friendly API for environment creation would be my highest
priority. My feeling is that convenience APIs for #2 and #3 will be much
easier to create once we get the base environment API right.

Similarly, I'd also like to get a better sense of how the environment
(including systems) will be set up by users and wired internally before
working on different ways of describing the inputs/specs.

Thanks,
Prateek

On Thu, Apr 27, 2017 at 9:14 PM, xinyu liu  wrote:

> btw, I will get to SAMZA-1246 as soon as possible.
>
> Thanks,
> Xinyu
>
> On Thu, Apr 27, 2017 at 9:11 PM, xinyu liu  wrote:
>
> > Let me try to capture the updated requirements:
> >
> > 1. Set up input streams outside StreamGraph, and treat graph building as
> > a library (*Fluent, Beam*).
> >
> > 2. Improve ease of use for ApplicationRunner to avoid complex
> > configurations such as zkCoordinator, zkCoordinationService
> > (*Standalone*). Provide some programmatic way to tweak them in the API.
> >
> > 3. Clean up ApplicationRunner into a single interface (*Fluent*). We can
> > have one or more implementations, but they are hidden from the users.
> >
> > 4. Separate StreamGraph from the runtime environment so it can be
> > serialized (*Beam, Yarn*).
> >
> > 5. Better lifecycle management of the application, with parity with
> > StreamProcessor (*Standalone, Beam*). Status should include the
> > exception in case of failure (tracked in SAMZA-1246).
> >
> > 6. Support injecting user-defined objects into ApplicationRunner.
> >
> > Prateek and I iterated on the ApplicationRunner API based on these
> > requirements. To support #1, we can set up input streams at the runner
> > level, which returns the MessageStream and allows graph building
> > afterwards. The code looks like this:
> >
> >   ApplicationRunner runner = ApplicationRunner.local();
> >   runner.input(streamSpec)
> >       .map(...)
> >       .window(...);
> >   runner.run();
> >
> > StreamSpec is the building block for setting up streams here. It can be
> > set up in different ways:
> >
> >   - Direct creation of the stream spec, like
> >     runner.input(new StreamSpec(id, system, stream))
> >   - Loading it by streamId from env or config, like
> >     runner.input(runner.env().getStreamSpec(id))
> >   - A canned spec that generates the StreamSpec with id, system and
> >     stream to minimize configuration. For example,
> >     CollectionSpec.create(Arrays.asList(1, 2, 3, 4)) would auto-generate
> >     the system and stream in the spec.
> >
> > To support #2, we need to be able to set up StreamSpec-related objects
> > and factories programmatically in env. Suppose we have the following
> > before runner.input(...):
> >
> >   runner.setup(env /* a writable interface of env */ -> {
> >     env.setStreamSpec(streamId, streamSpec);
> >     env.setSystem(systemName, systemFactory);
> >   });
> >
> > runner.setup(->) also provides setup for stores and other runtime
> > objects needed for the execution. The setup should be serializable to
> > config. For #6, I haven't figured out a good way to inject user-defined
> > objects here yet.
> >
> > With this API, we should also be able to support #4. For remote
> > runner.run(), the user operator classes/lambdas in the StreamGraph need
> > to be serialized. Today, the existing option is to serialize to a
> > stream, either the coordinator stream or the pipeline control stream,
> > which has a per-message size limit. Do you see RPC as an option?
> >
> > For this version of the API, it seems we don't need the StreamApplication
> > wrapper as well as exposing the St

[GitHub] samza pull request #148: SAMZA-1212 - Refactor interaction between StreamPro...

2017-04-28 Thread navina
GitHub user navina opened a pull request:

https://github.com/apache/samza/pull/148

SAMZA-1212 - Refactor interaction between StreamProcessor, JobCoordinator and SamzaContainer

See SAMZA-1212 for motivation toward this refactoring.
Changes here are:
* Removed the blocking awaitStart method from StreamProcessor, JobCoordinator and SamzaContainer
* Introduced the SamzaContainerListener and JobCoordinatorListener interfaces, implemented by StreamProcessor
* Introduced SamzaContainerStatus to handle failures and lifecycle using the Listener interfaces

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/navina/samza SAMZA-1212

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/samza/pull/148.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #148


commit d81cfdca4b84f02b86ab70657c8c4636e8902b9a
Author: Navina Ramesh 
Date:   2017-04-14T00:20:38Z

Remove containerId from SamzaContainer.apply

commit c4a10242b6e85345ed4515b98ec407435c1fdce1
Author: Navina Ramesh 
Date:   2017-04-14T00:22:01Z

Removing onBecomingLeader for readability

commit 56028361552b37a27991c6cac1f3e00cc3d3a0f2
Author: Navina Ramesh 
Date:   2017-04-14T00:26:01Z

Removing awaitStart

commit fd99fd65fb437afed04240b3971b3cefc1f52f1d
Author: Navina Ramesh 
Date:   2017-04-14T19:12:46Z

Added JobCoordinator Listener. Trying to remove Samzacontainer controller

commit e77aa502df74cedb87c50a8e039135975504381e
Author: Navina Ramesh 
Date:   2017-04-25T01:36:14Z

Adding ProcessorErrorHandler, SamzaContainerStatus, JobCoordinatorListener, 
SamzaContainerListener

commit 3cbf259c1e9fea7a4d24af93a812e75d9947aac8
Author: Navina Ramesh 
Date:   2017-04-26T23:01:24Z

Documenting state transitions for SamzaContainer

commit 679b2f54aa7a39c8dae688f9b446aa9bad9d267f
Author: Navina Ramesh 
Date:   2017-04-26T23:03:00Z

adding some log lines in LocalContainerRunner

commit 3b65cc983d7d734d6fdf2a81cb155fbad0e774b3
Author: Navina Ramesh 
Date:   2017-04-27T01:36:25Z

Fixed integration test failures by throwing the exception in the listener 
for ThreadJob

commit b1b61f58b2e06a1e7f5fc602fe9007d4c1a003a0
Author: Navina Ramesh 
Date:   2017-04-27T19:42:54Z

Added a few tests in TestSamzaContainer

commit a2db96924ebd479e2110fc611c86c3c310336212
Author: Navina Ramesh 
Date:   2017-04-27T23:38:29Z

Added test for sp.stop()

commit bc74cd5670aacfe5c4eae7968973e68f9f700876
Author: Navina Ramesh 
Date:   2017-04-28T00:23:32Z

Adding setContainerListener explicitly in SamzaContainer

commit 07adf3c6ce39a893a0995498bc012cf6c14c43be
Author: Navina Ramesh 
Date:   2017-04-28T02:38:24Z

Added documentation in JobCoordinator interface

commit 78a73540cc0cd84db286737b190c596dcde93d1f
Author: Navina Ramesh 
Date:   2017-04-28T02:43:00Z

Removed ProcessorErrorHandler

commit 5d1b28c6b566ca691a955d94bb1daf29a96737ef
Author: Navina Ramesh 
Date:   2017-04-28T02:45:47Z

Removing commented out code

commit 42ffc7d6c1d5e657b35d9482df30f0e201bdbb27
Author: Navina Ramesh 
Date:   2017-04-28T05:00:40Z

Adding docs to JobCoordinatorListener

commit 5ff163cf19c85875a0e2a8d85682487186ffc6c5
Author: Navina Ramesh 
Date:   2017-04-28T05:36:02Z

Added javadocs for SamzaContainerListener

commit f3551656037a058aebd62e9f7dacaafeb49d2f94
Author: Navina Ramesh 
Date:   2017-04-28T06:33:49Z

Cleaning up StreamProcessor code and jobCoordinator docs

commit c624d75afd77dd028a4406d6e07d2ef801098b03
Author: Navina Ramesh 
Date:   2017-04-28T07:09:36Z

Fixing standaloneJobCoordinator

commit c116a3c55149a4cca738a66ec925569385568be9
Author: Navina Ramesh 
Date:   2017-04-28T07:14:04Z

Adding null checks on processorListener

commit 6f0715c4944255409bd78fd178c8e9976e60f485
Author: Navina Ramesh 
Date:   2017-04-28T07:32:37Z

Fixing ZkJobCoordinator



