Re: [DISCUSS] SEP-2: ApplicationRunner Design

2017-04-26 Thread xinyu liu
Let me take a shot at summarizing the requirements for ApplicationRunner that
we have discussed so far:

1. Support an environment for passing user-defined objects (potentially
streams) into ApplicationRunner (*Beam*)

2. Improve ease of use for ApplicationRunner to avoid complex configurations
such as zkCoordinator, zkCoordinationService. (*Standalone*)

3. Clean up ApplicationRunner into a single interface (*Fluent*). We can
have one or more implementations, but they are hidden from the users.

4. Separate StreamGraph from the environment so it can be serializable
(*Beam, Yarn*)

5. Better life cycle management of the application, including
start/stop/stats (*Standalone, Beam*)


One way to address 2 and 3 is to provide pre-packaged runners using static
factory methods, with the ApplicationRunner interface as the return type.
So we can have:

  ApplicationRunner runner = ApplicationRunner.zk(); // or .local(), .remote(), .test()

Internally we will package the right configs and run-time environment with
the runner. For example, ApplicationRunner.zk() will define all the configs
needed for zk coordination.
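A minimal Java sketch of the pre-packaged runner idea, under assumed names:
only the ApplicationRunner interface is user-facing, while a hypothetical
package-private PackagedRunner class carries the bundled configs. The config
keys are placeholders, not real Samza settings.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: users only ever see the ApplicationRunner type.
interface ApplicationRunner {
  /** Which pre-packaged environment this runner targets. */
  String mode();

  /** Configs bundled by the factory method; read-only for callers. */
  Map<String, String> packagedConfig();

  static ApplicationRunner zk()     { return PackagedRunner.create("zk"); }
  static ApplicationRunner local()  { return PackagedRunner.create("local"); }
  static ApplicationRunner remote() { return PackagedRunner.create("remote"); }
  static ApplicationRunner test()   { return PackagedRunner.create("test"); }
}

// Package-private implementation hidden behind the interface.
final class PackagedRunner implements ApplicationRunner {
  private final String mode;
  private final Map<String, String> config;

  private PackagedRunner(String mode, Map<String, String> config) {
    this.mode = mode;
    this.config = Collections.unmodifiableMap(config);
  }

  static PackagedRunner create(String mode) {
    Map<String, String> cfg = new HashMap<>();
    // Placeholder keys; not real Samza configuration names.
    cfg.put("example.runner.mode", mode);
    if (mode.equals("zk")) {
      // zk() is where all the ZK coordination configs would be bundled.
      cfg.put("example.coordination.service", "zk");
    }
    return new PackagedRunner(mode, cfg);
  }

  @Override public String mode() { return mode; }

  @Override public Map<String, String> packagedConfig() { return config; }
}
```

Because callers only hold an ApplicationRunner, the concrete class can change
or be split per environment without touching user code.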

To support 1 and 4, can we pass a lambda function to the runner and then
run the stream graph? Like the following:

  ApplicationRunner.zk().env(config -> environment).run(streamGraph);

Then we need a way to pass the environment into the StreamGraph. This can
be done either by adding an extra parameter to each operator, or by having
a getEnv() function in MessageStream, which seems pretty hacky.
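A sketch of the env(config -> environment) idea, assuming hypothetical
FluentRunner, Environment, and StreamGraph types (config modeled as a plain
Map): the runner stores the lambda and only invokes it at run() time, so the
graph is handed an environment rather than owning one.

```java
import java.util.Map;
import java.util.function.Function;

// Hypothetical stand-in for user-defined objects (e.g. mock streams).
final class Environment {
  final String description;
  Environment(String description) { this.description = description; }
}

// The graph receives an environment at run time instead of holding one,
// which is what lets the graph itself stay serializable.
interface StreamGraph {
  void build(Environment env);
}

final class FluentRunner {
  private final Map<String, String> config;
  private Function<Map<String, String>, Environment> envFactory =
      cfg -> new Environment("default");

  FluentRunner(Map<String, String> config) { this.config = config; }

  /** Registers how to build the environment from config; returns this for chaining. */
  FluentRunner env(Function<Map<String, String>, Environment> factory) {
    this.envFactory = factory;
    return this;
  }

  /** Builds the environment lazily from the runner's config, then the graph. */
  void run(StreamGraph graph) {
    graph.build(envFactory.apply(config));
  }
}
```

The call shape mirrors the proposal:
new FluentRunner(config).env(cfg -> new Environment(...)).run(graph).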

What do you think?

Thanks,
Xinyu





On Sun, Apr 23, 2017 at 11:01 PM, Prateek Maheshwari <
pmaheshw...@linkedin.com.invalid> wrote:

> Thanks for putting this together Yi!
>
> I agree with Jake, it does seem like there are a few too many moving parts
> here. That said, the problem being solved is pretty broad, so let me try to
> summarize my current understanding of the requirements. Please correct me
> if I'm wrong or missing something.
>
> ApplicationRunner and JobRunner first, ignoring test environment for the
> moment.
> ApplicationRunner:
> 1. Create execution plan: Same in Standalone and Yarn
> 2. Create intermediate streams: Same logic but different leader election
> (ZK-based or pre-configured in standalone, AM in Yarn).
> 3. Run jobs: In JVM in standalone. Submit to the cluster in Yarn.
>
> JobRunner:
> 1. Run the StreamProcessors: Same process in Standalone & Test. Remote host
> in Yarn.
>
> To get a single ApplicationRunner implementation, like Jake suggested, we
> need to make leader election and JobRunner implementation pluggable.
> There's still the question of whether ApplicationRunner#run API should be
> blocking or non-blocking. It has to be non-blocking in YARN. We want it to
> be blocking in standalone, but seems like the main reason is ease of use
> when launched from main(). I'd prefer making it consistently non-blocking
> instead, esp. since in embedded standalone mode (where the processor is
> running in another container) a blocking API would not be user-friendly
> either. If not, we can add both run and runBlocking.
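One way to offer both run and runBlocking, sketched with a hypothetical
NonBlockingRunner class (not an existing Samza API): run() always returns
immediately with a CompletableFuture, and runBlocking() just joins on it, so
the blocking variant adds no separate code path.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch; not an existing Samza class.
final class NonBlockingRunner {
  /** Starts the application on its own thread and returns immediately. */
  CompletableFuture<Void> run(Runnable application) {
    CompletableFuture<Void> done = new CompletableFuture<>();
    Thread worker = new Thread(() -> {
      try {
        application.run();
        done.complete(null);
      } catch (Throwable t) {
        done.completeExceptionally(t); // surfaced to whoever waits on the future
      }
    }, "application-runner");
    worker.start();
    return done;
  }

  /** Convenience for main(): waits for completion; failures arrive as CompletionException. */
  void runBlocking(Runnable application) {
    run(application).join();
  }
}
```

Embedded callers can keep the future and attach callbacks, while a main()
launch can call runBlocking.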
>
> Coming to RuntimeEnvironment, which is the least clear to me so far:
> 1. I don't think RuntimeEnvironment should be responsible for providing
> StreamSpecs for streamIds - they can be obtained with a config/util class.
> The StreamProcessor should only know about logical streamIds and the
> streamId <-> actual stream mapping should happen within the
> SystemProducer/Consumer/Admins provided by the RuntimeEnvironment.
> 2. There are also other components that the user might be interested in
> providing implementations of in embedded Standalone mode (i.e., not just in
> tests) - MetricsRegistry and JMXServer come to mind.
> 3. Most importantly, it's not clear to me who creates and manages the
> RuntimeEnvironment. It seems like it should be the ApplicationRunner or the
> user because of (2) above and because StreamManager also needs access to
> SystemAdmins for creating intermediate streams which users might want to
> mock. But it also needs to be passed down to the StreamProcessor - how
> would this work on Yarn?
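A sketch of point 1, resolving a logical streamId to its system and physical
name from config alone, without RuntimeEnvironment. The key shapes
streams.<streamId>.samza.system and streams.<streamId>.samza.physical.name,
and the fallback of reusing the streamId as the physical name, are
assumptions modeled on Samza's stream configs; check StreamConfig for your
version. StreamIdResolver and ResolvedStream are hypothetical.

```java
import java.util.Map;

// Hypothetical value type; Samza's real StreamSpec carries more fields.
final class ResolvedStream {
  final String streamId;
  final String systemName;
  final String physicalName;

  ResolvedStream(String streamId, String systemName, String physicalName) {
    this.streamId = streamId;
    this.systemName = systemName;
    this.physicalName = physicalName;
  }
}

// Config-only lookup: the processor keeps working with logical streamIds.
final class StreamIdResolver {
  private final Map<String, String> config;

  StreamIdResolver(Map<String, String> config) { this.config = config; }

  ResolvedStream resolve(String streamId) {
    String prefix = "streams." + streamId + ".";
    String system = config.get(prefix + "samza.system");
    if (system == null) {
      throw new IllegalArgumentException("No system configured for stream " + streamId);
    }
    // Assumed fallback: without an explicit physical name, reuse the streamId.
    String physical = config.getOrDefault(prefix + "samza.physical.name", streamId);
    return new ResolvedStream(streamId, system, physical);
  }
}
```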
>
> I think we should figure out how to integrate RuntimeEnvironment with
> ApplicationRunner before we can make a call on one vs. multiple
> ApplicationRunner implementations. If we do keep LocalApplicationRunner and
> RemoteApplicationRunner (and TestApplicationRunner) separate, I agree with Jake
> that we should remove the JobRunners and roll them up into the respective
> ApplicationRunners.
>
> - Prateek
>
> On Thu, Apr 20, 2017 at 10:06 AM, Jacob Maes  wrote:
>
> > Thanks for the SEP!
> >
> > +1 on introducing these new components
> > -1 on the current definition of their roles (see Design feedback below)
> >
> > *Design*
> >
> >- If LocalJobRunner and RemoteJobRunner handle the different methods of
> >  launching a Job, what additional value do the different types of
> >  ApplicationRunner and RuntimeEnvironme

[GitHub] samza pull request #143: SAMZA-1200: Scala compile for samza-core fails with...

2017-04-26 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/samza/pull/143


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] samza pull request #144: SAMZA-1026: HDFS System Producer should not have Ka...

2017-04-26 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/samza/pull/144




[GitHub] samza pull request #144: SAMZA-1026: HDFS System Producer should not have Ka...

2017-04-26 Thread prateekm
GitHub user prateekm opened a pull request:

https://github.com/apache/samza/pull/144

SAMZA-1026: HDFS System Producer should not have Kafka dependency



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/prateekm/samza hdfs-kafka-dependency

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/samza/pull/144.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #144


commit f247a6596cf275f2b80b06ccf61c2fd98596a33c
Author: Prateek Maheshwari 
Date:   2017-04-26T23:16:06Z

SAMZA-1026: HDFS System Producer should not have Kafka dependency






[GitHub] samza pull request #143: SAMZA-1200: Scala compile for samza-core fails with...

2017-04-26 Thread prateekm
GitHub user prateekm opened a pull request:

https://github.com/apache/samza/pull/143

SAMZA-1200: Scala compile for samza-core fails with ambiguous reference 
error...

... for some compiler versions.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/prateekm/samza logging-compile-fix

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/samza/pull/143.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #143


commit 38c3183bdde82bded70a6186c1242a4fadaf1f87
Author: Prateek Maheshwari 
Date:   2017-04-26T22:58:32Z

SAMZA-1200: Scala compile for samza-core occasionally fails with ambiguous 
reference error






[GitHub] samza pull request #142: SAMZA-1219 Add metrics for operator message receive...

2017-04-26 Thread prateekm
GitHub user prateekm opened a pull request:

https://github.com/apache/samza/pull/142

SAMZA-1219 Add metrics for operator message received and execution times



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/prateekm/samza operator-metrics

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/samza/pull/142.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #142


commit cfb710d688701f7fbbf61d6e41c29397e9e3fabe
Author: Prateek Maheshwari 
Date:   2017-04-26T22:35:38Z

SAMZA-1219 Add metrics for operator message received and execution times






Re: [DISCUSS] SEP-3: Heart-beat mechanism between JobCoordinator and all running containers

2017-04-26 Thread Jagadish Venkatraman
Hi Abhishek,

Heartbeat between the AM and container has been a long-awaited Samza
feature. It will go a long way toward improving our reliability! +1 for this SEP.

*High level comments:*

Currently, the only use case for the heartbeat mechanism seems to be
running Samza on Yarn. IMHO, it makes sense to pull the heartbeat logic
into the *LocalContainerRunner* instead of baking it into the
*SamzaContainer* class. Long term, we can revisit this when we have a
pluggable liveness detection mechanism.

I'm thinking of a flow like this:

There is a separate component (or a thread) inside LocalContainerRunner
that periodically polls the coordinator and determines whether it should
continue running. If the coordinator determines that the container should
not run, the *LocalContainerRunner* cleanly shuts down the container and
the process exits with a non-zero exit status.
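A sketch of this flow, assuming hypothetical HeartBeatResponse,
CoordinatorClient, Container, and HeartbeatMonitor types (none of them
existing Samza classes): a daemon scheduler thread polls the coordinator,
and on DIE it shuts the container down directly and exits non-zero. The exit
hook is injected so production code can pass System::exit while tests
capture the status.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.IntConsumer;

// Hypothetical types for illustration; these are not existing Samza classes.
enum HeartBeatResponse { CONTINUE, DIE }

interface CoordinatorClient {
  /** Asks the job coordinator whether this container should keep running. */
  HeartBeatResponse heartBeat(String containerId);
}

interface Container {
  void shutdown();
}

final class HeartbeatMonitor {
  private final String containerId;
  private final CoordinatorClient coordinator;
  private final Container container;
  private final IntConsumer exit; // System::exit in production
  private final ScheduledExecutorService scheduler =
      Executors.newSingleThreadScheduledExecutor(r -> {
        Thread t = new Thread(r, "heartbeat-monitor");
        t.setDaemon(true); // never keeps the JVM alive on its own
        return t;
      });

  HeartbeatMonitor(String containerId, CoordinatorClient coordinator,
                   Container container, IntConsumer exit) {
    this.containerId = containerId;
    this.coordinator = coordinator;
    this.container = container;
    this.exit = exit;
  }

  /** Starts polling the coordinator every periodMs milliseconds. */
  void start(long periodMs) {
    scheduler.scheduleAtFixedRate(this::checkOnce, periodMs, periodMs, TimeUnit.MILLISECONDS);
  }

  /** One poll: on DIE, stop polling, shut the container down cleanly, exit non-zero. */
  void checkOnce() {
    HeartBeatResponse response;
    try {
      response = coordinator.heartBeat(containerId);
    } catch (RuntimeException e) {
      // A failed poll is treated as "keep running" here; a real design needs a
      // policy (e.g. N consecutive failures) for an unreachable coordinator.
      return;
    }
    if (response == HeartBeatResponse.DIE) {
      scheduler.shutdown();
      container.shutdown();
      exit.accept(1);
    }
  }
}
```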

The following nice properties fall out:

   - We can remove the proposed config *job.container.validator.enabled*.
   - We can also remove the proposed *Killable* interface since
   *SamzaContainer* (and runLoops) don't have to implement *Killable*
   anymore. The life-cycle is managed by the *LocalContainerRunner* that
   started it.

*On the proposed public interfaces:*

*job.container.validator.enabled:* I am not in favor of adding this as a
new public config. IIUC, when running Samza jobs on Yarn, we always want
the validator/heartbeats to be enabled. OTOH, when running Samza jobs in
standalone mode, we currently do not have a pluggable heartbeat mechanism.

*job.container.schedule.ms:* It seems that we can pick a sensible default
and be done with it, instead of adding a new config. Is there a reason this
needs to be configurable?

*On the proposed Killable interface:*

Not entirely sure we need this new *Killable* interface (esp. given that
there's currently only one implementation, *SamzaContainer*).

   - The *LocalContainerRunner* can instead directly invoke shutdown on
   the *SamzaContainer* when its heartbeat expires. The extra level of
   indirection (making *SamzaContainer* implement *Killable*) is
   probably unnecessary IMHO.


   - Since the *LocalContainerRunner* invokes *start/run* on the
   *SamzaContainer*, it seems simpler to also have it invoke *shutdown* on
   the *SamzaContainer*.

*Minor Comments:*

>> Expose a REST endpoint (eg: /isContainerValid) whose purpose is to get
>> requests from the Samza container periodically and respond back whether the
>> container is in the Job Coordinator's current list of valid containers.

Wondering if it'd be slightly cleaner to rename this to */heartBeatRequest*
and return a *heartBeatResponse* of either *CONTINUE* or *DIE*. The name
*isContainerValid* and the definition of validity do seem slightly broad?
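A coordinator-side sketch of the suggested /heartBeatRequest semantics, with
the HTTP layer left out: the hypothetical HeartBeatHandler tracks the
containers it currently considers valid and answers CONTINUE for those and
DIE for anything else, including orphans it no longer knows about.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

enum HeartBeatResponse { CONTINUE, DIE }

// Hypothetical coordinator-side handler for the suggested /heartBeatRequest
// endpoint; the HTTP layer is omitted.
final class HeartBeatHandler {
  // Container IDs the coordinator currently considers part of the job.
  private final Set<String> validContainerIds = ConcurrentHashMap.newKeySet();

  void containerStarted(String containerId) { validContainerIds.add(containerId); }

  void containerReleased(String containerId) { validContainerIds.remove(containerId); }

  /** CONTINUE for known containers, DIE for anything else (e.g. orphans). */
  HeartBeatResponse onHeartBeatRequest(String containerId) {
    return validContainerIds.contains(containerId)
        ? HeartBeatResponse.CONTINUE
        : HeartBeatResponse.DIE;
  }
}
```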

Thanks again for taking the time to draft the SEP, and volunteering to
implement this. Nice work!

Best,
Jagadish

On Mon, Apr 24, 2017 at 6:42 PM, Abhishek Shivanna 
wrote:

> Hi Everyone,
>
> In order to fix the issue of orphaned/leaky containers seen when the
> YARN Node Manager crashes, I have created a SEP discussing the design for
> implementing a heartbeat between the containers and the job coordinator:
> https://cwiki.apache.org/confluence/display/SAMZA/SEP-3%3A+Heart-beat+mechanism+between+JobCoordinator+and+all+running+containers
>
> Please take a look and provide feedback. I would also really appreciate
> help in designing a way to propagate the error up from SamzaContainer in
> order to exit the container with a non-zero exit code.
>
> Thanks,
> Abhishek
>
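On propagating the error out of SamzaContainer: one possible approach,
sketched with hypothetical RecordingContainer and ContainerLauncher classes,
is for the container to record the first fatal Throwable from its run loop
instead of swallowing it, and for the launcher to map that to the process
exit status.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for SamzaContainer: records the first fatal error
// from its run loop instead of swallowing it.
final class RecordingContainer {
  private final Runnable runLoop;
  private final AtomicReference<Throwable> failure = new AtomicReference<>();

  RecordingContainer(Runnable runLoop) { this.runLoop = runLoop; }

  void run() {
    try {
      runLoop.run();
    } catch (Throwable t) {
      failure.compareAndSet(null, t); // keep only the first failure
    }
  }

  Throwable failure() { return failure.get(); }
}

final class ContainerLauncher {
  /** Runs the container and maps its outcome to a process exit status. */
  static int runAndGetExitCode(RecordingContainer container) {
    container.run();
    return container.failure() == null ? 0 : 1;
  }
}
```

In a main() this would be used as
System.exit(ContainerLauncher.runAndGetExitCode(container)).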



-- 
Jagadish V,
Graduate Student,
Department of Computer Science,
Stanford University


[GitHub] samza pull request #141: Samza 1214: Allow users to set a default replicatio...

2017-04-26 Thread jmakes
GitHub user jmakes opened a pull request:

https://github.com/apache/samza/pull/141

Samza 1214: Allow users to set a default replication.factor for 
intermediate topics

* Add a new "systems.sysName.default.stream.*" config structure that allows
  users to set system-wide defaults for streams.
* More thorough testing of system defaults and stream defaults
* Removed the old migration config from the config table since there's no
  code to support it.
* Moved two Kafka-specific config accessors out of JobConfig and into
  KafkaConfig
* Removed duplicate impl of getChangelogStream()
You can merge this pull request into a Git repository by running:

$ git pull https://github.com/jmakes/samza samza-1214

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/samza/pull/141.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #141


commit d2d924ae2410562830b0d21a9077165f9cf51d12
Author: Jacob Maes 
Date:   2017-04-26T19:32:18Z

SAMZA-1214: Allow users to set a default replication.factor for 
intermediate topics

commit 80cd86f428c0df4be07300f0882679baf752423a
Author: Jacob Maes 
Date:   2017-04-26T19:37:04Z

Merge remote-tracking branch 'upstream/master'






[GitHub] samza pull request #138: SAMZA-1233: Create SystemAdmin only for JobCoordina...

2017-04-26 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/samza/pull/138

