[DISCUSS] Mesos Dispatcher (FF2016)

2016-09-26 Thread Wright, Eron

 Hello,

The code I presented at FF2016 represents a 'status-quo' approach to realizing 
a specific scenario - "mesos-session.sh".   But the final solution will involve 
CLI changes and the full realization of a dispatcher, which conflicts with 
FLIP-6.   We should advance the client/dispatcher design of FLIP-6 before I 
make further changes.  Also, we must decide whether the Mesos work should take 
FLIP-6 as a dependency, and/or strive to land a usable subset into master.

Here's a summary of the code I presented, as reference for ongoing FLIP-6 
design discussion.  A fresh PR for convenience:
https://github.com/EronWright/flink/pull/1/files

1. MesosDispatcher (Backend).   The 'backend' acts as a Mesos framework, to 
launch an AppMaster as a Mesos task.   For each session, the backend accepts 
"session parameters" which define the libraries, configuration, resource 
profile, and other parameters needed to launch the AppMaster.  The backend 
persists the information for recovery purposes.  Leader election is also 
used.   Not included is a dispatcher 'frontend' - a formalized API surface or 
REST server.

2. SessionParameters.   Captures the information needed to launch a session 
based on an AppMaster.  A session is a stateful execution environment for a 
program.   The session parameters can also be understood as the historical 
inputs to yarn-session.sh, plus job inputs per FLIP-6.   I acknowledge that the 
term 'session' is a working term.
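
To make the shape concrete, a holder for such session parameters might look 
roughly like the following (a sketch only - the class and field names here are 
hypothetical, not taken from the PR):

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of a session-parameters holder: the libraries,
// configuration, and resource profile needed to launch an AppMaster.
// All names are illustrative, not taken from the actual PR.
public class SessionParametersSketch {
    private final String sessionName;
    private final List<String> shipFiles;        // artifacts to ship with the session
    private final int appMasterMemoryMB;         // resource profile for the AppMaster
    private final Map<String, String> dynamicProperties;

    public SessionParametersSketch(String sessionName, List<String> shipFiles,
                                   int appMasterMemoryMB,
                                   Map<String, String> dynamicProperties) {
        this.sessionName = sessionName;
        this.shipFiles = Collections.unmodifiableList(shipFiles);
        this.appMasterMemoryMB = appMasterMemoryMB;
        this.dynamicProperties = Collections.unmodifiableMap(dynamicProperties);
    }

    public String sessionName() { return sessionName; }
    public List<String> shipFiles() { return shipFiles; }
    public int appMasterMemoryMB() { return appMasterMemoryMB; }
    public Map<String, String> dynamicProperties() { return dynamicProperties; }

    public static void main(String[] args) {
        SessionParametersSketch p = new SessionParametersSketch(
            "example-session",
            Arrays.asList("lib/flink-dist.jar", "usercode.jar"),
            1024,
            Collections.singletonMap("taskmanager.numberOfTaskSlots", "2"));
        System.out.println(p.sessionName() + " appMaster=" + p.appMasterMemoryMB() + "MB");
    }
}
```

The backend would persist such an object for recovery, then use it to build 
the AppMaster launch command.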

3. MesosDispatcherRunner.  The runner for the 'remote dispatcher' scenario, 
which would be started by Marathon, expose a REST API with which to 
submit/manage jobs, and host the above dispatcher backend.

4. FlinkMesosSessionCli.   This class mirrors the FlinkYarnSessionCli, which is 
used in both the 'flink run' scenario and the 'yarn-session' scenario.  I didn't 
fully implement the CustomCommandLine interface which yields a ClusterClient, 
because the dispatcher API must be fleshed out first.

5. SessionArtifactHelper.   An attempt to consolidate logic related to session 
artifacts (i.e. ship files).

6. DispatcherClient.   The client interface for the dispatcher.  In concept 
there could be numerous implementations - a 'remote' impl which would make 
REST calls to a remote dispatcher, and a 'local' impl which would host the 
dispatcher directly.  Only the latter is seen in this PR, and it might be 
throw-away code.
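
The remote/local split could be sketched like so (hypothetical names; the 
'remote' impl is omitted since its REST surface is still undecided):

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the DispatcherClient concept: one interface with a
// 'local' implementation hosting the dispatcher backend in-process; a 'remote'
// implementation would make REST calls instead. Names are illustrative only.
interface DispatcherClientSketch {
    String submitSession(String sessionName);   // returns a session ID
    void stopSession(String sessionId);
}

// 'Local' impl: hosts the dispatcher directly in the client process.
class LocalDispatcherClient implements DispatcherClientSketch {
    private final Map<String, String> sessions = new HashMap<>();
    private int counter = 0;

    public String submitSession(String sessionName) {
        String id = "session-" + counter++;
        sessions.put(id, sessionName);          // would launch an AppMaster here
        return id;
    }

    public void stopSession(String sessionId) {
        sessions.remove(sessionId);             // would tear the session down here
    }
}

public class DispatcherClientDemo {
    public static void main(String[] args) {
        DispatcherClientSketch client = new LocalDispatcherClient();
        System.out.println(client.submitSession("my-session"));
    }
}
```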

7. LaunchableMesosSession.   This class generates the effective container 
environment at launch time.

8. ContaineredJobMasterParameters.  Refactored from YARN code for sharing 
purposes.

-Eron

Re: Some thoughts about the lower-level Flink APIs

2016-08-16 Thread Wright, Eron
Jamie,
I think you raise a valid concern but I would hesitate to accept the suggestion 
that the low-level API be promoted to app developers.

Higher-level abstractions tend to be more constrained and more optimized, 
whereas lower-level abstractions tend to be more powerful, be more laborious to 
use and provide the system with less knowledge.   It is a classic tradeoff.

I think it is important to consider what the important/distinguishing 
characteristics of the Flink framework are: exactly-once guarantees, event-time 
support, support for job upgrade without data loss, fault tolerance, etc.
I’m speculating that the high-level abstraction provided to app developers is 
probably needed to retain those characteristics.  

I think Vasia makes a good point that SQL might be a good alternative way to 
ease into Flink.

Finally, I believe the low-level API is primarily intended for extension 
purposes (connectors, operations, etc.), not app development.  It could use 
better documentation to ensure that third-party extensions support those key 
characteristics.

-Eron

> On Aug 16, 2016, at 3:12 AM, Vasiliki Kalavri  
> wrote:
> 
> Hi Jamie,
> 
> thanks for sharing your thoughts on this! You're raising some interesting
> points.
> 
> Whether users find the lower-level primitives more intuitive depends on
> their background I believe. From what I've seen, if users are coming from
> the S4/Storm world and are used to the "compositional" way of streaming,
> then indeed it's easier for them to think and operate on that level. These
> are usually people who have seen/built streaming things before trying out
> Flink.
> But if we're talking about analysts and people coming from the "batch" way
> of thinking or people used to working with SQL/python, then the
> higher-level declarative API is probably easier to understand.
> 
> I do think that we should make the lower-level API more visible and
> document it properly, but I'm not sure if we should teach Flink on this
> level first. I think that presenting it as a set of "advanced" features
> makes more sense actually.
> 
> Cheers,
> -Vasia.
> 
> On 16 August 2016 at 04:24, Jamie Grier  wrote:
> 
>> You lost me at lattice, Aljoscha ;)
>> 
>> I do think something like the more powerful N-way FlatMap w/ Timers
>> Aljoscha is describing here would probably solve most of the problem.
>> Often Flink's higher level primitives work well for people and that's
>> great.  It's just that I also spend a fair amount of time discussing with
>> people how to map what they know they want to do onto operations that
>> aren't a perfect fit and it sometimes liberates them when they realize they
>> can just implement it the way they want by dropping down a level.  They
>> usually don't go there themselves, though.
>> 
>> I mention teaching this "first" and then the higher layers I guess because
>> that's just a matter of teaching philosophy.  I think it's good to see
>> the basic operations that are available first and then understand that the
>> other abstractions are built on top of that.  That way you're not afraid to
>> drop-down to basics when you know what you want to get done.
>> 
>> -Jamie
>> 
>> 
>> On Mon, Aug 15, 2016 at 2:11 AM, Aljoscha Krettek 
>> wrote:
>> 
>>> Hi All,
>>> I also thought about this recently. A good thing would be to add a good
>>> user-facing operator that behaves more or less like an enhanced FlatMap
>>> with multiple inputs, multiple outputs, state access and keyed timers. I'm
>>> a bit hesitant, though, since users rarely think about the implications
>>> that come with state updating and out-of-order events. If you don't
>>> implement a stateful operator correctly you have pretty much arbitrary
>>> results.
>>> 
>>> The problem with out-of-order event arrival and state update is that the
>>> state basically has to monotonically transition "upwards" through a
>>> lattice for the computation to make sense. I know this sounds rather
>>> theoretical so I'll try to explain with an example. Say you have an
>>> operator that waits for timestamped elements A, B, C to arrive in
>>> timestamp order and then does some processing. The naive approach would
>>> be to have a small state machine that tracks what elements you have seen
>>> so far. The state machine has four states: NOTHING, SEEN_A, SEEN_B, and
>>> SEEN_C. The state machine is supposed to traverse these states linearly
>>> as the elements arrive. This doesn't work, however, when elements arrive
>>> in an order that does not match their timestamp order. What the user
>>> should do is to have a "Set" state that keeps track of the elements that
>>> it has seen. Once it has seen {A, B, C} the operator must check the
>>> timestamps and then do the processing, if required. The set of possible
>>> combinations of A, B, and C forms a lattice when combined with the
>>> "subset" operation. And traversal through these
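
For illustration, the Set-based state Aljoscha describes could be sketched 
like this (plain Java, not actual Flink operator code):

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Sketch of the Set-based state described above: instead of a linear
// NOTHING -> SEEN_A -> SEEN_B -> SEEN_C state machine (which breaks under
// out-of-order arrival), track the subset of elements seen so far.
public class SeenSetSketch {
    private final Set<String> seen = new HashSet<>();

    // Returns true once all of A, B, C have arrived, in any order; only then
    // should the operator inspect timestamps and do its processing.
    public boolean onElement(String element) {
        seen.add(element);
        return seen.containsAll(Arrays.asList("A", "B", "C"));
    }

    public static void main(String[] args) {
        SeenSetSketch s = new SeenSetSketch();
        // Out-of-order arrival (C, A, B): the subset state still converges.
        System.out.println(s.onElement("C")); // false
        System.out.println(s.onElement("A")); // false
        System.out.println(s.onElement("B")); // true
    }
}
```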

Re: [DISCUSS] updating apache-flink homebrew formula

2016-08-09 Thread Wright, Eron
Will update the homebrew package to Flink 1.1.1 + Hadoop 2.7 + Scala 2.11.

> On Aug 9, 2016, at 5:48 AM, Stephan Ewen <se...@apache.org> wrote:
> 
> +1 for Hadoop 2.7 and Scala 2.11
> 
> Not sure if you saw some of the threads regarding dependency issues in
> the 1.1.0 release. It may be worth referencing the soon-to-come 1.1.1
> release instead.
> 
> On Tue, Aug 9, 2016 at 2:36 PM, Till Rohrmann <trohrm...@apache.org> wrote:
> 
>> +1 for Hadoop 2.7 and Scala 2.11
>> 
>> On Tue, Aug 9, 2016 at 12:25 PM, Robert Metzger <rmetz...@apache.org>
>> wrote:
>> 
>>> Hi,
>>> 
>>> Thanks a lot for maintaining the flink homebrew formula.
>>> I think it's a good idea to update to Hadoop 2.7 and Scala 2.11.
>>> 
>>> On Tue, Aug 9, 2016 at 12:59 AM, Wright, Eron <ewri...@live.com> wrote:
>>> 
>>>> Hello,
>>>> 
>>>> With the release of 1.1, I’m happy to update the apache-flink homebrew
>>>> package accordingly.   Quick question, any objection to updating the
>>>> package to use Hadoop 2.7 and Scala 2.11?   I mean, the homebrew
>> package
>>> is
>>>> hardcoded to use Hadoop 2.6 and Scala 2.10 at the moment.
>>>> 
>>>> Note there’s an open issue to integrate the update process into the
>>>> overall Flink release process (FLINK-3903).
>>>> 
>>>> Eron
>>> 
>> 



[DISCUSS] updating apache-flink homebrew formula

2016-08-08 Thread Wright, Eron
Hello,

With the release of 1.1, I’m happy to update the apache-flink homebrew package 
accordingly.   Quick question, any objection to updating the package to use 
Hadoop 2.7 and Scala 2.11?   I mean, the homebrew package is hardcoded to use 
Hadoop 2.6 and Scala 2.10 at the moment.  

Note there’s an open issue to integrate the update process into the overall 
Flink release process (FLINK-3903).

Eron

Re: [DISCUSS] FLIP-6 - Flink Deployment and Process Model - Standalone, Yarn, Mesos, Kubernetes, etc.

2016-08-05 Thread Wright, Eron

Let me rephrase my comment on the dispatcher.  I mean that its API would be 
job-centric, i.e. with operations like `execute(jobspec)` rather than 
operations like `createSession` that the status quo would suggest.
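
To illustrate the distinction (purely hypothetical signatures, not the actual 
code):

```java
// Purely illustrative contrast between a session-centric ('status-quo') and a
// job-centric (FLIP-6 style) dispatcher API; neither signature is real code.
interface SessionCentricDispatcher {
    String createSession(String sessionParams);        // status-quo style
    void submitJob(String sessionId, String jobGraph); // jobs go to a session
}

interface JobCentricDispatcher {
    // FLIP-6 style: one call carries everything needed to run the job.
    String execute(String jobSpec);                    // returns a job ID
}

public class DispatcherApiDemo {
    public static void main(String[] args) {
        // A trivial stand-in implementation, just to show the call shape.
        JobCentricDispatcher dispatcher = jobSpec -> "job-1";
        System.out.println(dispatcher.execute("wordcount-spec"));
    }
}
```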

Since writing those comments I’ve put more time into developing the Mesos 
dispatcher with FLIP-6 in mind.  I see that Till is spinning up an effort 
too, so we should all sync up in the near future.

Eron



> On Aug 5, 2016, at 7:30 AM, Stephan Ewen <se...@apache.org> wrote:
> 
> Hi Eron!
> 
> Some comments on your comments:
> 
> *Dispatcher*
>  - The dispatcher should NOT be job-centric. The dispatcher should take
> over the "multi job" responsibilities here, now that the JobManager is
> single-job only.
>  - An abstract dispatcher would be great. It could implement the
> connection/HTTP elements and have an abstract method to start a job
>-> Yarn - use YarnClusterClient to start a YarnJob
>-> Mesos - same thing
>-> Standalone - spawn a JobManager
> 
> *Client*
> This is an interesting point. Max is currently refactoring the clients into
>   - Cluster Client (with specialization for Yarn, Standalone) to launch
> jobs and control a cluster (yarn session, ...)
>   - Job Client, which is connected to a single job and can issue commands
> to that job (cancel/stop/checkpoint/savepoint/change-parallelism)
> 
> Let's try and get his input on this.
> 
> 
> *RM*
> Agreed - the base RM is "stateless", specialized RMs can behave different,
> if they need to.
> RM fencing must be generic - all cluster types can suffer from orphaned
> tasks (Yarn as well, I think)
> 
> 
> *User Code*
> I think in the cases where processes/containers are launched per-job, this
> should always be feasible. It is a nice optimization that I think we should
> do where ever possible. Makes users' life with respect to classloading much
> easier.
> Some cases with custom class loading are currently tough in Flink - that
> way, these jobs would at least run in the yarn/mesos individual job mode
> (not the session mode still, that one needs dynamic class loading).
> 
> *Standalone Security*
> That is a known limitation and fine for now, I think. Whoever wants proper
> security needs to go to Yarn/Mesos initially. Standalone v2.0 may change
> that.
> 
> Greetings,
> Stephan
> 
> 
> 
> On Sat, Jul 30, 2016 at 12:26 AM, Wright, Eron <ewri...@live.com> wrote:
> 
>> The design looks great - it solves for very diverse deployment modes,
>> allows for heterogeneous TMs, and promotes job isolation.
>> 
>> Some feedback:
>> 
>> *Dispatcher*
>> The dispatcher concept here expands nicely on what was introduced in the
>> Mesos design doc (MESOS-1984).  The most significant difference being the
>> job-centric orientation of the dispatcher API.   FLIP-6 seems to eliminate
>> the concept of a session (or, defines it simply as the lifecycle of a JM);
>> is that correct?  Do you agree I should revise the Mesos dispatcher
>> design to be job-centric?
>> 
>> I'll be taking the first crack at implementing the dispatcher (for Mesos
>> only) in MESOS-1984 (T2).   I’ll keep FLIP-6 in mind as I go.
>> 
>> The dispatcher's backend behavior will vary significantly for Mesos vs
>> standalone vs others.   Assumedly a base class with concrete
>> implementations will be introduced.  To echo the FLIP-6 design as I
>> understand it:
>> 
>> 1) Standalone
>>   a) The dispatcher process starts an RM, dispatcher frontend, and
>> "local" dispatcher backend at startup.
>>   b) Upon job submission, the local dispatcher backend creates an
>> in-process JM actor for the job.
>>   c) The JM allocates slots as normal.   The RM draws from its pool of
>> registered TM, which grows and shrinks due (only) to external events.
>> 
>> 2) Mesos
>>   a) The dispatcher process starts a dispatcher frontend and "Mesos"
>> dispatcher backend at startup.
>>   b) Upon job submission, the Mesos dispatcher backend creates a Mesos
>> task (dubbed an "AppMaster") which contains a JM/RM for the job.
>>   c) The system otherwise functions as described in the Mesos design doc.
>> 
>> *Client*
>> I'm concerned about the two code paths that the client uses to launch a
>> job (with-dispatcher vs without-dispatcher).   Maybe it could be unified by
>> saying that the client always calls the dispatcher, and that the dispatcher
>> is hostable in either the client or in a separate process.  The only
>> variance would be the client-to-dispatcher transport (local vs HTTP).
>> 
>> *RM*
>>

Re: [DISCUSS] FLIP-6 - Flink Deployment and Process Model - Standalone, Yarn, Mesos, Kubernetes, etc.

2016-07-29 Thread Wright, Eron
The design looks great - it solves for very diverse deployment modes, allows 
for heterogeneous TMs, and promotes job isolation.

Some feedback:

*Dispatcher*
The dispatcher concept here expands nicely on what was introduced in the Mesos 
design doc (MESOS-1984).  The most significant difference being the job-centric 
orientation of the dispatcher API.   FLIP-6 seems to eliminate the concept of a 
session (or, defines it simply as the lifecycle of a JM); is that correct?
Do you agree I should revise the Mesos dispatcher design to be job-centric? 

I'll be taking the first crack at implementing the dispatcher (for Mesos only) 
in MESOS-1984 (T2).   I’ll keep FLIP-6 in mind as I go.

The dispatcher's backend behavior will vary significantly for Mesos vs 
standalone vs others.   Assumedly a base class with concrete implementations 
will be introduced.  To echo the FLIP-6 design as I understand it:

1) Standalone
   a) The dispatcher process starts an RM, dispatcher frontend, and "local" 
dispatcher backend at startup.
   b) Upon job submission, the local dispatcher backend creates an in-process 
JM actor for the job.
   c) The JM allocates slots as normal.   The RM draws from its pool of 
registered TM, which grows and shrinks due (only) to external events.

2) Mesos
   a) The dispatcher process starts a dispatcher frontend and "Mesos" 
dispatcher backend at startup.
   b) Upon job submission, the Mesos dispatcher backend creates a Mesos task 
(dubbed an "AppMaster") which contains a JM/RM for the job.   
   c) The system otherwise functions as described in the Mesos design doc.
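
The common/variable split implied above could be sketched as an abstract 
dispatcher, echoing the idea of a base class with concrete implementations 
(all names hypothetical):

```java
// Hypothetical sketch of a dispatcher base class whose backend varies per
// cluster manager, following the standalone/Mesos split described above.
// Class and method names are illustrative, not actual Flink code.
abstract class AbstractDispatcherSketch {
    // Common concerns (REST frontend, persistence, leader election) would
    // live here; only the launch step is cluster-specific.
    public final String launchSession(String sessionParams) {
        // ... persist parameters for recovery, then delegate:
        return startAppMaster(sessionParams);
    }
    protected abstract String startAppMaster(String sessionParams);
}

class StandaloneDispatcherSketch extends AbstractDispatcherSketch {
    protected String startAppMaster(String p) {
        return "in-process JM for " + p;            // spawn a local JM actor
    }
}

class MesosDispatcherSketch extends AbstractDispatcherSketch {
    protected String startAppMaster(String p) {
        return "Mesos task (AppMaster) for " + p;   // launch via the Mesos framework
    }
}

public class DispatcherBackendDemo {
    public static void main(String[] args) {
        System.out.println(new StandaloneDispatcherSketch().launchSession("sessionA"));
        System.out.println(new MesosDispatcherSketch().launchSession("sessionB"));
    }
}
```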

*Client*
I'm concerned about the two code paths that the client uses to launch a job 
(with-dispatcher vs without-dispatcher).   Maybe it could be unified by saying 
that the client always calls the dispatcher, and that the dispatcher is 
hostable in either the client or in a separate process.  The only variance 
would be the client-to-dispatcher transport (local vs HTTP).

*RM*
On the issue of RM statefulness, we can say that the RM does not persist slot 
allocation (the ground truth is in the TM), but may persist other information 
(related to cluster manager interaction).  For example, the Mesos RM persists 
the assigned framework identifier and per-task planning information (as is 
highly recommended by the Mesos development guide).

On RM fencing, I was already wondering whether to add it to the Mesos RM, so it 
is nice to see it being introduced more generally.   My rationale is, the 
dispatcher cannot guarantee that only a single RM is running, because orphaned 
tasks are possible in certain Mesos failure situations.   Similarly, I’m unsure 
whether YARN provides a strong guarantee about the AM.

*User Code*
Having job code on the system classpath seems possible in only a subset of 
cases.   The variability may be complex.   How important is this optimization?

*Security Implications*
It should be noted that the standalone embodiment doesn't offer isolation 
between jobs.  The whole system will have a single security context (as it does 
now).   

Meanwhile, the ‘high-trust’ nature of the dispatcher in other scenarios is 
rightly emphasized.  The fact that user code shouldn't be run in the dispatcher 
process (except in standalone) must be kept in mind.   The design doc of 
FLINK-3929 (section C2) has more detail on that.


-Eron


> On Jul 28, 2016, at 2:22 AM, Maximilian Michels  wrote:
> 
> Hi Stephan,
> 
> Thanks for the nice wrap-up of ideas and discussions we had over the
> last months (not all on the mailing list though because we were just
> getting started with the FLIP process). The document is very
> comprehensive and explains the changes in great details, even up to
> the message passing level.
> 
> What I really like about the FLIP is that we delegate multi-tenancy
> away from the JobManager to the resource management framework and the
> dispatchers. This will help to make the JobManager component cleaner
> and simpler. The prospect of having the user jars directly in the
> system classpath of the workers, instead of dealing with custom class
> loaders, is very nice.
> 
> The model we have for acquiring and releasing resources wouldn't work
> particularly well with all the new deployment options, so +1 on a new
> task slot request/offer system and +1 for making the ResourceManager
> responsible for TaskManager registration and slot management. This is
> well aligned with the initial idea of the ResourceManager component.
> 
> We definitely need good testing for these changes since the
> possibility of bugs increases with the additional number of messages
> introduced.
> 
> The only thing that bugs me is whether we make the Standalone mode a
> bit less nice to use. The initial bootstrapping of the nodes via the
> local dispatchers and the subsequent registration of TaskManagers and
> allocation of slots could cause some delay. It's not a major concern
> though because it will take little time 

Re: [QUESTION] thread model in Flink makes me confused

2016-05-11 Thread Wright, Eron
One option is to use a separate cluster (JobManager + TaskManagers) for each 
job.   This is fairly straightforward with the YARN support - "flink run” can 
launch a cluster for a job and tear it down afterwards.

Of course this means you must deploy YARN.   That doesn’t necessarily imply 
HDFS, though a Hadoop-compatible filesystem (HCFS) is needed to support the 
YARN staging directory. 

This approach also facilitates richer scheduling and multi-user scenarios.   

One downside is the loss of a unified web UI to view all jobs.


> On May 11, 2016, at 8:32 AM, Jark Wu  wrote:
> 
> 
> As I know, Flink uses a thread model, which means one TaskManager process may 
> run many different operator threads from different jobs. So tasks from 
> different jobs will compete for memory and CPU in the one process. In the 
> worst-case scenario, a bad job will eat most of the CPU and memory, which may 
> lead to OOM, and then the regular jobs die too. And there's another problem: 
> tasks from different jobs will print their logs into the same file (the 
> taskmanager log file). This increases the difficulty of debugging.
> 
> As I know, Storm will spawn workers for every job. The tasks in one worker 
> belong to the same job. So I'm confused about the purpose or advantages of 
> Flink's design. One more question: are there any tips to solve the issues 
> above? Or any suggestions to implement a similar design to Storm's? 
> 
> Thank you for any answers in advance!
> 
> Regards,
> Jark Wu
> 
> 
> 



Installing Flink with Homebrew

2016-05-08 Thread Wright, Eron
Hi,

I submitted a PR to the homebrew project (Mac’s unofficial 
package manager) to add a formula for installing Flink 1.0.2. 
https://github.com/Homebrew/homebrew-core/pull/968 


This will simplify installation on Mac.  Here’s the typical flow:

$ brew info apache-flink
apache-flink: stable 1.0.2, HEAD
Scalable batch and stream data processing
https://flink.apache.org/
Not installed
From: 
https://github.com/Homebrew/homebrew-core/blob/master/Formula/apache-flink.rb

$ brew install apache-flink
==> Downloading 
https://www.apache.org/dyn/closer.lua?path=flink/flink-1.0.2/flink-1.0.2-bin-hadoop26-scala_2.10.tgz
Already downloaded: /Library/Caches/Homebrew/apache-flink-1.0.2.tgz
  /usr/local/Cellar/apache-flink/1.0.2: 88 files, 89.9M, built in 4 seconds

$ flink --version
Version: 1.0.2, Commit ID: d39af15

I made the somewhat arbitrary choice to use Scala 2.10 and Hadoop 2.6 for 
consistency with the apache-spark formula.   In the future, the formula may be 
parameterized to allow the user to decide.

I chose to add only the `flink`, `pyflink2.sh`, and `pyflink3.sh` commands to 
the system path.  I felt that the names of the other bin scripts are too vague 
(`start-local.sh`, etc.) to be on the system path.  I considered 
`flink-daemon` but was unsure it is a documented tool.

There’s still a chance to change the PR before it is merged.   Of course we can 
change it further with 1.0.3.

Enjoy,
Eron Wright