+1. Please keep us updated!

Bikas

From: Chris K Wensel [mailto:[email protected]]
Sent: Thursday, April 09, 2015 10:23 AM
To: [email protected]
Cc: [email protected]; [email protected]
Subject: Re: Report: Scalding+Cascading+TEZ = ♥

This is great news!!

On Apr 9, 2015, at 8:29 AM, Cyrille Chépélov <[email protected]> wrote:

(cross-posted on scalding-dev@, cascading-user@, and user@tez)

Hi,

Chris K Wensel wrote not so long ago:

> will also add that one user is having some success with Scalding on Cascading 3.0 and Tez

I'm that guy. It's been a fun ride, and the news is that there are results. While the unboxing experience isn't yet totally pleasant, these results are now very promising.

The really good part is that apart from build.sbt, we needed no changes to 
application code to run with the local, hadoop (1.x API on a 2.6.0 cluster), 
hadoop2-mr1, and hadoop2-tez back-ends.
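
To give an idea of what "no changes" means in practice, here is an illustrative sketch (not our actual application code) of the kind of Scalding job that runs unchanged across back-ends; only the launch flag from the Scalding patch mentioned below (--hadoop, --hadoop2-mr1 or --hadoop2-tez) and build.sbt differ per fabric:

import com.twitter.scalding._

// Illustrative only: a plain typed-API word count. The same source runs
// against the local, hadoop, hadoop2-mr1 and hadoop2-tez back-ends; the
// fabric is chosen at launch time, not in the code.
class WordCountJob(args: Args) extends Job(args) {
  TypedPipe.from(TextLine(args("input")))
    .flatMap { line => line.split("""\s+""") }
    .groupBy { word => word }
    .size
    .write(TypedTsv[(String, Long)](args("output")))
}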

Numbers:

  *   Full dataset: about 116M lines in 6 distinct CSV inputs

     *   hadoop: about 18 hours (pretty much busy all the time, maxing out either the LAN, disk bandwidth or CPU depending on the phase)
     *   tez: about 8 hours (with no LAN saturation and few disk-saturation periods, and apparent room for improvement in CPU/task allocation; we're confident a couple of hours could be shaved).

  *   Reduced dataset (integration testing dataset): about 2.3M lines in 6 
distinct inputs

     *   hadoop: 112 minutes
     *   tez: 6.25 minutes

  *   In common:

     *   the job is a cascade made of 20 Flows, which compile into about 420 Cascading steps (Hadoop) or 20 DAGs (Tez)
     *   about 10K lines of Scala code

In the small-dataset experiment, Hadoop suffers a lot from the zillions of step-setup ceremonies it has to perform with YARN, whereas Tez applications are higher-level and stay alive much longer from the ResourceManager's point of view.

Results appear identical so far (still busy comparing and ensuring we've covered all code paths, which we haven't yet, but this looks really good).

I'm grateful to everyone who had the patience to sift through the huge haystacks of logs and graphs I sent, and for the time spent writing patches in the dark for me to test. Chris, I have no idea how much time I tied you up on this, but wow, thanks!

    -- Cyrille

________________________________

How to replicate this:

  *   We are using a cluster of 7 i7-3770K<http://ark.intel.com/products/65523/Intel-Core-i7-3770K-Processor-8M-Cache-up-to-3_90-GHz> ex-desktop machines, running Debian Jessie + Apache Hadoop 2.6.0 under Ansible. Each machine has 1 system disk, 4×2 TiB of HDFS spindles, and 120 GiB of SSD for flush-happy components (ZooKeeper, NN, RM, etc.). Except for the ATS, all components are in HA mode under ZooKeeper, Keepalived and/or HAProxy as appropriate.

     *   Up to 80% of this cluster is dedicated to the "test" work queue.
     *   16 GiB RAM per node (except one at 24 GiB); we could now justify maxing out at 32 GiB.
     *   We noticed that when multiple apps are running, a single Tez app tends to "gobble up" all available slots even if multiple RUNNING MR tasks are cohabiting. Not a big deal, and fairly obvious given that TezChild processes stay up until preempted out or left jobless.

  *   Scalding 0.13.1 patched with https://github.com/twitter/scalding/pull/1220

     *   This replaces --hdfs with --hadoop, --hadoop2-mr1 and --hadoop2-tez, which delegate to the appropriate Cascading back-end (--hdfs becomes an alias for --hadoop)

  *   Cascading 3.0.0-wip-97

  *   Tez rebuilt from the branch-0.6 branch (post-0.6.0) as of commit 282e63af187f59700191579d2328cae3b8d2fa9c, plus a few other patches (attached here)

     *   had to patch guava up to version 16.0.1 -- TEZ-2164 would help a lot
     *   TEZ-2256: reduced logging noise by using a boolean rather than an exception to signal end of buffer in UnorderedPartitionedKVWriter
     *   TEZ-2237: two patches from Siddharth Seth that helped unstick some of the more complex DAGs

  *   a few extra settings in the job config (see the sketch after the build.sbt listing below for how they might be wired in):

      "tez.task.resource.memory.mb" -> (1024 + 512).toString, // default 1024
      "tez.container.max.java.heap.fraction" -> "0.7", // default 0.8
      "tez.queue.name" -> params.jobArgs.getOrElse("queue", "default"),
      "cascading.flow.runtime.gather.partitions.num" -> params.jobArgs.getOrElse("tez-partitions", "4"),
      "tez.lib.uris" -> params.jobArgs.getOrElse("tez.lib.uris", "hdfs://cluster/apps/tez-0.6.0/tez-0.6.0.tar.gz"),
      "tez.history.logging.service.class" -> "org.apache.tez.dag.history.logging.ats.ATSHistoryLoggingService",
      "tez.allow.disabled.timeline-domains" -> "true"

  *   A couple things in the app's build.sbt (sorry for my clumsy use of SBT):

scalaVersion := "2.11.6"

val hadoopVersion = "2.6.0"

val cascadingFabric = sys.props.getOrElse("CASCADING_FABRIC", "hadoop2-tez") // can be "hadoop", "hadoop2-mr1" or "hadoop2-tez"

val cascadingVersion = "3.0.0-wip-97"

if (cascadingVersion.endsWith("-dev")) {
  libraryDependencies ++= Seq(
    "org.jgrapht" % "jgrapht-core" % "0.9.1",
    "org.jgrapht" % "jgrapht-ext" % "0.9.1",
    "riffle" % "riffle" % "1.0.0-wip-7",
    "org.codehaus.janino" % "janino" % "2.7.6"
  )
} else {
  libraryDependencies ++= Seq()
}

val scaldingVersion = {
  if (cascadingFabric == "hadoop") {
    "0.13.1"
  } else {
    "0.13.1-cch-ffc2" // this one is the version as patched with PR 1220
  }
}

if (cascadingFabric == "hadoop2-tez") {
  libraryDependencies ++= Seq(
    "javax.xml.bind" % "jaxb-api" % "2.2.2" exclude("javax.xml.stream", "stax-api"),
    "com.sun.jersey" % "jersey-server" % "1.9" exclude("asm", "asm"),
    "org.sonatype.sisu.inject" % "cglib" % "2.2.1-v20090111" exclude("asm", "asm")
  )
} else {
  libraryDependencies ++= Seq()
}

if (cascadingFabric == "hadoop2-tez") {
  dependencyOverrides += "com.google.guava" % "guava" % "16.0.1"
  /* as of 0.6.0, Tez depends on a very old version (11.0.2) of guava,
     which has an incompatibly different API to Stopwatch() than more
     recent guavas. Version 14.0 of guava introduced the breaking change. */
  libraryDependencies ++= Seq(
    "org.apache.tez" % "tez-api" % "0.6.0-SNAPSHOT",
    "org.apache.tez" % "tez-mapreduce" % "0.6.0-SNAPSHOT", // FIXME: not sure this is needed
    "com.google.guava" % "guava" % "16.0.1"
  )
} else {
  libraryDependencies ++= Seq()
}

libraryDependencies ++= Seq(
  "com.twitter" %% "scalding-core" % scaldingVersion
    exclude("cascading", "cascading-core")
    exclude("cascading", "cascading-hadoop")
    exclude("cascading", "cascading-local"),
  "com.twitter" %% "scalding-args" % scaldingVersion
    exclude("cascading", "cascading-core")
    exclude("cascading", "cascading-hadoop")
    exclude("cascading", "cascading-local"),
  "com.twitter" %% "scalding-date" % scaldingVersion
    exclude("cascading", "cascading-core")
    exclude("cascading", "cascading-hadoop")
    exclude("cascading", "cascading-local"),
  "com.twitter" %% "scalding-commons" % scaldingVersion
    exclude("cascading", "cascading-core")
    exclude("cascading", "cascading-hadoop")
    exclude("cascading", "cascading-local")
    exclude("com.hadoop.gplcompression", "hadoop-lzo") // hadoop-lzo also pulled in by elephantbird
)

libraryDependencies ++= Seq(
  "org.apache.thrift" % "libthrift" % "0.9.1",
  "cascading" % "cascading-core" % cascadingVersion,
  "cascading" % ("cascading-" + cascadingFabric) % cascadingVersion,
  "cascading" % "cascading-local" % cascadingVersion
)
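
For completeness, here is an illustrative sketch (not our actual code, which goes through a params.jobArgs wrapper) of how the extra settings listed above can be attached to a Scalding job by overriding Job.config:

import com.twitter.scalding.{ Args, Job }

// Illustrative sketch only: merge the extra Tez/Cascading settings into
// whatever Scalding already puts in the job configuration. The class name
// is made up; our real code reads its arguments through params.jobArgs.
class TezTunedJob(args: Args) extends Job(args) {
  override def config: Map[AnyRef, AnyRef] =
    super.config ++ Map(
      "tez.task.resource.memory.mb" -> (1024 + 512).toString, // default 1024
      "tez.container.max.java.heap.fraction" -> "0.7",        // default 0.8
      "tez.queue.name" -> args.getOrElse("queue", "default"),
      "cascading.flow.runtime.gather.partitions.num" -> args.getOrElse("tez-partitions", "4")
    )
  // ... pipes as usual ...
}

(build.sbt reads CASCADING_FABRIC from sys.props, so the fabric can be selected at build time with something like sbt -DCASCADING_FABRIC=hadoop2-mr1.)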

The tez.container.max.java.heap.fraction override was important to ensure that the heap + native memory of Tez children didn't exceed the allocated container size. As the Application/Scalding/Scala/Cascading/Tez/JVM stack probably takes a little more native memory than a more basic app/Tez/JVM stack, the default was too small and YARN kept yanking Tez children at times. As has been pointed out, running the cluster with yarn.nodemanager.pmem-check-enabled=false removes the need for that.

One item in one DAG was causing out-of-memory trouble in Tez's DefaultSorter when running with the default task memory of 1024 MiB (716 MiB for the heap); increasing it slightly was a cowardly but effective workaround.

Attachments:
  0001-Switch-to-guava-s-newer-API-for-Stopwatch-14.0.patch
  0002-DO-NOT-COMMIT-bump-guava-to-0.16.patch
  0003-WIP-TEZ-2256-a-first-attempt.patch
  0004-TEZ-2237-hack.branch6.txt-WIP-trying-out-patch-from-.patch
  0005-TEZ-2237-WIP-experimenal-patch-2-TEZ-2237.test.2_bra.patch

—
Chris K Wensel
[email protected]<mailto:[email protected]>


