We ran into similar problems while trying to set the number of reducers
when testing out Cascading 3 on Tez. We hacked around it temporarily
<https://github.com/twitter/scalding/commit/57983601c7db4ef1e0df3350140d473f371e6bb3>
but haven't yet cleaned up that code and put it out for review (we'll
need to fork MR / Tez there, as nodeConfigDef works for Tez but not
Hadoop). Based on my understanding, so far we've tried to delegate as
much of this to Cascading as we can, but there seem to be a few places
where we do platform-specific things in Scalding. Breaking these out
into fabric-specific sub-modules seems like a nice idea to me. We'd need
to think through the right way to do it so we don't break anything.
Would it make sense to spin up an issue so we can discuss it there?
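
For reference, the workaround in that commit boils down to setting the
gather-partitions property per flow node through Cascading 3's ConfigDef.
A rough sketch of the idea (simplified and untested here; not the exact
code from the commit):

    import cascading.pipe.Pipe
    import cascading.property.ConfigDef

    // Cascading 3 on Tez honors properties set through getNodeConfigDef;
    // the MR planner does not, hence the need to fork MR / Tez here.
    def withTezGatherPartitions(pipe: Pipe, parts: Int): Pipe = {
      pipe.getNodeConfigDef.setProperty(ConfigDef.Mode.REPLACE,
        "cascading.flow.runtime.gather.partitions.num", parts.toString)
      pipe
    }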

On Tue, Oct 11, 2016 at 10:42 AM, Cyrille Chépélov <
[email protected]> wrote:

> Hi,
>
> I'm trying to tie up a few loose ends in the way step descriptions (text
> typically passed via *.withDescriptions(...)*) and the desired level of
> parallelism (typically passed via *.withReducers(N)*) are pushed to the
> various fabrics.
>
> Right now:
>
>    - Most of the scalding code base either ignores the back-end (good) or
>    assumes
>    
> <https://github.com/twitter/scalding/blob/7ed0f92a946ad8407645695d3def62324f78ac41/scalding-core/src/main/scala/com/twitter/scalding/ExecutionContext.scala#L81>
>    the world is either Local or HadoopFlow (which covers Hadoop 1.x and
>    MR1). As a consequence, a couple of things don't yet work smoothly on
>    Tez (and, I assume, on Flink).
>    - the descriptions are entirely dropped if not running on Hadoop1 or
>    MR1
>    - .withReducers sets a hadoop-specific property (*mapred.reduce.tasks*)
>    at RichPipe#L41
>    
> <https://github.com/twitter/scalding/blob/7ed0f92a946ad8407645695d3def62324f78ac41/scalding-core/src/main/scala/com/twitter/scalding/RichPipe.scala#L41>
>    - the Tez fabric ignores *.withReducers*, and there is no other conduit
>    (for now) to set the desired number of parts on the sinks. As a
>    consequence, you can't run a Tez DAG with a large level of parallelism
>    and a small (single) number of output files (e.g. stats leading to a
>    result file of a couple dozen lines); you must pick one and set
>    *cascading.flow.runtime.gather.partitions.num*. There are workarounds,
>    but they're quite ugly.
>    - there are a few instances of "flow match { case HadoopFlow =>
>    doSomething ; case _ => () }" scattered around the code (condensed in
>    the sketch just after this list)
>    - there's some heavily reflection-based code in Mode.scala
>    
> <https://github.com/twitter/scalding/blob/7ed0f92a946ad8407645695d3def62324f78ac41/scalding-core/src/main/scala/com/twitter/scalding/Mode.scala#L75>
>    which depends on jars not part of the scalding build process (and it's good
>    that these jars stay out of the scalding-core build, e.g. Tez client
>    libraries)
>    - While it may be desirable to experiment with scalding-specific
>    transform registries for cascading (e.g. to deal with the Merge-GroupBy
>    structure, or to perform tests/assertions on the resulting flow graph), it
>    would be impractical to perform the necessary fabric-specific adjustments
>    in Mode.scala as it is.
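>
> To make the "flow match" point concrete, the pattern is shaped roughly
> like this (a condensed paraphrase of the ExecutionContext.scala code, not
> a verbatim quote; the property key is illustrative):
>
>      import scala.collection.JavaConverters._
>      import cascading.flow.Flow
>      import cascading.flow.hadoop.HadoopFlow
>
>      // descriptions only survive on the MR planner; every other
>      // fabric falls into the silent no-op branch
>      def setStepDescriptions(flow: Flow[_], descriptions: Seq[String]): Unit =
>        flow match {
>          case hadoopFlow: HadoopFlow =>
>            hadoopFlow.getFlowSteps.asScala.foreach { step =>
>              step.getConfig.set("scalding.step.descriptions",
>                descriptions.mkString(","))
>            }
>          case _ => () // Tez, Flink, Local: descriptions dropped
>        }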
>
> I'm trying to find a way to extract away the MR-isms and push them into
> fabric-specific code which can be called when appropriate.
>
> Questions:
>
>    1. Would it be appropriate to start having fabric-specific jars
>    (scalding-fabric-hadoop, scalding-fabric-hadoop2-mr1, scalding-fabric-tez
>    etc.) and push the fabric-specific code from Mode.scala there?
>
>    (we'd keep only the single scalding fabric-related factory using
>    reflection, with appropriate interfaces defined in scalding-core)
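>
>    As a strawman (names provisional, signatures invented for
>    illustration), the interface kept in scalding-core could look like:
>
>       import com.twitter.scalding.Mode
>
>       // implemented once per scalding-fabric-* jar
>       trait FabricModeFactory {
>         def accepts(fabricName: String): Boolean
>         def newMode(fabricName: String, config: Map[String, String]): Mode
>       }
>
>       // scalding-core resolves the concrete factory reflectively, so the
>       // Tez / Flink client jars never leak into the core build
>       def loadFactory(className: String): FabricModeFactory =
>         Class.forName(className).newInstance().asInstanceOf[FabricModeFactory]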
>
>    2. Pushing the fabric-specific code into dedicated jars would probably
>    have user-visible consequences, as we can't make scalding-core depend on
>    scalding-fabric-hadoop (for back-compatibility) unless the fabric-factory
>    interface goes into another jar.
>
>    From my point of view, intentionally and slightly breaking the build
>    once, at upgrade time, might be acceptable if it lets the world know
>    that there are fabrics other than MR1; then again, I haven't used MR1
>    for over a year, so I may be biased.
>
>    Is this "slight" dependency breakage acceptable, or is it better to
>    have scalding-core still imply the hadoop fabrics?
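>
>    Concretely, the sbt layout I have in mind is something like this
>    (module names hypothetical, module(...) standing in for the helper in
>    scalding's build definition):
>
>       // scalding-core sees only the factory interfaces; each fabric
>       // jar carries its own planner dependencies
>       lazy val scaldingFabricApi = module("fabric-api")
>       lazy val scaldingCore = module("core").dependsOn(scaldingFabricApi)
>       lazy val scaldingFabricHadoop =
>         module("fabric-hadoop").dependsOn(scaldingCore)
>       lazy val scaldingFabricTez =
>         module("fabric-tez").dependsOn(scaldingCore)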
>
>    3. Right now, scalding's internals sometimes use Hadoop (MR) specifics
>    to carry various configuration values. Is it acceptable to (at least in
>    the beginning) continue doing so, kindly asking the respective
>    non-Hadoop fabrics to pick these values up and convert them to the
>    relevant APIs?
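>
>    For instance, the Tez fabric could translate the MR key at
>    flow-configuration time; a minimal sketch, assuming scalding's Config
>    wrapper and the keys mentioned above:
>
>       import com.twitter.scalding.Config
>
>       // re-publish the MR-specific reducer count under the property the
>       // Tez planner actually honors
>       def translateReducers(conf: Config): Config =
>         conf.get("mapred.reduce.tasks") match {
>           case Some(n) =>
>             conf + ("cascading.flow.runtime.gather.partitions.num" -> n)
>           case None => conf
>         }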
>
>    4. Is it okay to drop the @deprecated(..., "0.12.0") functions from
>    Mode.scala if they are inconvenient to carry over in the process?
>
>    5. Currently, Job.buildFlow
>    
> <https://github.com/twitter/scalding/blob/7ed0f92a946ad8407645695d3def62324f78ac41/scalding-core/src/main/scala/com/twitter/scalding/Job.scala#L223>
>    returns Flow[_]. Is it okay to have it return Flow[_] with
>    ScaldingFlowGoodies instead, ScaldingFlowGoodies being the provisional
>    name of the interface into which the old "flow match { case HadoopFlow =>
>    ... }" code would move?
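>
>    Roughly, the provisional shape would be:
>
>       // the per-fabric hooks scalding-core needs, so the old
>       // type-matches become ordinary method calls
>       trait ScaldingFlowGoodies {
>         def setFlowDescriptions(descriptions: Seq[String]): Unit
>         def setNumReducers(reducers: Int): Unit
>       }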
>
> Thanks in advance
>
>     -- Cyrille
>



-- 
- Piyush
