Re: [DISCUSS] SEP-2: ApplicationRunner Design

2017-04-27 Thread xinyu liu
btw, I will get to SAMZA-1246 as soon as possible.

Thanks,
Xinyu

Re: [DISCUSS] SEP-2: ApplicationRunner Design

2017-04-27 Thread xinyu liu
Let me try to capture the updated requirements:

1. Set up input streams outside StreamGraph, and treat graph building as a
library (*Fluent, Beam*).

2. Improve ease of use for ApplicationRunner to avoid complex
configurations such as zkCoordinator, zkCoordinationService. (*Standalone*).
Provide some programmatic way to tweak them in the API.

3. Clean up ApplicationRunner into a single interface (*Fluent*). We can
have one or more implementations but it's hidden from the users.

4. Separate StreamGraph from runtime environment so it can be
serialized (*Beam,
Yarn*)

5. Better life cycle management of application, parity with StreamProcessor
(*Standalone, Beam*). Status should include the exception in case of failure
(tracked in SAMZA-1246).

6. Support injecting user-defined objects into ApplicationRunner.

Prateek and I iterated on the ApplicationRunner API based on these
requirements. To support #1, we can set up input streams on the runner
level, which returns the MessageStream and allows graph building
afterwards. The code looks like below:

  ApplicationRunner runner = ApplicationRunner.local();
  runner.input(streamSpec)
        .map(...)
        .window(...);
  runner.run();

StreamSpec is the building block for setting up streams here. It can be set
up in different ways:

  - Direct creation of the stream spec, e.g. runner.input(new StreamSpec(id,
    system, stream))
  - Lookup by streamId from env or config, e.g.
    runner.input(runner.env().getStreamSpec(id))
  - A canned spec that generates the StreamSpec with id, system, and stream
    to minimize configuration. For example,
    CollectionSpec.create(Arrays.asList(1, 2, 3, 4)) would auto-generate the
    system and stream in the spec.
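As a rough sketch of these three set-up styles, with StreamSpec, Env, and
CollectionSpec written here as minimal stand-in classes (the real proposed
Samza types may differ):

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Stand-in for the proposed StreamSpec: a stream id plus its system and physical stream.
class StreamSpec {
    final String id, system, stream;
    StreamSpec(String id, String system, String stream) {
        this.id = id; this.system = system; this.stream = stream;
    }
}

// Stand-in for the env lookup: resolves a streamId to a StreamSpec.
class Env {
    private final Map<String, StreamSpec> specs = new HashMap<>();
    void setStreamSpec(String id, StreamSpec spec) { specs.put(id, spec); }
    StreamSpec getStreamSpec(String id) { return specs.get(id); }
}

// Stand-in for a canned spec: auto-generates system and stream names from the data.
class CollectionSpec {
    static StreamSpec create(List<?> data) {
        String id = "collection-" + Math.abs(data.hashCode() % 1000);
        return new StreamSpec(id, "in-memory", id + "-stream");
    }
}

public class StreamSpecStyles {
    public static void main(String[] args) {
        // 1. Direct creation.
        StreamSpec direct = new StreamSpec("page-views", "kafka", "PageViewEvent");

        // 2. Lookup from env/config by streamId.
        Env env = new Env();
        env.setStreamSpec("page-views", direct);
        StreamSpec fromEnv = env.getStreamSpec("page-views");

        // 3. Canned spec with auto-generated system and stream.
        StreamSpec canned = CollectionSpec.create(Arrays.asList(1, 2, 3, 4));

        System.out.println(fromEnv.system + " / " + canned.system);
    }
}
```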

To support #2, we need to be able to set up StreamSpec-related objects and
factories programmatically in env. Suppose we have the following before
runner.input(...):

  runner.setup(env /* a writable interface of env */ -> {
      env.setStreamSpec(streamId, streamSpec);
      env.setSystem(systemName, systemFactory);
  });

runner.setup(...) also provides setup for stores and other runtime objects
needed for execution. The setup should be serializable to config.
For #6, I haven't figured out a good way to inject user-defined objects
here yet.
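One way to picture the "setup is serializable to config" idea: the runner hands
a writable env to the user lambda and then snapshots what was registered into
flat config keys. This is a hypothetical sketch, not the proposed implementation;
the Runner/WritableEnv classes are stand-ins, though the generated keys follow
Samza's existing streams.*/systems.* config convention:

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Consumer;

// A writable env that records registrations as flat config entries,
// so the whole setup can travel with the job as plain config.
class WritableEnv {
    final Map<String, String> config = new LinkedHashMap<>();
    void setStreamSpec(String streamId, String system, String physicalName) {
        config.put("streams." + streamId + ".samza.system", system);
        config.put("streams." + streamId + ".samza.physical.name", physicalName);
    }
    void setSystem(String systemName, String factoryClass) {
        config.put("systems." + systemName + ".samza.factory", factoryClass);
    }
}

class Runner {
    final WritableEnv env = new WritableEnv();
    Runner setup(Consumer<WritableEnv> block) {
        block.accept(env);           // user registers specs and factories
        return this;
    }
    Map<String, String> toConfig() { // the serialized form of the setup
        return env.config;
    }
}

public class SetupSketch {
    public static void main(String[] args) {
        Runner runner = new Runner().setup(env -> {
            env.setStreamSpec("page-views", "kafka", "PageViewEvent");
            env.setSystem("kafka", "org.apache.samza.system.kafka.KafkaSystemFactory");
        });
        System.out.println(runner.toConfig());
    }
}
```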

With this API, we should also be able to support #4. For remote
runner.run(), the operator user classes/lambdas in the StreamGraph need to
be serialized. As of today, the existing option is to serialize to a stream,
either the coordinator stream or the pipeline control stream, which has a
size limit per message. Do you see RPC as an option?
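A small illustration of the constraint above (plain Java, not Samza code):
operator lambdas must implement Serializable to be shipped, and the serialized
payload has to fit whatever transport carries it. The size limit here is a made-up
number for illustration:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.function.Function;

public class LambdaSizeSketch {

    // A Function that is also Serializable; lambdas assigned to this
    // interface are serialized via Java's SerializedLambda machinery.
    interface SerFn<A, B> extends Function<A, B>, Serializable {}

    static byte[] serialize(Object o) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(o);
        }
        return bytes.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        SerFn<Integer, Integer> mapFn = x -> x * 2;
        byte[] payload = serialize(mapFn);
        // A stream transport with a per-message size limit would need this check:
        int maxMessageBytes = 1_000_000; // hypothetical limit
        System.out.println(payload.length + " bytes, fits=" + (payload.length <= maxMessageBytes));
    }
}
```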

For this version of the API, it seems we don't need the StreamApplication
wrapper or to expose the StreamGraph. Do you think we are on the right path?

Thanks,
Xinyu



[GitHub] samza pull request #142: SAMZA-1219 Add metrics for operator message receive...

2017-04-27 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/samza/pull/142


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] samza pull request #147: Access log

2017-04-27 Thread jayasi
Github user jayasi closed the pull request at:

https://github.com/apache/samza/pull/147




[GitHub] samza pull request #147: Access log

2017-04-27 Thread jayasi
GitHub user jayasi opened a pull request:

https://github.com/apache/samza/pull/147

Access log



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/s-noghabi/samza access-log

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/samza/pull/147.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #147


commit 9a62a7138cab12941e7becb03cd4016fa7912ee7
Author: jmehar2 
Date:   2017-04-19T20:20:41Z

AccessLog for measuring storage access patterns and metrics

commit cdb61c58ed710e99bfaeab1a026e239e4b0d3a2a
Author: jmehar2 
Date:   2017-04-27T22:20:06Z

Refactor code and add logging of other DBOperations

commit f0bbe4fb2c607c81d154d09a9988712c6ea4e2f4
Author: jmehar2 
Date:   2017-04-27T22:26:33Z

Fixed imports






[GitHub] samza pull request #141: Samza 1214: Allow users to set a default replicatio...

2017-04-27 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/samza/pull/141




[GitHub] samza pull request #122: SAMZA-1210: Fixing merge issue and container id gen...

2017-04-27 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/samza/pull/122




[GitHub] samza pull request #146: SAMZA-1224 : Revert job coordinator factory config ...

2017-04-27 Thread navina
GitHub user navina opened a pull request:

https://github.com/apache/samza/pull/146

SAMZA-1224 : Revert job coordinator factory config to the old format

We haven't released since adding this config, so it is OK to change the
format now.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/navina/samza SAMZA-1224

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/samza/pull/146.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #146


commit 29119b09374383be0c6bae8d0f1b6e3cc9454d57
Author: nramesh 
Date:   2017-04-27T21:31:55Z

SAMZA-1224 : Revert job coordinator factory config to the old format






[GitHub] samza pull request #145: SAMZA-1245: Make stream samza.physical.name config ...

2017-04-27 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/samza/pull/145




[GitHub] samza pull request #145: SAMZA-1245: Make stream samza.physical.name config ...

2017-04-27 Thread xinyuiscool
GitHub user xinyuiscool opened a pull request:

https://github.com/apache/samza/pull/145

SAMZA-1245: Make stream samza.physical.name config name string public

For certain systems, such as HDFS, the physical stream name might need to be
finalized during config generation. In order to do that, we need to expose
the stream samza.physical.name config string.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/xinyuiscool/samza SAMZA-1245

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/samza/pull/145.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #145


commit 6d7fd2e563f7e6501d54e1cc9d484e38c58a1659
Author: Xinyu Liu 
Date:   2017-04-27T18:36:46Z

SAMZA-1245: Make stream samza.physical.name config name string public






Re: [DISCUSS] SEP-2: ApplicationRunner Design

2017-04-27 Thread Chris Pettitt
That should have been:

For #1, Beam doesn't have a hard requirement...


Re: [DISCUSS] SEP-2: ApplicationRunner Design

2017-04-27 Thread Chris Pettitt
For #1, I doesn't have a hard requirement for any change from Samza. A very
nice to have would be to allow the input systems to be set up at the same
time as the rest of the StreamGraph. An even nicer to have would be to do
away with the callback based approach and treat graph building as a
library, a la Beam and Flink.

For the moment I've worked around the two pass requirement (once for
config, once for StreamGraph) by introducing an IR layer between Beam and
the Samza Fluent translation. The IR layer is convenient independent of
this problem because it makes it easier to switch between the Fluent and
low-level APIs.


For #4, if we had parity with StreamProcessor for lifecycle we'd be in
great shape. One additional issue with the status call that I may not have
mentioned is that it provides you no way to get at the cause of failure.
The StreamProcessor API does allow this via the callback.


Re. #2 and #3, I'm a big fan of getting rid of the extra configuration
indirection you currently have to jump through (this is also related to
system consumer configuration from #1). It makes it much easier to discover
what the configurable parameters are too, if we provide some programmatic
way to tweak them in the API - which can turn into config under the hood.
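The "typed API that turns into config under the hood" idea can be sketched as a
small builder whose setters map to string config keys. This is an illustrative
assumption, not an existing Samza API; the key names follow the
job.coordinator.zk.* convention used for ZK coordination config:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Typed, IDE-discoverable setters that compile down to the string-keyed
// config Samza already understands.
class ZkRunnerConfig {
    private final Map<String, String> config = new LinkedHashMap<>();

    ZkRunnerConfig zkConnect(String hosts) {
        config.put("job.coordinator.zk.connect", hosts);
        return this;
    }

    ZkRunnerConfig sessionTimeoutMs(int ms) {
        config.put("job.coordinator.zk.session.timeout.ms", String.valueOf(ms));
        return this;
    }

    Map<String, String> build() { return config; }
}

public class DiscoverableConfigSketch {
    public static void main(String[] args) {
        // The IDE can surface the tunable parameters; the output is plain config.
        Map<String, String> cfg = new ZkRunnerConfig()
                .zkConnect("localhost:2181")
                .sessionTimeoutMs(30_000)
                .build();
        System.out.println(cfg.get("job.coordinator.zk.connect"));
    }
}
```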

On Wed, Apr 26, 2017 at 9:20 PM, xinyu liu  wrote:

> Let me give a shot to summarize the requirements for ApplicationRunner we
> have discussed so far:
>
> - Support environment for passing in user-defined objects (streams
> potentially) into ApplicationRunner (*Beam*)
>
> - Improve ease of use for ApplicationRunner to avoid complex configurations
> such as zkCoordinator, zkCoordinationService. (*Standalone*)
>
> - Clean up ApplicationRunner into a single interface (*Fluent*). We can
> have one or more implementations but it's hidden from the users.
>
> - Separate StreamGraph from environment so it can be serialized (*Beam,
> Yarn*)
>
> - Better life cycle management of application, including
> start/stop/stats (*Standalone,
> Beam*)
>
>
> One way to address 2 and 3 is to provide pre-packaged runner using static
> factory methods, and the return type will be the ApplicationRunner
> interface. So we can have:
>
>   ApplicationRunner runner = ApplicationRunner.zk() /
> ApplicationRunner.local()
> / ApplicationRunner.remote() / ApplicationRunner.test().
>
> Internally we will package the right configs and run-time environment with
> the runner. For example, ApplicationRunner.zk() will define all the configs
> needed for zk coordination.
>
> To support 1 and 4, can we pass in a lambda function in the runner, and
> then we can run the stream graph? Like the following:
>
>   ApplicationRunner.zk().env(config -> environment).run(streamGraph);
>
> Then we need a way to pass the environment into the StreamGraph. This can
> be done by either adding an extra parameter to each operator, or have a
> getEnv() function in the MessageStream, which seems to be pretty hacky.
>
> What do you think?
>
> Thanks,
> Xinyu
>
>
>
>
>
> On Sun, Apr 23, 2017 at 11:01 PM, Prateek Maheshwari <
> pmaheshw...@linkedin.com.invalid> wrote:
>
> > Thanks for putting this together Yi!
> >
> > I agree with Jake, it does seem like there are a few too many moving
> parts
> > here. That said, the problem being solved is pretty broad, so let me try
> to
> > summarize my current understanding of the requirements. Please correct me
> > if I'm wrong or missing something.
> >
> > ApplicationRunner and JobRunner first, ignoring test environment for the
> > moment.
> > ApplicationRunner:
> > 1. Create execution plan: Same in Standalone and Yarn
> > 2. Create intermediate streams: Same logic but different leader election
> > (ZK-based or pre-configured in standalone, AM in Yarn).
> > 3. Run jobs: In JVM in standalone. Submit to the cluster in Yarn.
> >
> > JobRunner:
> > 1. Run the StreamProcessors: Same process in Standalone & Test. Remote
> host
> > in Yarn.
> >
> > To get a single ApplicationRunner implementation, like Jake suggested, we
> > need to make leader election and JobRunner implementation pluggable.
> > There's still the question of whether ApplicationRunner#run API should be
> > blocking or non-blocking. It has to be non-blocking in YARN. We want it
> to
> > be blocking in standalone, but seems like the main reason is ease of use
> > when launched from main(). I'd prefer making it consistently non-blocking
> > instead, esp. since in embedded standalone mode (where the processor is
> > running in another container) a blocking API would not be user-friendly
> > either. If not, we can add both run and runBlocking.
> >
> > Coming to RuntimeEnvironment, which is the least clear to me so far:
> > 1. I don't think RuntimeEnvironment should be responsible for providing
> > StreamSpecs for streamIds - they can be obtained with a config/util
> class.
> > The StreamProcessor should only know about logical streamIds and the
> > streamId <-> actual stream mapping should happen