Thanks for taking on this project. I'm excited about it.
Can you go ahead and make a WIP PR so we can see what the diff looks
like and start giving feedback?
I'll be reviewing the WIP PR carefully.
On Fri, Nov 4, 2016 at 8:43 AM Cyrille Chépélov
<[email protected]> wrote:
Hi there,
Some progress on the "separation of fabrics" project:
TL;DR: I have a branch here
https://github.com/cchepelov/scalding/tree/split-fabrics that is
mostly working on *Hadoop*, *Hadoop2-MR1*, *and Tez*, … with baby
steps on *Flink*.
*The Good*
* scalding-core has dependencies on Hadoop for HDFS, but no
longer has explicit dependencies on MAPREDUCE
* One can switch between MAPREDUCE using the legacy hadoop1 API
and MAPREDUCE using Cascading's hadoop2-mr1 fabric
* Most tests run on all four available fabrics in addition to
Local. That is: Legacy Hadoop, Hadoop2-MR1, Tez, _and Flink_.
* Switching from one fabric to another is a matter of supplying
the appropriate fabric jar (scalding-fabric-hadoop,
scalding-fabric-tez, etc.) in your assembly (see the build
sketch after this list)
* Even the REPL seems to accept using a different fabric (!)
* Having an explicit per-fabric bit of code within Scalding
enables experimentation with more advanced things, such as
implementing scalding-specific Cascading Planner graph
transforms, as Chris advises.
* I *think* I didn't break any widely-used API at the source level.
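To make the fabric switch concrete, here is a hypothetical build.sbt
fragment for a job assembly. The module names follow the branch
(scalding-fabric-*); the organisation and version are placeholders,
not what the branch actually publishes:

    // hypothetical sketch: select exactly one fabric module in the job assembly
    libraryDependencies ++= Seq(
      "com.twitter" %% "scalding-core"       % "0.17.0-SNAPSHOT", // placeholder version
      // pick one of: scalding-fabric-hadoop, scalding-fabric-hadoop2-mr1,
      //              scalding-fabric-tez, scalding-fabric-flink
      "com.twitter" %% "scalding-fabric-tez" % "0.17.0-SNAPSHOT"
    )

Swapping scalding-fabric-tez for, say, scalding-fabric-hadoop2-mr1 and
rebuilding the assembly would be the whole switch.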
*The Bad*
* I *think* I didn't break any widely-used API at the source
level, but I haven't (yet) checked if any damage control
should/can be done
* A few tests still break on Tez. These are issues I've lived
with for a long time, but fixing them should be easier, and a
higher priority, now. For now there seem to be two outstanding
items: 1. mapping .withReducers all the way down to the level
of parallelism of the TezChild node in charge of performing
that processing, and 2. perhaps a planner bug, or perhaps a
missing scalding-specific planner transform, to handle jobs
involving Sketched Joins /(that's on Cascading 3.2-wip-6)/
* Flink is not yet ready for prime time. At the moment, I'm
building it against a local snapshot reflecting
https://github.com/dataArtisans/cascading-flink/pull/70; this
is required because some of Cascading's internal interfaces
have changed a bit since 3.1.
Some of the tests are bound to fail for now, as cascading-flink
cannot yet map some variants of hash joins (right outer hash
joins, for instance).
* Mode.scala is a mess and will need a little bit of clean-up
* There are still a couple tests that are bound to fail
* Any test that was doing pattern matching on the exact type of
Mode (Test vs. Hadoop vs. Local vs. HadoopTest) *will* fail,
and there is no solution (a sketch of the problem follows
this list)
* Tez and Flink _tests_ seem quite slow. I'm not yet sure what's
happening; it seems some of the code is simply waiting and
waking up long after a given test job is complete.
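As an illustration of the Mode point above, this is the kind of
test-side match that can no longer work once the hadoop-specific Mode
subclasses move into fabric jars (the constructor arities below follow
the pre-split Mode.scala and are only illustrative):

    import com.twitter.scalding._

    // illustrative only: an exhaustive match on the concrete Mode type
    def isClusterMode(mode: Mode): Boolean = mode match {
      case Hdfs(_, _) | HadoopTest(_, _) => true   // hadoop-specific modes
      case Local(_) | Test(_)            => false  // local / in-memory test modes
      // no case can cover new fabric modes without depending on their jars
    }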
*The Ugly*
* Mode.scala is a mess and will *really* need a little bit of
clean-up
* we still need to compile scalding-core with a /provided/
dependency on either cascading-hadoop or cascading-hadoop2-mr1.
This is due to HadoopTap and friends (HDFS support). Ideally we
could have a (perhaps hard?) dependency on cascading-hadoop2-io
since everyone uses it (hadoop2-mr1, tez, flink), but we'd have
to manage the case of cascading-hadoop (which bundles almost
identical copies of those classes but, by design, cannot depend
on cascading-hadoop2-io). I'm still slightly confused about the
best course of action; I'd like things in scalding-core to
actually fail to compile if they still accidentally depend on
MAPREDUCE, but I'm not sure that's achievable as things stand.
* I've tried to move the fabric-sensitive tests out of
scalding-core into a pool of tests that is shared and verified
with all fabrics: this is scalding-core-fabric-tests.
Although Scalatest's internal discovery seems happy to run
anything that looks like a test, the discovery module used by
"sbt test" is different: it only looks at tests implemented
within the current project, specifically ignoring tests
inherited from dependencies.
I failed to find a way to convince sbt to adopt scalatest's
discovery pattern. As a workaround, I've moved the "shared"
tests from scalding-core-fabric-tests into another subdirectory
of src/, which is referenced by all four fabrics as their own
(a rough sketch of that sbt wiring follows this list).
Consequently, this code is compiled four times, and IntelliJ
can get confused and refuse to step into it.
If there is an sbt guru around willing to give me a hand on
this, I'd be really grateful.
* Making the counter implementation dependent on the fabric
required passing a class /name/ into fabric-specific
properties, then using reflection to instantiate it (a minimal
sketch of that also follows this list).
* The smart tricks needed to make JobTest work and mock out
taps, which can be LocalTap or HadoopTap pretty much at will
* I couldn't really wrap my head around enough of this without
actually digging in, rather than planning/designing first.
Some documentation and possibly a restart from scratch might
be needed after all.
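On the sbt point above, this is roughly how the shared test sources
end up being wired; treat it as a hedged sketch rather than the exact
build on the branch (the "fabric-shared" directory name is made up):

    // rough sketch: each fabric project compiles the shared fabric tests
    // from scalding-core-fabric-tests as if they were its own test sources
    lazy val sharedFabricTestSources = Seq(
      unmanagedSourceDirectories in Test +=
        baseDirectory.value / ".." / "scalding-core-fabric-tests" /
          "src" / "fabric-shared" / "scala"
    )

    lazy val scaldingFabricTez = project
      .in(file("scalding-fabric-tez"))
      .settings(sharedFabricTestSources: _*)

This is exactly what causes the four-fold compilation; a cleaner answer
would let sbt discover the tests from the scalding-core-fabric-tests
dependency directly, which is what I could not get to work.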
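And for the counters, a minimal sketch of the reflection dance under
stated assumptions: the CounterImpl trait and the property name are
hypothetical; only the Class.forName approach is the point:

    // hypothetical names throughout; the point is "class name in a
    // property, instantiated through reflection", nothing more
    trait CounterImpl {
      def increment(group: String, counter: String, amount: Long): Unit
    }

    def loadCounterImpl(properties: java.util.Map[String, String]): CounterImpl = {
      val className = properties.get("scalding.fabric.counter.impl")
      Class.forName(className).newInstance().asInstanceOf[CounterImpl]
    }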
Things I'm inclined to kick to "later": can we also abstract the
storage away from "necessarily HDFS"? Is that something useful?
On the other hand, since the (Storage Mode) x (Execution Mode) x
(data Scheme) support matrix can be daunting, it may be useful to
keep assuming that everything is HDFS, except for LocalTaps, which
can sometimes be a HadoopTap-and-a-wrapper or the other way around.
Next steps: incorporate feedback, clean up, fix the outstanding
issues in scalding-fabric-tez (and fix Flink in due time), keep
current with the develop/cascading3 branches, then figure out how
to mainstream all that (probably, indeed, by breaking up what can
be broken up into individual PRs, though I'm afraid there will
still be one big atomic change at some point).
For now it's just a branch; would it make sense to open an "RFC
only" PR to enable the review tools?
-- Cyrille
On 14/10/2016 at 22:47, 'Alex Levenson' via Scalding Development
wrote:
This is a large enough change, and one that probably won't fit
into a single PR, so it might merit some sort of design doc /
written plan. That way we can come up with a plan and then start
implementing it piece by piece across a few PRs.
On Wed, Oct 12, 2016 at 2:11 PM, 'Oscar Boykin' via Scalding
Development <[email protected]> wrote:
sounds great!
On Tue, Oct 11, 2016 at 11:39 PM Cyrille Chépélov
<[email protected]> wrote:
Oscar, Piyush,
thanks for the feedback!
At the moment, I'm not sure it's realistic to break the
dependency on "hadoop" completely out of scalding-core. As an
intermediate goal, I'd shoot for at least soft-removing the
assumption that the /processing/ is done on Hadoop, while the
storage interface will pretty much remain HDFS for the time
being (IOW, I'll leave Source essentially unchanged in
scalding-core).
Meanwhile, I'm taking the messages here and on the gitter
channel as positive towards the principle of
scalding-$FABRIC sub-modules, and will start working on
that in the background.
-- Cyrille
On 12/10/2016 at 03:29, 'Oscar Boykin' via Scalding
Development wrote:
Generally, I think this is a good idea also (separate
modules for fabrics).
I agree that Mode and Job are a bit hairy in spots. I
think we can remove some deprecated code if it makes
life significantly easier, but source and binary
compatibility should be kept as much as we can
reasonably manage.
I would actually really rather `buildFlow` be
private[scalding] but maybe that is too much. Making it
return a subclass of Flow seems like a fine idea to me
at the moment.
Breaking hadoop out of scalding-core seems pretty hard
since `Source` has it baked in at a few spots. That
said, the Source abstractions in scalding are not
great. If we could improve them (without removing
support for the old stuff) it might be worth it. Many
have complained about Source's design over the years,
but we have not really had a full proposal that seems to
address all the concerns.
The desire for jobs to all look the same across all
fabrics makes modularization a bit ugly.
On Tue, Oct 11, 2016 at 2:23 PM 'Piyush Narang' via
Scalding Development <[email protected]> wrote:
We ran into similar problems while trying to set the
number of reducers while testing out Cascading3 on
Tez. We hacked around it temporarily
<https://github.com/twitter/scalding/commit/57983601c7db4ef1e0df3350140d473f371e6bb3>
but haven't yet cleaned up that code and put it out for
review (we'll need to fork MR / Tez there, as
nodeConfigDef works for Tez but not Hadoop). Based on
my understanding, so far we've tried to delegate as
much of this to Cascading as we can, but there seem to
be a few places where we're doing platform-specific
stuff in Scalding. Breaking things up to create
fabric-specific sub-modules seems like a nice idea to
me. We might need to think through the right way to do
this to ensure we don't break stuff. Would it make
sense to spin up an issue so we can discuss it there?
On Tue, Oct 11, 2016 at 10:42 AM, Cyrille Chépélov
<[email protected]> wrote:
Hi,
I'm trying to tie up a few loose ends in the way
step descriptions (text typically passed via
*.withDescriptions(...)*) and the desired level
of parallelism (typically passed via
*.withReducers(N)*) are pushed onto the various
fabrics.
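For context, here is roughly what such a job looks like in the
typed API, as a minimal sketch (nothing in it is fabric-specific,
which is exactly the point; the typed API spells the description
call .withDescription):

    import com.twitter.scalding._

    class WordCount(args: Args) extends Job(args) {
      TypedPipe.from(TextLine(args("input")))
        .withDescription("read and tokenize")  // step description
        .flatMap(_.split("\\s+"))
        .map(word => (word, 1L))
        .sumByKey
        .withReducers(10)                      // desired reduce-side parallelism
        .toTypedPipe
        .write(TypedTsv[(String, Long)](args("output")))
    }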
Right now:
* Most of the scalding code base either
ignores the back-end (good) or assumes
<https://github.com/twitter/scalding/blob/7ed0f92a946ad8407645695d3def62324f78ac41/scalding-core/src/main/scala/com/twitter/scalding/ExecutionContext.scala#L81>
that the world is either Local or HadoopFlow
(which covers Hadoop 1.x and MR1). As a
consequence, a couple of things don't yet work
smoothly on Tez, and I assume on Flink.
* the descriptions are entirely dropped if not
running on Hadoop1 or MR1
* .withReducers sets a hadoop-specific
property (/mapred.reduce.tasks/) at
RichPipe#L41
<https://github.com/twitter/scalding/blob/7ed0f92a946ad8407645695d3def62324f78ac41/scalding-core/src/main/scala/com/twitter/scalding/RichPipe.scala#L41>
* the Tez fabric ignores .withReducers, and
there is no other conduit (for now) to set
the desired number of parts on the sinks. As
a consequence, you can't run a Tez DAG with
a large level of parallelism and a small
(single) number of output files (e.g. stats
leading to a result file of a couple dozen
lines); you must pick one and set
*cascading.flow.runtime.gather.partitions.num*.
There are workarounds, but they're quite ugly.
* there are a few instances of "flow match {
case HadoopFlow => doSomething ; case _ =>
() }" scattered around the code (a sketch
follows this list)
* there's some heavily reflection-based code
in Mode.scala
<https://github.com/twitter/scalding/blob/7ed0f92a946ad8407645695d3def62324f78ac41/scalding-core/src/main/scala/com/twitter/scalding/Mode.scala#L75>
which depends on jars not part of the
scalding build process (and it's good that
these jars stay out of the scalding-core
build, e.g. Tez client libraries)
* While it may be desirable to experiment with
scalding-specific transform registries for
cascading (e.g. to deal with the
Merge-GroupBy structure, or to perform
tests/assertions on the resulting flow
graph), it would be impractical to perform
the necessary fabric-specific adjustments in
Mode.scala as it is.
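To make the HadoopFlow item above concrete, here is a
simplified sketch of that pattern; it is not the exact
scalding-core source, and the configuration key is illustrative:

    import scala.collection.JavaConverters._
    import cascading.flow.Flow
    import cascading.flow.hadoop.HadoopFlow
    import org.apache.hadoop.mapred.JobConf

    // only the MR fabrics get special treatment; everything else falls through
    def pushDescriptions(flow: Flow[_], descriptions: Seq[String]): Unit =
      flow match {
        case hf: HadoopFlow =>
          hf.getFlowSteps.asScala.foreach { step =>
            step.getConfig.asInstanceOf[JobConf]
              .set("scalding.step.descriptions", descriptions.mkString(", "))
          }
        case _ => () // Tez, Flink, Local: descriptions are silently dropped
      }

This is the kind of code I'd like to push behind a
fabric-specific interface rather than keep in scalding-core.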
I'm trying to find a way to extract away the
MR-isms and push them into fabric-specific code
which can be called when appropriate.
Questions:
1. Would it be appropriate to start having
fabric-specific jars
(scalding-fabric-hadoop,
scalding-fabric-hadoop2-mr1,
scalding-fabric-tez, etc.) and push the
fabric-specific code from Mode.scala there?
(We'd keep only a single scalding
fabric-related factory using reflection,
with the appropriate interfaces defined in
scalding-core; see the sketch after these
questions.)
2. Pushing the fabric-specific code into
dedicated jars would probably have
user-visible consequences, as we can't make
scalding-core depend on
scalding-fabric-hadoop (for
back-compatibility) unless the
fabric-factory interfaces go into another jar.
From my point of view, intentionally
breaking the build slightly, once, upon
upgrade, to let the world know that there
are fabrics other than MR1 would be
acceptable; then again, I haven't used MR1
for over a year.
Is this "slight" dependency breakage
acceptable, or is it better to have
scalding-core still imply the hadoop fabrics?
3. Right now, scalding's internals sometimes
use Hadoop (MR) specifics to carry various
configuration values. Is it acceptable to
(at least in the beginning) continue doing
so, kindly asking the respective non-hadoop
fabrics to pick these values up and convert
them to the relevant APIs?
4. Is it okay to drop the @deprecated(...,
"0.12.0") functions from Mode.scala if they
are inconvenient to carry over in the process?
5. Currently, Job.buildFlow
<https://github.com/twitter/scalding/blob/7ed0f92a946ad8407645695d3def62324f78ac41/scalding-core/src/main/scala/com/twitter/scalding/Job.scala#L223>
returns Flow[_]. Is it okay to have it
return Flow[_] with ScaldingFlowGoodies
instead, ScaldingFlowGoodies being the
provisional name of the interface into
which the old "flow match { case HadoopFlow
=> ... }" code would move?
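To give questions 1 and 5 a bit of shape, here is a purely
hypothetical sketch of the layering I have in mind; every name
in it (FabricFactory, ScaldingFlowGoodies, the method
signatures) is provisional and not existing code:

    import cascading.flow.{ Flow, FlowConnector }

    // would live in scalding-core; each scalding-fabric-* jar ships one
    // implementation, located through the single reflection-based factory
    trait FabricFactory {
      def newFlowConnector(properties: java.util.Map[AnyRef, AnyRef]): FlowConnector
    }

    // provisional home (mixed into the fabric's Flow) for the old
    // "flow match { case HadoopFlow => ... }" code
    trait ScaldingFlowGoodies {
      def setStepDescriptions(descriptions: Seq[String]): Unit
    }

    object FabricFactory {
      // the class name would come from configuration or from the fabric jar
      def resolve(className: String): FabricFactory =
        Class.forName(className).newInstance().asInstanceOf[FabricFactory]
    }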
Thanks in advance
-- Cyrille
--
- Piyush
--
Alex Levenson
@THISWILLWORK