Hi there,
Some progress on the "separation of fabrics" project:
TL;DR: I have a branch here
https://github.com/cchepelov/scalding/tree/split-fabrics that is mostly
working on *Hadoop*, *Hadoop2-MR1*, and *Tez*, … with baby steps on *Flink*.
*The Good*
* scalding-core has dependencies on Hadoop for HDFS, but no longer has
explicit dependencies on MAPREDUCE
* One can switch between MAPREDUCE using the legacy hadoop1 API and
MAPREDUCE using Cascading's hadoop2-mr1 fabric
* Most tests run on all four available fabrics in addition to Local.
That is: Legacy Hadoop, Hadoop2-MR1, Tez, _and Flink_.
* Switching from one fabric to another is a matter of supplying the
appropriate fabric jar (scalding-fabric-hadoop, scalding-fabric-tez,
etc.) in your assembly (see the sketch after this list)
* Even the REPL seems to accept using a different fabric (!)
* Having an explicit per-fabric bit of code within Scalding enables
experimentation with more advanced things, such as implementing
scalding-specific Cascading Planner graph transforms, as Chris advises.
* I *think* I didn't break any widely-used API at the source level.
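To make the fabric-switching point concrete, here is roughly what I have
in mind for a user's build (module names are the ones on my branch; the
organisation and version strings are illustrative):

    // sketch of a user's build.sbt -- only the fabric line changes per fabric
    val scaldingVersion = "0.17.0-SNAPSHOT"  // illustrative version
    libraryDependencies ++= Seq(
      "com.twitter" %% "scalding-core" % scaldingVersion,
      // pick exactly one fabric jar:
      "com.twitter" %% "scalding-fabric-tez" % scaldingVersion
      // or "com.twitter" %% "scalding-fabric-hadoop" % scaldingVersion
      // or "com.twitter" %% "scalding-fabric-hadoop2-mr1" % scaldingVersion
    )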
*The Bad*
* I *think* I didn't break any widely-used API at the source level,
but I haven't (yet) checked if any damage control should/can be done
* A few tests still break in Tez. This is on things that I've lived
with for a long time, but fixing those should be easier and a higher
priority now. For now it seems there are really two outstanding
items left: 1. mapping .withReducers all the way down to the level
of parallelism in the TezChild node in charge of performing that
processing and 2. perhaps a planner bug, or perhaps a missing
scalding-specific planner transform to handle jobs involving
Sketched Joins /(that's on cascading 3.2-wip-6)/
* Flink is not yet ready for prime time. At the moment, I'm building
it using a local snapshot reflecting
https://github.com/dataArtisans/cascading-flink/pull/70 — this is
required as some of Cascading's internal interfaces changed a bit
since 3.1.
Some of the tests are bound to fail for now, as cascading-flink
cannot yet map some variants of hash joins (right outer hash joins,
for instance).
* Mode.scala is a mess and will need a little bit of clean-up
* There are still a couple of tests that are bound to fail
* Any test that was doing pattern matching on the exact type of Mode
(Test vs. Hadoop vs. Local vs. HadoopTest) *will* fail, and I see
no solution for those (see the sketch after this list)
* Tez and Flink _tests_ seem quite slow. Not yet sure what's
happening; it seems some of the code is simply waiting and only waking
up long after a given test job is complete.
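To make the Mode-matching breakage concrete, this is the kind of test
helper I mean (a simplified, illustrative sketch; the real tests are more
involved):

    import com.twitter.scalding._

    // Simplified sketch: a helper that dispatches on the concrete Mode subtype.
    // Once the fabric is no longer implied by the Mode hierarchy, "distributed"
    // no longer means "Hdfs", and this kind of match has no faithful equivalent.
    def isDistributed(mode: Mode): Boolean = mode match {
      case _: Local | _: Test      => false
      case _: Hdfs | _: HadoopTest => true
    }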
*The Ugly*
* Mode.scala is a mess and will *really* need a little bit of clean-up
* We still need to compile scalding-core with a /provided/ dependency
on either cascading-hadoop or cascading-hadoop2-mr1. This is due to
HadoopTap and friends (HDFS support). Ideally we could have a
(perhaps hard?) dependency on cascading-hadoop2-io since everyone is
using it (hadoop2-mr1, tez, flink), but we'd have to manage the case
of cascading-hadoop (which brings almost identical copies but
cannot, by design, depend on cascading-hadoop2-io). I'm still slightly
unsure of the best course of action; I'd like things in
scalding-core to actually not compile if they still accidentally
depend on MAPREDUCE, but I'm not sure that's achievable as it is.
* I've tried to move the fabric-sensitive tests out of scalding-core
into a pool of tests that is shared and verified with all fabrics:
this is scalding-core-fabric-tests.
Although Scalatest's internal discovery seems to be happy with
running anything that looks like a test, the discovery module used
by "sbt test" is different: it only looks at tests that are
implemented within the current project, specifically ignoring tests
inherited from dependencies.
I failed to find a way to convince sbt to adopt scalatest's
discovery pattern. As a result, I've moved the "shared" tests from
scalding-core-fabric-tests into another subdirectory of src/, which
is referenced by all four fabrics as their own. Consequently, this
code is compiled four times, and IntelliJ can get confused and refuse
to step into it.
If there is an sbt guru around willing to give me a hand on this,
I'd be really grateful.
* Making the counter implementation dependent on the fabric required
passing a class /name/ into fabric-specific properties, then using
reflection to instantiate it (a simplified sketch follows this list).
* The smart tricks needed to make JobTest work and mock out taps that
can be LocalTaps or HadoopTaps pretty much at will.
* I couldn't really wrap my head around enough of this without
actually digging in, rather than planning/designing first. Some
documentation and possibly a restart from scratch might be needed
after all.
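For reference, the counter trick mentioned above looks roughly like this
(a simplified sketch; the property key and the interface are illustrative,
not the actual names on the branch):

    // The fabric jar advertises its counter implementation by class *name* in
    // the job properties; scalding-core instantiates it reflectively, so core
    // keeps no compile-time dependency on any fabric.
    trait FabricCounters {
      def increment(group: String, counter: String, amount: Long): Unit
    }

    def loadCounters(props: java.util.Properties): FabricCounters = {
      val clsName = props.getProperty("scalding.fabric.counters.impl") // illustrative key
      Class.forName(clsName)
        .getDeclaredConstructor()
        .newInstance()
        .asInstanceOf[FabricCounters]
    }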
Things I'm inclined to kick to "later": can we also abstract out storage
from "necessarily HDFS"? Is that something useful?
On the other hand, since the (storage mode) x (execution mode) x (data
scheme) support matrix can be daunting, it may be useful to keep assuming
that everything is HDFS unless it's on LocalTaps, which can sometimes be
a HadoopTap-and-a-wrapper or the other way around.
Next steps: incorporate feedback, clean up, fix the outstanding issues in
scalding-fabric-tez (fix Flink in due time), keep current with the
develop/cascading3 branches, then figure out how to get this into the
mainline (probably, indeed, by breaking up what can be broken up into
individual PRs, though I'm afraid there will still be a big atomic change
of something at one point).
For now this is just a branch; would it make sense to open an "RFC only"
PR to enable the review tools?
-- Cyrille
On 14/10/2016 at 22:47, 'Alex Levenson' via Scalding Development wrote:
This is a large enough change, one that probably won't fit into a single
PR, that it might merit some sort of design doc / written plan. That
way we can come up with a plan and then start implementing it piece by
piece across a few PRs.
On Wed, Oct 12, 2016 at 2:11 PM, 'Oscar Boykin' via Scalding
Development <[email protected]> wrote:
sounds great!
On Tue, Oct 11, 2016 at 11:39 PM Cyrille Chépélov
<[email protected]> wrote:
Oscar, Piyush,
thanks for the feedback!
At the moment, I'm not sure it's realistic to break the
dependency on "hadoop" completely out of scalding-core. As an
intermediate goal, I'd shoot for at least soft-removing the
assumption that the /processing/ is done on Hadoop, but the
storage interface will pretty much remain HDFS for the time
being (IOW, I'll leave Source essentially unchanged in
scalding-core).
Meanwhile, I'm taking the messages here and on the gitter
channel as positive towards the principle of scalding-$FABRIC
sub-modules, and will start working on that in the background.
-- Cyrille
On 12/10/2016 at 03:29, 'Oscar Boykin' via Scalding Development
wrote:
Generally, I think this is a good idea also (separate modules
for fabrics).
I agree that Mode and Job are a bit hairy in spots. I think
we can remove some deprecated code if it makes life
significantly easier, but source and binary compatibility
should be kept as much as we can reasonably manage.
I would actually really rather `buildFlow` be
private[scalding] but maybe that is too much. Making it
return a subclass of Flow seems like a fine idea to me at the
moment.
Breaking hadoop out of scalding-core seems pretty hard since
`Source` has it baked in at a few spots. That said, the
Source abstractions in scalding are not very great. If we
could improve that (without removing support for the old
stuff) it might be worth it. Many have complained about
Source's design over the years, but we have not really had a
full proposal that seems to address all the concerns.
The desire for jobs to all look the same across all fabrics
makes modularization a bit ugly.
On Tue, Oct 11, 2016 at 2:23 PM 'Piyush Narang' via Scalding
Development <[email protected]> wrote:
We ran into similar problems while trying to set the
number of reducers while testing out Cascading3 on Tez.
We hacked around it temporarily
<https://github.com/twitter/scalding/commit/57983601c7db4ef1e0df3350140d473f371e6bb3>
but
haven't yet cleaned up that code and put it out for
review (we'll need to fork MR / Tez there as
nodeConfigDef works for Tez but not Hadoop). Based on my
understanding, so far we've tried to delegate as much of
this to Cascading as we can but there seem to be a few
places where we're doing some platform specific stuff in
Scalding. Breaking up to create fabric-specific
sub-modules seems like a nice idea to me. We might need
to think through the right way to do this to ensure we
don't break stuff. Would it make sense to spin up an
issue so we can discuss it there?
On Tue, Oct 11, 2016 at 10:42 AM, Cyrille Chépélov
<[email protected]> wrote:
Hi,
I'm trying to tie up a few loose ends in the way step
descriptions (text typically passed via
*.withDescriptions(...)*) and the desired level of
parallelism (typically passed via *.withReducers(N)*)
are pushed down to the various fabrics.
Right now:
* Most of the scalding code base either ignores the
back-end (good) or assumes
<https://github.com/twitter/scalding/blob/7ed0f92a946ad8407645695d3def62324f78ac41/scalding-core/src/main/scala/com/twitter/scalding/ExecutionContext.scala#L81>
the world is either Local or HadoopFlow (which
covers Hadoop 1.x and MR1). As a consequence, a
couple things don't yet work smoothly on Tez and
I assume on Flink.
* the descriptions are entirely dropped if not
running on Hadoop1 or MR1
* .withReducers sets a hadoop-specific property
(/mapred.reduce.tasks/) at RichPipe#L41
<https://github.com/twitter/scalding/blob/7ed0f92a946ad8407645695d3def62324f78ac41/scalding-core/src/main/scala/com/twitter/scalding/RichPipe.scala#L41>
* the Tez fabric ignores .withReducers; and there
is no other conduit (for now) to set the number
of desired parts on the sinks. As a consequence,
you can't run a tez DAG with a large level of
parallelism and a small (single) number of output
files (e.g. stats leading to a result file of a
couple dozen lines); you must pick one and set
*cascading.flow.runtime.gather.partitions.num*.
There are workarounds, but they're quite ugly.
* there are a few instances of "flow match { case
HadoopFlow => doSomething ; case _ => () }"
scattered around the code
* there's some heavily reflection-based code in
Mode.scala
<https://github.com/twitter/scalding/blob/7ed0f92a946ad8407645695d3def62324f78ac41/scalding-core/src/main/scala/com/twitter/scalding/Mode.scala#L75>
which depends on jars not part of the scalding
build process (and it's good that these jars stay
out of the scalding-core build, e.g. Tez client
libraries)
* While it may be desirable to experiment with
scalding-specific transform registries for
cascading (e.g. to deal with the Merge-GroupBy
structure, or to perform tests/assertions on the
resulting flow graph), it would be impractical to
perform the necessary fabric-specific adjustments
in Mode.scala as it is.
I'm trying to find a way to extract away the MR-isms
and push them into fabric-specific code that can be
called when appropriate.
Questions:
1. Would it be appropriate to start having
fabric-specific jars (scalding-fabric-hadoop,
scalding-fabric-hadoop2-mr1, scalding-fabric-tez,
etc.), and push the fabric-specific code from
Mode.scala there?
(we'd keep only the single scalding
fabric-related factory using reflection, with
appropriate interfaces defined in scalding-core)
2. Pushing the fabric-specific code into dedicated
jars would probably have user-visible
consequences, as we can't make scalding-core
depend on scalding-fabric-hadoop (for
back-compatibility) unless the fabric-factory
interface goes into another jar.
From my point of view, intentionally (and
slightly) breaking the build once upon upgrade,
in order to let the world know that there are
fabrics other than MR1, might be acceptable; on
the other hand, I haven't used MR1 for over a year.
Is this "slight" dependency breakage acceptable,
or is it better to have scalding-core still imply
the hadoop fabrics?
3. Right now, scalding's internals sometimes use
Hadoop (MR) specifics to carry various
configuration values. Is it acceptable to (at
least in the beginning) continue doing so,
kindly asking the respective non-hadoop fabrics
to pick these values up and convert them to the
relevant APIs?
4. Is it okay to drop the @deprecated(..., "0.12.0")
functions from Mode.scala if they are
inconvenient to carry over in the process?
5. Currently, Job.buildFlow
<https://github.com/twitter/scalding/blob/7ed0f92a946ad8407645695d3def62324f78ac41/scalding-core/src/main/scala/com/twitter/scalding/Job.scala#L223>
returns Flow[_]. Is it okay to have it return
Flow[_] with ScaldingFlowGoodies instead,
ScaldingFlowGoodies being the provisional name of
the interface into which to move the old "flow match
{ case HadoopFlow => ... }" code?
Thanks in advance
-- Cyrille
--
- Piyush
--
Alex Levenson
@THISWILLWORK