Hi there,

Some progress on the "separation of fabrics" project:

TL;DR: I have a branch at https://github.com/cchepelov/scalding/tree/split-fabrics that mostly works on *Hadoop*, *Hadoop2-MR1* and *Tez*, and takes baby steps on *Flink*.

*The Good*

 * scalding-core has dependencies on Hadoop for HDFS, but no longer has
   explicit dependencies on MAPREDUCE
 * One can switch between MAPREDUCE using the legacy hadoop1 API or
   MAPREDUCE using Cascading's hadoop2-mr1 fabric
 * Most tests run on all four available fabrics in addition to Local.
   That is: Legacy Hadoop, Hadoop2-MR1, Tez, _and Flink_.
 * Switching from a fabric to another is a matter of supplying the
   appropriate fabric jar (scalding-fabric-hadoop, scalding-fabric-tez,
   etc.) in your assembly
 * Even the REPL seems to accept using a different fabric (!)
 * Having an explicit per-fabric bit of code within Scalding enables
   experimentation with more advanced things, such as implementing
   scalding-specific Cascading Planner graph transforms, as Chris advises.
 * I *think* I didn't break any widely-used API at the source level.
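
To make the "supply the appropriate fabric jar" point concrete, here is a minimal build.sbt sketch. The module names come from the list above; the organization and the version are assumptions (placeholders), since nothing from this branch is published.

```scala
// build.sbt fragment (sketch only).
// Assumption: the fabric modules would be published under the same
// organization as scalding-core. The version is a placeholder.
val scaldingVersion = "x.y.z" // placeholder, not a real release

libraryDependencies ++= Seq(
  "com.twitter" %% "scalding-core" % scaldingVersion,
  // Pick exactly one fabric module for the assembly:
  "com.twitter" %% "scalding-fabric-tez" % scaldingVersion
  // or: "com.twitter" %% "scalding-fabric-hadoop" % scaldingVersion
)
```

Swapping fabrics would then mean changing that one dependency line and rebuilding the assembly.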

*The Bad*

 * I *think* I didn't break any widely-used API at the source level,
   but I haven't (yet) checked if any damage control should/can be done
 * A few tests still break in Tez. This is on things that I've lived
   with for a long time, but fixing those should be easier and a higher
   priority now. For now it seems there are really two outstanding
   items left: 1. mapping .withReducers all the way down to the level
   of parallelism in the TezChild node in charge of performing that
   processing and 2. perhaps a planner bug, or perhaps a missing
   scalding-specific planner transform to handle jobs involving
   Sketched Joins /(that's on cascading 3.2-wip-6)/
 * Flink is not yet ready for prime time. At the moment, I'm building
   it using a local snapshot reflecting
   https://github.com/dataArtisans/cascading-flink/pull/70 — This is
   required as some of Cascading's internal interfaces changed a bit
   since 3.1.
   Some of the tests are bound to fail for now, as cascading-flink
   cannot yet map some variants of hash joins (outer right hash joins,
   for instance).
 * Mode.scala is a mess and will need a little bit of clean-up
 * There are still a couple tests that are bound to fail
 * Any test that was doing pattern matching on the exact type of Mode
   (Test vs. Hadoop vs. Local vs. HadoopTest) *will* fail, and there is
   no solution
 * Tez and Flink _tests_ seem quite slow. Not yet sure what's
   happening, it seems some of the code is simply waiting and waking up
   long after a given test job is complete.
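
To illustrate why tests that match on the exact type of Mode break, here is a small sketch. All the types in it are hypothetical stand-ins, not scalding's actual Mode classes, and the capability trait is only an assumption about how such code could be written instead.

```scala
object ModeMatchingSketch {
  // Hypothetical stand-ins; these are NOT scalding's real Mode classes.
  trait Mode
  trait ClusterMode extends Mode // assumed capability trait
  case object Local extends Mode
  final case class LegacyHadoop(strict: Boolean) extends ClusterMode
  final case class TezMode(strict: Boolean) extends ClusterMode

  // Brittle: recognises only the concrete class it was written against,
  // so a Mode supplied by another fabric falls through to "unknown".
  def describeByExactType(mode: Mode): String = mode match {
    case LegacyHadoop(_) => "cluster"
    case Local           => "local"
    case _               => "unknown"
  }

  // More tolerant: matches on the shared trait rather than the class.
  def describeByCapability(mode: Mode): String = mode match {
    case _: ClusterMode => "cluster"
    case Local          => "local"
    case _              => "unknown"
  }
}
```

With these stand-ins, a TezMode is "unknown" to the first function and "cluster" to the second.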

*The Ugly*

 * Mode.scala is a mess and will *really* need a little bit of clean-up

 * we still need to compile scalding-core with a /provided/ dependency
   to either cascading-hadoop or cascading-hadoop2-mr1. This is due to
   HadoopTap and friends (HDFS support). Ideally we could have a
   (perhaps hard?) dependency on cascading-hadoop2-io since everyone's
   using it (hadoop2-mr1, tez, flink), but we'd have to manage the case
   of cascading-hadoop (which brings almost identical copies but
   cannot, by trade, depend on cascading-hadoop2-io). Still slightly
   confused on the best course of action; I'd like things in
   scalding-core to actually not compile if they still accidentally
   depend on MAPREDUCE. I'm unsure it's achievable as it is.

 * I've tried to share the fabric-sensitive tests from scalding-core
   into a pool of tests that is shared and verified with all fabrics:
   this is scalding-core-fabric-tests

   Although Scalatest's internal discovery seems to be happy with
   running anything that looks like a test, the discovery module used
   by "sbt test" is different. It only looks at tests that are
   implemented within the current project, specifically ignoring tests
   inherited from dependencies.

   I failed to find a way to convince sbt to adopt scalatest's
   discovery pattern. As a result, I've moved the "shared" tests from
   scalding-core-fabric-tests into another subdirectory of src/, which
   is referenced by all four fabrics as their own. As a result, this
   code is compiled 4 times, and IntelliJ can get confused and refuse
   to step into it.

   If there is an sbt guru around willing to give me a hand on this,
   I'd be really grateful.

 * Making counter implementation dependent on the fabric required
   passing a class /name/ into fabric-specific properties, then using
   reflection to instantiate it.
 * The smart tricks needed to make JobTest work and mock out taps which
   can be LocalTap or HadoopTaps pretty much at will
 * I couldn't really wrap my head around enough of this without
   actually digging in, rather than planning/designing first. Some
   documentation and possibly a restart from scratch might be needed
   after all.
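
For readers unfamiliar with the "class name in properties, then reflection" trick mentioned for counters, a minimal sketch follows. The interface, the in-memory implementation and the property key are all hypothetical names made up for illustration; this is not the code in the branch.

```scala
import java.util.Properties
import scala.collection.mutable

object CounterReflectionSketch {
  // Hypothetical interface that the core module would define.
  trait CounterImpl {
    def increment(group: String, name: String, amount: Long): Unit
    def value(group: String, name: String): Long
  }

  // Hypothetical fabric-side implementation; it needs a no-arg constructor
  // so that it can be created reflectively.
  class InMemoryCounterImpl extends CounterImpl {
    private val counts = mutable.Map.empty[(String, String), Long]

    def increment(group: String, name: String, amount: Long): Unit = {
      val key = (group, name)
      counts.update(key, counts.getOrElse(key, 0L) + amount)
    }

    def value(group: String, name: String): Long =
      counts.getOrElse((group, name), 0L)
  }

  // Hypothetical property key carrying the implementation's class name.
  val CounterImplKey = "scalding.sketch.counter.impl.class"

  // The core side only knows the interface: it reads the class name from
  // the properties and instantiates that class through reflection.
  def instantiate(props: Properties): CounterImpl = {
    val className = Option(props.getProperty(CounterImplKey))
      .getOrElse(sys.error(s"missing property $CounterImplKey"))
    Class.forName(className)
      .getDeclaredConstructor()
      .newInstance()
      .asInstanceOf[CounterImpl]
  }
}
```

The fabric would set the property to its own class name; the core never references that class at compile time, which is the point, and also why mistakes only show up at run time.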

Things I'm inclined to kick to "later": can we also abstract out storage from "necessarily HDFS"? Is that something useful?

On the other hand, as the (Storage Mode) x (Execution Mode) x (data Scheme) support matrix can be daunting, it can be useful to still make the assumption that everything is HDFS unless it's on LocalTaps which sometimes can be HadoopTap-and-a-wrapper or the other way around.

Next steps: incorporate feedback, clean up, fix outstanding issues in scalding-fabric-tez, (fix in flink in due time), keep current with the develop/cascading3 branches, then figure out how to mainstream that (probably, indeed, breaking up what can be broken up into individual PRs, but I'm afraid there will still be a big atomic change of something at one point).

For now that's just a branch; would it make sense to open an "RFC only" PR to enable the review tools?

    -- Cyrille


On 14/10/2016 at 22:47, 'Alex Levenson' via Scalding Development wrote:
This is a large enough change, that probably won't fit into a single PR, that it might merit some sort of design doc / written plan. That way we can come up with a plan and then start implementing it piece by piece across a few PRs.

On Wed, Oct 12, 2016 at 2:11 PM, 'Oscar Boykin' via Scalding Development <[email protected] <mailto:[email protected]>> wrote:

    sounds great!

    On Tue, Oct 11, 2016 at 11:39 PM Cyrille Chépélov
    <[email protected] <mailto:[email protected]>>
    wrote:

        Oscar, Piyush,

        thanks for the feedback!

        At the moment, I'm not sure it's realistic to fully break the
        dependency to "hadoop" completely out of scalding-core. As an
        intermediate goal, I'd shoot for at least soft-removing the
        assumption that the /processing/ is made on Hadoop, but the
        storage interface will pretty much remain HDFS for the time
        being (IOW, I'll leave Source essentially unchanged in
        scalding-core).

        Meanwhile, I'm taking the messages here and on the gitter
        channel as positive towards the principle of scalding-$FABRIC
        sub-modules, and will start working on that in the background.


            -- Cyrille


        On 12/10/2016 at 03:29, 'Oscar Boykin' via Scalding Development
        wrote:
        Generally, I think this is a good idea also (separate modules
        for fabrics).

        I agree that Mode and Job are a bit hairy in spots. I think
        we can remove some deprecated code if it makes life
        significantly easier, but source and binary compatibility
        should be kept as much as we can reasonably manage.

        I would actually really rather `buildFlow` be
        private[scalding] but maybe that is too much. Making it
        return a subclass of Flow seems like a fine idea to me at the
        moment.

        Breaking hadoop out of scalding-core seems pretty hard since
        `Source` has it baked in at a few spots. That said, the
        Source abstractions in scalding are not very great. If we
        could improve that (without removing support for the old
        stuff) it might be worth it. Many have complained about
        Source's design over the years, but we have not really had a
        full proposal that seems to address all the concerns.

        The desire for jobs to all look the same across all fabrics
        makes modularization a bit ugly.

        On Tue, Oct 11, 2016 at 2:23 PM 'Piyush Narang' via Scalding
        Development <[email protected]
        <mailto:[email protected]>> wrote:

            We ran into similar problems while trying to set the
            number of reducers while testing out Cascading3 on Tez.
            We hacked around it temporarily
            <https://github.com/twitter/scalding/commit/57983601c7db4ef1e0df3350140d473f371e6bb3>
            but haven't yet cleaned up that code and put it out for
            review (we'll need to fork MR / Tez there as
            nodeConfigDef works for Tez but not Hadoop). Based on my
            understanding, so far we've tried to delegate as much of
            this to Cascading as we can but there seem to be a few
            places where we're doing some platform specific stuff in
            Scalding. Breaking up to create fabric-specific
            sub-modules seems like a nice idea to me. We might need
            to think through the right way to do this to ensure we
            don't break stuff. Would it make sense to spin up an
            issue and we can discuss on it?

            On Tue, Oct 11, 2016 at 10:42 AM, Cyrille Chépélov
            <[email protected]
            <mailto:[email protected]>> wrote:

                Hi,

                I'm trying to tie up a few loose ends in the way step
                descriptions (text typically passed via
                *.withDescriptions(...)*) and the desired level of
                parallelism (typically passed via *.withReducers(N)*)
                are pushed to the various fabrics.

                Right now:

                  * Most of the scalding code base either ignores the
                    back-end (good) or assumes
                    <https://github.com/twitter/scalding/blob/7ed0f92a946ad8407645695d3def62324f78ac41/scalding-core/src/main/scala/com/twitter/scalding/ExecutionContext.scala#L81>
                    the world is either Local or HadoopFlow (which
                    covers Hadoop 1.x and MR1). As a consequence, a
                    couple things don't yet work smoothly on Tez and
                    I assume on Flink.
                  * the descriptions are entirely dropped if not
                    running on Hadoop1 or MR1
                  * .withReducers sets a hadoop-specific property
                    (/mapred.reduce.tasks/) at RichPipe#L41
                    <https://github.com/twitter/scalding/blob/7ed0f92a946ad8407645695d3def62324f78ac41/scalding-core/src/main/scala/com/twitter/scalding/RichPipe.scala#L41>
                  * the Tez fabric ignores .withReducers; and there
                    is no other conduit (for now) to set the number
                    of desired parts on the sinks. As a consequence,
                    you can't run a tez DAG with a large level of
                    parallelism and a small (single) number of output
                    files (e.g. stats leading to a result file of a
                    couple dozen lines); you must pick one and set
                    *cascading.flow.runtime.gather.partitions.num*.
                    There are workarounds, but they're quite ugly.
                  * there are a few instances of "flow match { case
                    HadoopFlow => doSomething ; case _ => () }"
                    scattered around the code
                  * there's some heavily reflection-based code in
                    Mode.scala
                    <https://github.com/twitter/scalding/blob/7ed0f92a946ad8407645695d3def62324f78ac41/scalding-core/src/main/scala/com/twitter/scalding/Mode.scala#L75>
                    which depends on jars not part of the scalding
                    build process (and it's good that these jars stay
                    out of the scalding-core build, e.g. Tez client
                    libraries)
                  * While it may be desirable to experiment with
                    scalding-specific transform registries for
                    cascading (e.g. to deal with the Merge-GroupBy
                    structure, or to perform tests/assertions on the
                    resulting flow graph), it would be impractical to
                    perform the necessary fabric-specific adjustments
                    in Mode.scala as it is.
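
One way to picture the .withReducers problem described in the list above is a per-fabric hook that translates a requested level of parallelism into that fabric's own properties. This is only a sketch: the trait and object names are hypothetical, and the two property keys are the ones quoted above. Note that the Tez key is flow-wide, so this does not by itself solve the per-step case.

```scala
object ParallelismSketch {
  // Hypothetical hook that each fabric module would implement.
  trait FabricParallelism {
    def reducerProperties(reducers: Int): Map[String, String]
  }

  // MapReduce-style fabrics: the hadoop-specific key set by .withReducers.
  object HadoopParallelism extends FabricParallelism {
    def reducerProperties(reducers: Int): Map[String, String] = {
      require(reducers > 0, "reducers must be positive")
      Map("mapred.reduce.tasks" -> reducers.toString)
    }
  }

  // Tez: the flow-wide gather partitions key quoted above. Applying it to
  // a single node rather than the whole flow is NOT shown here.
  object TezParallelism extends FabricParallelism {
    def reducerProperties(reducers: Int): Map[String, String] = {
      require(reducers > 0, "reducers must be positive")
      Map("cascading.flow.runtime.gather.partitions.num" -> reducers.toString)
    }
  }
}
```

Core code would then ask whichever fabric is active for its properties instead of hard-coding the MapReduce key.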

                I'm trying to find a way to extract away the MR-isms,
                and push it into fabric-specific code which can be
                called when appropriate.

                Questions:

                 1. Would it be appropriate to start having
                    fabric-specific jars (scalding-fabric-hadoop,
                    scalding-fabric-hadoop2-mr1, scalding-fabric-tez
                    etc.), and push the fabric-specific code from
                    Mode.scala there?

                    (we'd keep only the single scalding
                    fabric-related factory using reflection, with
                    appropriate interfaces defined in scalding-core)

                 2. Pushing the fabric-specific code into dedicated
                    jars would probably have user-visible
                    consequences, as we can't make scalding-core
                    depend on scalding-fabric-hadoop (for
                    back-compatibility) unless the fabric-factory
                    interface goes into another jar.

                    From my point of view, I would find that
                    intentionally slightly breaking the build once
                    upon upgrade for the purpose of letting the world
                    know that there are other fabrics than MR1 might
                    be acceptable, and on the other hand I haven't
                    used MR1 for over a year.

                    Is this "slight" dependency breakage acceptable,
                    or is it better to have scalding-core still imply
                    the hadoop fabrics?

                 3. Right now, scalding's internals sometimes use
                    Hadoop (MR) specifics to carry various
                    configuration values. Is it acceptable to (at
                    least in the beginning) continue doing so,
                    kindly asking the respective non-hadoop fabrics
                    to pick these values up and convert to the
                    relevant APIs?

                 4. Is it okay to drop the @deprecated(..., "0.12.0")
                    functions from Mode.scala if they are
                    inconvenient to carry over in the process?

                 5. Currently, Job.buildFlow
                    <https://github.com/twitter/scalding/blob/7ed0f92a946ad8407645695d3def62324f78ac41/scalding-core/src/main/scala/com/twitter/scalding/Job.scala#L223>
                    returns Flow[_]. Is it okay to have it return
                    Flow[_] with ScaldingFlowGoodies instead,
                    ScaldingFlowGoodies being the provisional
                    interface name where to move the old "flow match
                    { case HadoopFlow => ... }" code?
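
To make question 5 more concrete, here is a sketch of what returning "Flow[_] with ScaldingFlowGoodies" could look like. Only the name ScaldingFlowGoodies comes from the message; the Flow stand-in, the method on the trait, the two flow classes and the property keys are hypothetical and do not use Cascading's real types.

```scala
object FlowGoodiesSketch {
  // Stand-in for Cascading's Flow; the real type is not used here.
  trait Flow { def name: String }

  // Provisional interface name from the message; its member is hypothetical.
  trait ScaldingFlowGoodies {
    def descriptionProperties(descriptions: Seq[String]): Map[String, String]
  }

  // Hypothetical MapReduce-like flow: stores descriptions under one key.
  final class MapReduceLikeFlow(val name: String)
      extends Flow with ScaldingFlowGoodies {
    def descriptionProperties(descriptions: Seq[String]): Map[String, String] =
      Map("sketch.mr.step.descriptions" -> descriptions.mkString(", "))
  }

  // Hypothetical DAG-like flow: same data, different fabric-specific key.
  final class DagLikeFlow(val name: String)
      extends Flow with ScaldingFlowGoodies {
    def descriptionProperties(descriptions: Seq[String]): Map[String, String] =
      Map("sketch.dag.node.descriptions" -> descriptions.mkString(", "))
  }

  // Core-side code: no "flow match { case HadoopFlow => ... }" needed,
  // because every flow handed back by a fabric carries the goodies.
  def describe(
      flow: Flow with ScaldingFlowGoodies,
      descriptions: Seq[String]): Map[String, String] =
    flow.descriptionProperties(descriptions)
}
```

The design choice being asked about is exactly this: the fabric-specific branch moves out of the core and into the object each fabric returns.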

                Thanks in advance

                    -- Cyrille

-- You received this message because you are subscribed
                to the Google Groups "Scalding Development" group.
                To unsubscribe from this group and stop receiving
                emails from it, send an email to
                [email protected]
                <mailto:[email protected]>.
                For more options, visit
                https://groups.google.com/d/optout
                <https://groups.google.com/d/optout>.




            - Piyush








--
Alex Levenson
@THISWILLWORK


